Как дождаться завершения всех streamов, используя ExecutorService?

Мне нужно выполнить некоторое количество задач 4 за раз, что-то вроде этого:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } //...wait for completion somehow 

Как я могу получить уведомление, когда все они будут завершены? На данный момент я не могу думать ни о чем лучше, чем устанавливать какой-либо глобальный счетчик задач и уменьшать его в конце каждой задачи, а затем контролировать в бесконечном цикле этот счетчик, чтобы он стал 0; или получить список фьючерсов и в бесконечном мониторе цикла isDone для всех из них. Каковы лучшие решения, не связанные с бесконечными циклами?

Благодарю.

В основном на ExecutorService вы вызываете shutdown() а затем awaitTermination() :

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... } 

Используйте CountDownLatch :

 CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle } 

и в рамках вашей задачи (заключите в try / finally)

 latch.countDown(); 

ExecutorService.invokeAll() делает это за вас.

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List> tasks; // your tasks // invokeAll() returns when all tasks are complete List> futures = taskExecutor.invokeAll(tasks); 

Вы также можете использовать Списки фьючерсов:

 List futures = new ArrayList(); // now add to it: futures.add(executorInstance.submit(new Callable() { public Void call() throws IOException { // do something return null; } })); 

затем, когда вы хотите присоединиться ко всем из них, это, по сути, эквивалент объединения каждого из них (с добавленной выгодой, что он повторно вызывает исключения из дочерних streamов в основной):

 for(Future f: this.futures) { f.get(); } 

В принципе, трюк заключается в вызове .get () для каждого Будущего по одному, а не бесконечному циклу вызова isDone () on (все или каждый). Таким образом, вы сможете «перейти» через этот блок и пройти мимо него, как только закончится последний stream. Предостережение состоит в том, что, поскольку вызов .get () повторно вызывает исключения, если один из streamов умирает, вы могли бы повысить его от этого, возможно, до того, как другие streamи завершили выполнение [чтобы этого избежать, вы можете добавить исключение catch ExecutionException вокруг получить звонок]. Другое предостережение – это ссылка на все streamи, поэтому, если у них есть локальные переменные streamа, они не будут собираться до тех пор, пока вы не пройдете мимо этого блока (хотя вы могли бы обойти это, если бы это стало проблемой, путем удаления Будущее от ArrayList). Если вы хотите знать, какое будущее «заканчивается первым», вы можете использовать что-то вроде https://stackoverflow.com/a/31885029/32453

Только мои два цента. Чтобы преодолеть требование CountDownLatch чтобы знать количество заданий заранее, вы можете сделать это по-старому, используя простой Semaphore .

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ... 

В своей задаче просто вызовите s.release() поскольку вы будете latch.countDown();

В Java8 вы можете сделать это с помощью CompletableFuture :

 ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); 

Класс CyclicBarrier в Java 5 и более поздних версиях предназначен для такого рода вещей.

Немного поздно в игре, но ради завершения …

Вместо того, чтобы «ждать» для завершения всех задач, вы можете думать в терминах принципа Голливуда: «Не называй меня, я тебе позвоню» – когда я закончу. Я думаю, что полученный код более изящный …

Guava предлагает некоторые интересные инструменты для достижения этого.

Пример ::

Оберните службу-исполнитель в ListeningExecutorService ::

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); 

Отправить коллекцию вызовов для исполнения ::

 for (Callable callable : callables) { ListenableFuture lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) }); 

Теперь важная часть:

 ListenableFuture> lf = Futures.successfulAsList(listenableFutures); 

Прикрепите обратный вызов к ListenableFuture, который вы можете использовать для уведомления, когда все фьючерсы завершены ::

  Futures.addCallback(lf, new FutureCallback>() { @Override public void onSuccess(List result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } }); 

Это также дает преимущество, что вы можете собрать все результаты в одном месте после завершения обработки …

Дополнительная информация здесь

Следуйте одному из подходов.

  1. Итерация по всем будущим задачам, возвращенная из submit на ExecutorService и проверка статуса с блокирующим вызовом get() на объекте Future как было предложено Kiran
  2. Использовать invokeAll() для ExecutorService
  3. CountDownLatch
  4. ForkJoinPool или Executors.html # newWorkStealingPool
  5. Использовать shutdown, awaitTermination, shutdownNow API ThreadPoolExecutor в правильной последовательности

Связанные вопросы SE:

Как CountDownLatch используется в многопоточности Java?

Как правильно закрыть java ExecutorService

Вы можете перенести свои задачи в другую, которая будет отправлять уведомления:

 taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } }); 

Я только что написал пример программы, которая решает вашу проблему. Данной краткой реализации не было, поэтому я добавлю ее. Хотя вы можете использовать executor.shutdown() и executor.awaitTermination() , это не самая лучшая практика, поскольку время, затраченное на разные streamи, было бы непредсказуемым.

 ExecutorService es = Executors.newCachedThreadPool(); List> tasks = new ArrayList<>(); for (int j = 1; j <= 10; j++) { tasks.add(new Callable() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List> futures = es.invokeAll(tasks); int flag = 0; for (Future f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } 

Просто чтобы предоставить больше альтернатив здесь, чтобы использовать защелку / барьеры. Вы также можете получить частичные результаты, пока все они не закончат использование CompletionService .

Из Java Concurrency на практике: «Если у вас есть партия вычислений для представления Исполнителю, и вы хотите получить их результаты по мере их появления, вы можете сохранить будущее, связанное с каждой задачей, и повторно опросить для завершения, вызвав get с помощью timeout of zero. Это возможно, но утомительно . К счастью, есть лучший способ : служба завершения ».

Здесь реализация

 public class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List info = doPartialAsyncProcess(source); CompletionService completionService = new ExecutorCompletionService(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } } 

Вы можете использовать свой собственный подclass ExecutorCompletionService для wrap taskExecutor и собственную реализацию BlockingQueue, чтобы получать информацию, когда каждая задача завершается, и выполнять любые обратные вызовы или другие действия, которые вы желаете, когда количество завершенных задач достигает желаемой цели.

вы должны использовать метод executorService.shutdown() и executorService.awaitTermination .

Пример:

 public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } } 

Java 8 – Мы можем использовать stream API для обработки streamа. См. Ниже fragment

 final List tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get(); 

Вы можете использовать этот код:

 public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks"); 

Это мое решение, основанное на подсказке «AdamSkywalker», и оно работает

 package frss.main; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestHilos { void procesar() { ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); System.out.println("FIN DEL PROCESO DE HILOS"); } private List getTasks() { List tasks = new ArrayList(); Hilo01 task1 = new Hilo01(); tasks.add(task1); Hilo02 task2 = new Hilo02(); tasks.add(task2); return tasks; } private class Hilo01 extends Thread { @Override public void run() { System.out.println("HILO 1"); } } private class Hilo02 extends Thread { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("HILO 2"); } } public static void main(String[] args) { TestHilos test = new TestHilos(); test.procesar(); } } 

Поэтому я отправляю свой ответ из связанного вопроса здесь, если кто-то хочет более простой способ сделать это

 ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture[] futures = new CompletableFuture[10]; int i = 0; while (...) { futures[i++] = CompletableFuture.runAsync(runner, executor); } CompletableFuture.allOf(futures).join(); // THis will wait until all future ready. 

вот два варианта, просто немного путайте, что лучше всего идти.

Опция 1:

 ExecutorService es = Executors.newFixedThreadPool(4); List tasks = getTasks(); CompletableFuture[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); 

Вариант 2:

 ExecutorService es = Executors.newFixedThreadPool(4); List< Future> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } } es.shutdown(); 

Здесь put future.get (); в try catch хорошая идея?

Это может помочь

 Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} } 

Вы можете вызвать waitTillDone () в этом classе Runner :

 Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runner 

Вы можете повторно использовать этот class и вызывать waitTillDone () столько раз, сколько хотите, перед вызовом shutdown (), плюс ваш код предельно прост . Также вам не обязательно знать количество задач .

Чтобы использовать его, просто добавьте этот параметр gradle / maven compile 'com.github.matejtymes:javafixes:1.1.1' в соответствие с вашим проектом.

Более подробную информацию можно найти здесь:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

В исполнителе есть метод getActiveCount() – который дает количество активных streamов.

После охвата streamа мы можем проверить, является ли значение activeCount() равным 0 . Когда значение равно нулю, подразумевается, что в настоящий момент нет активных streamов, что означает, что задача завершена:

 while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } } 
  • Неустойчиво дорого?
  • В чем разница между параллелизмом, параллелизмом и асинхронными методами?
  • Инструкции SSE: какие процессоры могут выполнять атомные операции памяти 16B?
  • В чем разница между ConcurrentHashMap и Collections.synchronizedMap (Карта)?
  • Кажется, что сервлет обрабатывает несколько одновременных запросов браузера синхронно
  • RxJava вместо AsyncTask?
  • Как я могу атомизировать приращение переменной в Swift?
  • Невозможно создать кэшированный пул streamов с ограничением размера?
  • Инициализация двух streamов одним и тем же экземпляром исполняемого файла
  • Безопасно ли читать указатель на функцию одновременно без блокировки?
  • Как заставить BackgroundWorker возвращать объект
  • Давайте будем гением компьютера.