Ожидание выполнения нескольких streamов в Java

Во время выполнения моей программы запускается несколько streamов. Количество streamов зависит от определенных пользователем параметров, но все они выполняют один и тот же метод с разными переменными.

В некоторых ситуациях для очистки требуется среднее выполнение, часть этого останавливает все streamи, я не хочу, чтобы они немедленно останавливались, но я просто установил переменную, которую они проверяют, для того, чтобы это завершало их. Проблема в том, что она может занять до 1/2 секунды до остановки streamа. Тем не менее, я должен быть уверен, что все streamи остановились, прежде чем очистка может продолжаться. Очистка выполняется из другого streamа, так что технически мне нужен этот stream, чтобы дождаться окончания остальных streamов.

Я подумал о нескольких способах этого, но все они кажутся слишком сложными. Я надеялся, что будет какой-то метод, который может дождаться завершения группы streamов. Есть ли что-нибудь подобное?

Благодарю.

Просто присоединитесь к ним один за другим:

for (Thread thread : threads) { thread.join(); } 

(Вам нужно будет что-то сделать с InterruptedException , и вы можете захотеть предоставить тайм-аут на случай, если все пойдет не так, но это основная идея …)

Если вы используете java 1.5 или выше, вы можете попробовать CyclicBarrier . Вы можете передать операцию очистки как свой параметр конструктора и просто вызвать barrier.await() для всех streamов, когда есть необходимость в очистке.

Определите метод утилиты (или методы) самостоятельно:

 public static waitFor(Collection 

Или у вас может быть массив

 public static waitFor(Thread[] ts) throws InterruptedException { waitFor(Arrays.asList(ts)); } 

В качестве альтернативы вы можете использовать CyclicBarrier в библиотеке java.util.concurrent для реализации произвольной точки рандеву между несколькими streamами.

Вы видели classы Executor в java.util.concurrent ? Вы можете запускать свои streamи через ExecutorService . Он дает вам один объект, который вы можете использовать для отмены streamов или ждать их завершения.

Если вы контролируете создание streamов (представление в ExecutorService), то, похоже, вы можете использовать ExecutorCompletionService см. ExecutorCompletionService? Зачем нужен один, если у нас есть invokeAll? для различных ответов там.

Если вы не контролируете создание streamов, вот подход, который позволяет вам присоединяться к нитям «один за другим по мере их завершения» (и знать, какой из них заканчивается первым и т. Д.), Вдохновленный classом Ruby ThreadWait . В основном, создавая «просмотр streamов», которые предупреждают о завершении других streamов, вы можете знать, когда «следующий» stream из многих завершается.

Вы бы использовали его примерно так:

 JoinThreads join = new JoinThreads(threads); for(int i = 0; i < threads.size(); i++) { Thread justJoined = join.joinNextThread(); System.out.println("Done with a thread, just joined=" + justJoined); } 

И источник:

 public static class JoinThreads { java.util.concurrent.LinkedBlockingQueue doneThreads = new LinkedBlockingQueue(); public JoinThreads(List threads) { for(Thread t : threads) { final Thread joinThis = t; new Thread(new Runnable() { @Override public void run() { try { joinThis.join(); doneThreads.add(joinThis); } catch (InterruptedException e) { // "should" never get here, since we control this thread and don't call interrupt on it } } }).start(); } } Thread joinNextThread() throws InterruptedException { return doneThreads.take(); } } 

Хорошей частью этого является то, что он работает с генерическими streamами Java, без изменений, любой stream может быть соединен. Предостережение это требует некоторого дополнительного создания streamа. Кроме того, эта конкретная реализация «оставляет нити за собой», если вы не вызываете joinNextThread () полное количество раз и не имеете метода «close» и т. Д. Комментарий здесь, если вы хотите создать более полированную версию. Вы также можете использовать тот же тип шаблона с «Фьючерсами» вместо объектов Thread и т. Д.

  • Как использовать семафоры между процессами с использованием общей памяти
  • Как синхронизированные статические методы работают на Java?
  • Убедитесь, что синхронизированные блокировки Java выполнены в порядке?
  • Как защитить ресурсы, которые могут использоваться в многопоточной или асинхронной среде?
  • Java Singleton и синхронизация
  • Заказ streamов для запуска в том порядке, в котором они были созданы / запущены
  • Правильный способ синхронизации ArrayList в java
  • Как сделать мой ArrayList streamобезопасным? Другой подход к проблеме в Java?
  • Статический и нестатический объект блокировки в синхронизированном блоке
  • Синхронизировать устройства Android через время GPS?
  • Как определить, заблокирован ли объект (синхронизирован), чтобы он не блокировался на Java?
  • Давайте будем гением компьютера.