Пул streamов с использованием boost asio
Я пытаюсь создать class пула ограниченных streamов, используя boost :: asio. Но я застрял в какой-то момент, может кто-нибудь мне помочь.
Единственная проблема – это место, где я должен уменьшить счетчик?
код не работает должным образом.
- WaitAll для нескольких дескрипторов в streamе STA не поддерживается
- Java: ExecutorService, который блокируется при представлении после определенного размера очереди
- Объединение streamов в C ++ 11
- Параметры TaskCreationOptions.LongRunning и ThreadPool
- Как поймать исключения из ThreadPool.QueueUserWorkItem?
проблема в том, что я не знаю, когда мой stream завершит выполнение и как я узнаю, что он вернулся в пул
#include #include #include #include #include #include using namespace std; using namespace boost; class ThreadPool { static int count; int NoOfThread; thread_group grp; mutex mutex_; asio::io_service io_service; int counter; stack thStk ; public: ThreadPool(int num) { NoOfThread = num; counter = 0; mutex::scoped_lock lock(mutex_); if(count == 0) count++; else return; for(int i=0 ; i NoOfThread) { cout<<"run out of threads \n"; return NULL; } counter++; thread* ptr = thStk.top(); thStk.pop(); return ptr; } }; int ThreadPool::count = 0; struct callable { void operator()() { cout<<"some task for thread \n"; } }; int main( int argc, char * argv[] ) { callable x; ThreadPool pool(10); thread* p = pool.getThread(); cout<get_id(); //how i can assign some function to thread pointer ? //how i can return thread pointer after work done so i can add //it back to stack? return 0; }
- Использование ThreadPool.QueueUserWorkItem в ASP.NET в сценарии с высоким трафиком
- Код для простого пула streamов в C #
- Как создать пул streamов, используя boost в C ++?
- Пул streamов C ++
- Конфигурация нитей на основе №. процессорных ядер
- Именование streamов и streamов-streamов ExecutorService
- Как настроить тонкую пул streamов для фьючерсов?
- Как реализовать PriorityBlockingQueue с помощью ThreadPoolExecutor и настраиваемых задач
Короче говоря, вам нужно обернуть предоставленную пользователем задачу другой функцией, которая будет:
- Вызовите функцию пользователя или вызываемый объект.
- Заблокируйте мьютекс и уменьшите счетчик.
Возможно, я не понимаю всех требований к этому пулу streamов. Таким образом, для ясности здесь приведен явный список того, что я считаю требованиями:
- Пул управляет временем жизни streamов. Пользователь не должен удалять streamи, которые находятся в пуле.
- Пользователь может назначить задачу пулу неинтрузивным способом.
- Когда задача назначается, если все streamи в пуле в настоящее время запускают другие задачи, задача отбрасывается.
Прежде чем я дам реализацию, есть несколько ключевых моментов, которые я хотел бы подчеркнуть:
- Как только stream запущен, он будет работать до завершения, отмены или завершения. Функция, выполняемая нитью, не может быть переназначена. Чтобы позволить одному streamу выполнять несколько функций в течение своей жизни, stream будет запускаться с функцией, которая будет считываться из очереди, например,
io_service::run()
и вызываемые типы отправляются в событие очереди, например, изio_service::post()
. -
io_service::run()
возвращает, если вio_service
нет ожидающей работы работы,io_service
остановлен или исключениеio_service
из обработчика, в котором работает stream. Чтобы предотвратитьio_serivce::run()
когда нет незавершенной работы, можно использовать classio_service::work
. - Определение требований типа задачи (т. Е. Тип задачи должен быть вызван синтаксисом
object()
вместо того, чтобы требовать тип (т.е. задача должна наследовать отprocess
), обеспечивает большую гибкость для пользователя. Он позволяет пользователю задавать задачу как указатель на функцию или тип, предоставляющий нулевойoperator()
.
Реализация с использованием boost::asio
:
#include #include class thread_pool { private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : work_( io_service_ ), available_( pool_size ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } /// @brief Destructor. ~thread_pool() { // Force all threads to return from io_service::run(). io_service_.stop(); // Suppress all exceptions. try { threads_.join_all(); } catch ( const std::exception& ) {} } /// @brief Adds a task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Post a wrapped task into the queue. io_service_.post( boost::bind( &thread_pool::wrap_task, this, boost::function< void() >( task ) ) ); } private: /// @brief Wrap a task so that the available count can be increased once /// the user provided task has completed. void wrap_task( boost::function< void() > task ) { // Run the user supplied task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} // Task has finished, so increment count of available threads. boost::unique_lock< boost::mutex > lock( mutex_ ); ++available_; } };
Несколько комментариев об осуществлении:
- Обработка исключений должна выполняться вокруг задачи пользователя. Если функция пользователя или вызываемый объект выдает исключение, которое не относится к типу
boost::thread_interrupted
,boost::thread_interrupted
std::terminate()
. Это результат исключений Boost.Thread в поведении функций streamов . Также стоит прочитать эффект Boost.Asio от исключений, отбрасываемых у обработчиков . - Если пользователь предоставляет
task
помощьюboost::bind
, тогда вложенныйboost::bind
не сможет скомпилироваться. Требуется один из следующих вариантов:- Не поддерживать
task
созданнуюboost::bind
. - Мета-программирование для выполнения ветвления во время компиляции на основе того, является ли тип пользователя, если результат
boost::bind
так чтоboost::protect
может использоваться, посколькуboost::protect
только функции на определенных объектах функции. - Используйте другой тип, чтобы косвенно передать объект
task
. Я решил использоватьboost::function
для удобства чтения за счет потери точного типа.boost::tuple
, хотя и немного менее читаемый, также можно использовать для сохранения точного типа, как показано в примере сериализации Boost.Asio.
- Не поддерживать
Код приложения теперь может использовать тип thread_pool
неинтрузивно:
void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. }
thread_pool
может быть создан без Boost.Asio и может быть немного проще для сопровождающих, поскольку им больше не нужно знать о Boost.Asio
поведениях, например, когда io_service::run()
, а что io_service::work
объект:
#include #include #include class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };