Объединение streamов в C ++ 11

Соответствующие вопросы :

О C ++ 11:

  • C ++ 11: std :: thread pooled?
  • Будет ли async (запуск :: async) в C ++ 11 сделать пулы streamов устаревшими, чтобы избежать дорогостоящего создания streamов?

О Boost:

  • C ++ ускоряет повторное использование streamов
  • boost :: thread и создание пула из них!

Как получить пул streamов для отправки задач , не создавая и не удаляя их снова и снова? Это означает, что постоянные streamи для повторной синхронизации не соединяются.


У меня есть код, который выглядит так:

namespace { std::vector workers; int total = 4; int arr[4] = {0}; void each_thread_does(int i) { arr[i] += 2; } } int main(int argc, char *argv[]) { for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { workers.push_back(std::thread(each_thread_does, j)); } for (std::thread &t: workers) { if (t.joinable()) { t.join(); } } arr[4] = std::min_element(arr, arr+4); } return 0; } 

Вместо того чтобы создавать и присоединять streamи к каждой итерации, я бы предпочел отправлять задачи на мои рабочие streamи на каждую итерацию и создавать их только один раз.

Вы можете использовать библиотеку пула streamов C ++, https://github.com/vit-vit/ctpl .

Затем код, который вы написали, может быть заменен следующим

 #include  // or  if ou do not have Boost library int main (int argc, char *argv[]) { ctpl::thread_pool p(2 /* two threads in the pool */); int arr[4] = {0}; std::vector> results(4); for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { results[j] = p.push([&arr, j](int){ arr[j] +=2; }); } for (int j = 0; j < 4; ++j) { results[j].get(); } arr[4] = std::min_element(arr, arr + 4); } } 

Вы получите нужное количество streamов и не будете создавать и удалять их снова и снова на итерациях.

Пул streamов означает, что все ваши streamи работают все время – другими словами, функция streamа никогда не возвращается. Чтобы дать streamам что-то значимое, вам нужно создать систему межпоточной коммуникации, как для того, чтобы сообщить streamу, что есть что-то делать, так и для передачи фактических рабочих данных.

Обычно это будет связано с некоторой структурой параллельных данных, и каждый stream, по-видимому, будет спать на какой-то переменной условия, которая будет уведомляться, когда есть работа. После получения уведомления один или несколько streamов пробуждаются, восстанавливают задачу из параллельной структуры данных, обрабатывают ее и сохраняют результат аналогичным образом.

Затем stream продолжится, чтобы проверить, есть ли еще больше работы, и если не вернуться спать.

В результате вы должны все это самостоятельно разработать, поскольку нет естественного понятия «работа», которое универсально применимо. Это довольно много работы, и есть некоторые тонкие проблемы, которые вам нужно сделать правильно. (Вы можете запрограммировать в Go, если вам нравится система, которая заботится о управлении streamами для вас за кулисами.)

Это копируется из моего ответа на другой очень похожий пост, надеюсь, что он может помочь:

1) Начните с максимального количества streamов, которые система может поддерживать:

 int Num_Threads = thread::hardware_concurrency(); 

2) Для эффективной реализации threadpool, когда streamи создаются в соответствии с Num_Threads, лучше не создавать новые или уничтожать старые (путем присоединения). Будет наказание за производительность, возможно, даже ваше приложение будет работать медленнее, чем серийная версия.

Каждый stream C ++ 11 должен работать в своей функции с бесконечным циклом, постоянно ожидая, что новые задачи будут захвачены и запущены.

Вот как подключить такую ​​функцию к пулу streamов:

 int Num_Threads = thread::hardware_concurrency(); vector Pool; for(int ii = 0; ii < Num_Threads; ii++) { Pool.push_back(thread(Infinite_loop_function));} 

3) Функция Infinite_loop_function

Это цикл while (true), ожидающий очереди задач

 void The_Pool:: Infinite_loop_function() { while(true) { { unique_lock lock(Queue_Mutex); condition.wait(lock, []{return !Queue.empty()}); Job = Queue.front(); Queue.pop(); } Job(); // function type } }; в void The_Pool:: Infinite_loop_function() { while(true) { { unique_lock lock(Queue_Mutex); condition.wait(lock, []{return !Queue.empty()}); Job = Queue.front(); Queue.pop(); } Job(); // function type } }; 

4) Создайте функцию для добавления задания в свою очередь

 void The_Pool:: Add_Job(function New_Job) { { unique_lock lock(Queue_Mutex); Queue.push(New_Job); } condition.notify_one(); } 

5) Привяжите произвольную функцию к вашей очереди

 Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); 

После интеграции этих ингредиентов у вас есть свой собственный динамический пул streamов. Эти streamи всегда работают, ожидая выполнения задания.

Я прошу прощения, если есть некоторые синтаксические ошибки, я набрал этот код, и у меня плохая память. Извините, что я не могу предоставить вам полный код пула streamов, который нарушил бы мою работу.

Путь streamа в ядре представляет собой набор streamов, связанных с функцией, работающей как цикл событий. Эти streamи будут бесконечно ждать выполнения задачи или их собственного завершения.

Задача threadpool заключается в предоставлении интерфейса для отправки заданий, определения (и, возможно, изменения) политики запуска этих заданий (правил планирования, создания streamов, размера пула) и контроля состояния streamов и связанных ресурсов.

Итак, для универсального пула нужно начать с определения того, что такое задача, как она запускается, прерывается, каков результат (см. Понятие обещания и будущего для этого вопроса), какие события должны отвечать streamи к тому, как они будут обращаться с ними, как эти события будут отличаться от тех, которые выполняются задачами. Это может стать довольно сложным, как вы можете видеть, и наложить ограничения на то, как будут работать streamи, поскольку решение становится все более и более привлекательным.

Текущая инструментария для обработки событий – это довольно баребоны (*): примитивы, такие как мьютексы, переменные условий и несколько абстракций поверх этого (блокировки, барьеры). Но в некоторых случаях эти абстракции могут оказаться непригодными (см. Этот связанный вопрос ), и нужно вернуться к использованию примитивов.

Необходимо также справиться с другими проблемами:

  • сигнал
  • I / O
  • аппаратное обеспечение (сродство процессора, гетерогенная настройка)

Как они будут играть в вашей обстановке?

Этот ответ на аналогичный вопрос указывает на существующую реализацию, предназначенную для повышения и stl.

Я предложил очень грубую реализацию threadpool для другого вопроса, который не затрагивает многие проблемы, описанные выше. Возможно, вы захотите его создать. Вы также можете захотеть взглянуть на существующие frameworks на других языках, чтобы найти вдохновение.


(*) Я не вижу в этом проблемы, совсем наоборот. Я думаю, что это самый дух C ++, унаследованный от C.

Что-то вроде этого может помочь (взято из рабочего приложения).

 #include  #include  #include  struct thread_pool { typedef std::unique_ptr asio_worker; thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) { for (int i = 0; i < threads; ++i) { auto worker = [this] { return service.run(); }; grp.add_thread(new boost::thread(worker)); } } template void enqueue(F f) { service.post(f); } ~thread_pool() { service_worker.reset(); grp.join_all(); service.stop(); } private: boost::asio::io_service service; asio_worker service_worker; boost::thread_group grp; }; 

Вы можете использовать его следующим образом:

 thread_pool pool(2); pool.enqueue([] { std::cout << "Hello from Task 1\n"; }); pool.enqueue([] { std::cout << "Hello from Task 2\n"; }); 

Имейте в виду, что переосмысление эффективного механизма асинхронного очередей не является тривиальным.

Boost :: asio :: io_service - очень эффективная реализация или фактически представляет собой набор оболочек, специфичных для платформы (например, он переносит порты завершения ввода-вывода в Windows).

Это еще одна реализация пула streamов, которая очень проста, понятна и понятна, использует только стандартную библиотеку C ++ 11 и может быть просмотрена или изменена для ваших целей. Должен быть хороший стартер, если вы хотите использовать stream бассейны:

https://github.com/progschj/ThreadPool

Изменить: теперь требуется C ++ 17 и понятия. (По состоянию на 9/12/16 достаточно только g ++ 6.0+.)

Однако из-за этого вычет шаблона намного точнее, поэтому стоит приложить усилия к созданию нового компилятора. Я еще не нашел функцию, которая требует явных аргументов шаблона.

Он также теперь принимает любой подходящий вызываемый объект ( и все еще статично виден! ).

Он также теперь включает в себя опциональный stream streamов с приоритетом streamа с зеленым streamом с использованием того же API. Однако этот class является POSIX. Он использует API ucontext_t для переключения задач в пользовательском пространстве.


Для этого я создал простую библиотеку. Ниже приведен пример использования. (Я отвечаю на это, потому что это была одна из вещей, которые я нашел, прежде чем я решил, что это необходимо написать сами).

 bool is_prime(int n){ // Determine if n is prime. } int main(){ thread_pool pool(8); // 8 threads list> results; for(int n = 2;n < 10000;n++){ // Submit a job to the pool. results.emplace_back(pool.async(is_prime, n)); } int n = 2; for(auto i = results.begin();i != results.end();i++, n++){ // i is an iterator pointing to a future representing the result of is_prime(n) cout << n << " "; bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result. if(prime) cout << "is prime"; else cout << "is not prime"; cout << endl; } } 

Вы можете передать async любую функцию с любым возвращаемым значением (или void) и любыми (или нет) аргументами, и оно вернет соответствующий std::future . Чтобы получить результат (или просто дождаться завершения задачи), вы вызываете get() в будущем.

Вот github: https://github.com/Tyler-Hardin/thread_pool .

 Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool: 

function_pool.h

 #pragma once #include  #include  #include  #include  #include  #include  class Function_pool { private: std::queue> m_function_queue; std::mutex m_lock; std::condition_variable m_data_condition; std::atomic m_accept_functions; public: Function_pool(); ~Function_pool(); void push(std::function func); void done(); void infinite_loop_func(); }; 

function_pool.cpp

 #include "function_pool.h" Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true) { } Function_pool::~Function_pool() { } void Function_pool::push(std::function func) { std::unique_lock lock(m_lock); m_function_queue.push(func); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap lock.unlock(); m_data_condition.notify_one(); } void Function_pool::done() { std::unique_lock lock(m_lock); m_accept_functions = false; lock.unlock(); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap m_data_condition.notify_all(); //notify all waiting threads. } void Function_pool::infinite_loop_func() { std::function func; while (true) { { std::unique_lock lock(m_lock); m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; }); if (!m_accept_functions && m_function_queue.empty()) { //lock will be release automatically. //finish the thread loop and let it join in the main thread. return; } func = m_function_queue.front(); m_function_queue.pop(); //release the lock } func(); } } 

main.cpp

 #include "function_pool.h" #include  #include  #include  #include  #include  #include  Function_pool func_pool; class quit_worker_exception : public std::exception {}; void example_function() { std::cout << "bla" << std::endl; } int main() { std::cout << "stating operation" << std::endl; int num_threads = std::thread::hardware_concurrency(); std::cout << "number of threads = " << num_threads << std::endl; std::vector thread_pool; for (int i = 0; i < num_threads; i++) { thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool)); } //here we should send our functions for (int i = 0; i < 50; i++) { func_pool.push(example_function); } func_pool.done(); for (unsigned int i = 0; i < thread_pool.size(); i++) { thread_pool.at(i).join(); } } 

Путь streamа без зависимостей вне STL вполне возможен. Недавно я написал небольшую библиотеку streamов thread-only для решения одной и той же проблемы. Он поддерживает изменение динамического пула (изменение количества рабочих во время выполнения), ожидание, остановка, приостановка, возобновление и т. Д. Надеюсь, вы сочтете это полезным.

  • C # - ThreadPool vs Tasks
  • Что определяет количество streamов, создаваемых Java ForkJoinPool?
  • Java: ExecutorService, который блокируется при представлении после определенного размера очереди
  • Как реализовать PriorityBlockingQueue с помощью ThreadPoolExecutor и настраиваемых задач
  • Как создать пул streamов, используя boost в C ++?
  • Создание пула streamов с использованием boost
  • FixedThreadPool против CachedThreadPool: меньшее из двух зол
  • Именование streamов и streamов-streamов ExecutorService
  • Как я могу отключить пулы исполнителей / планировщиков Spring Spring до того, как все остальные бобы в веб-приложении будут уничтожены?
  • В чем смысл гибкости streamов в ASP.Net?
  • Сколько streamов слишком много?
  • Interesting Posts

    Переопределить стиль тела для содержимого в iframe

    Веб-приложение заблокировано при обработке другого веб-приложения при совместном использовании одного сеанса

    Скрытые особенности F #

    Как получить частоты каждого значения в БПФ?

    OnclientClick и OnClick не работают одновременно?

    WPF. Должен ли пользовательский элемент управления иметь свой собственный ViewModel?

    Страtagsя обновления соединения HBase Kerberos

    можно ли отключить вставку javac статических конечных переменных?

    Как напечатать f uint64_t? Сбой: «ложный трейлинг«% »в формате«

    Что такое модификатор доступа по умолчанию в Java?

    Как я могу заставить FileZilla игнорировать папку или файлы (например, .svn или CVS)?

    Есть ли кабель SATA, который может подключаться к нескольким дискам?

    Эффект Fade CSS3

    Существуют ли какие-либо соглашения по упорядочению методов Java?

    strptime, as.POSIXct и as.Date возвращают неожиданные NA

    Давайте будем гением компьютера.