Исполнители Java: как я могу остановить отправленные задачи?

Я поставил задачу с помощью исполнителей, и мне нужно, чтобы она остановилась через некоторое время (например, 5 минут). Я пробовал сделать вот так:

for (Future fut : e.invokeAll(tasks, 300, TimeUnit.SECONDS)) { try { fut.get(); } catch (CancellationException ex) { fut.cancel(true); tasks.clear(); } catch(ExecutionException ex){ ex.printStackTrace(); //FIXME: gestita con printstack } } 

Но я всегда получаю сообщение об ошибке: у меня есть общий вектор, который нужно модифицировать задачами, а затем читать по streamу, и даже если я остановлю всю задачу, если произойдет таймаут, я получаю:

 Exception in thread "Thread-1" java.util.ConcurrentModificationException 

Здесь что-то не так? Как я могу остановить переданные задачи, которые все еще работают через 5 минут?

Просто потому, что вы вызываете cancel() в Future это не означает, что задача остановится автоматически. Вы должны выполнить некоторую работу в рамках задачи, чтобы убедиться, что она остановится:

  • Используйте cancel(true) чтобы прерывание было отправлено в задание.
  • Обрабатывать InterruptedException . Если функция в вашей задаче вызывает исключение InterruptedException , убедитесь, что вы выходите изящно как можно скорее после обнаружения исключения.
  • Периодически проверяйте Thread.currentThread().isInterrupted() если задача выполняет непрерывное вычисление.

Например:

 class LongTask implements Callable { public Double call() { // Sleep for a while; handle InterruptedException appropriately try { Thread.sleep(10000); } catch (InterruptedException ex) { System.out.println("Exiting gracefully!"); return null; } // Compute for a while; check Thread.isInterrupted() periodically double sum = 0.0; for (long i = 0; i < 10000000; i++) { sum += 10.0 if (Thread.currentThread().isInterrupted()) { System.out.println("Exiting gracefully"); return null; } } return sum; } } 

Кроме того, как упоминалось в других сообщениях: ConcurrentModificationException может быть выбрано даже при использовании streamобезопасного classа Vector , поскольку iteratorы, которые вы получаете из Vector , не являются streamобезопасными и, следовательно, должны быть синхронизированы. Продвинутый for-loop использует iteratorы, поэтому следите:

 final Vector vector = new Vector(); vector.add(1.0); vector.add(2.0); // Not thread safe! If another thread modifies "vector" during the loop, then // a ConcurrentModificationException will be thrown. for (Double num : vector) { System.out.println(num); } // You can try this as a quick fix, but it might not be what you want: synchronized (vector) { // "vector" must be final for (Double num : vector) { System.out.println(num); } } 

tasks.clear() ConcurrentModificationException исходит от вашего вызова tasks.clear() то время как ваши Exceitors выполняет итерацию над вашими tasks Vector . То, что вы можете попытаться сделать, это вызвать shutdownNow() в ExecutorService

Наиболее распространенный случай для ConcurrentModificationException – это когда vector изменяется одновременно с его повторением. Часто это будет сделано в одном streamе. Вам нужно удерживать блокировку Vector для всей итерации (и осторожно, чтобы не заходить в тупик).

fut.get () – это блокирующий вызов, даже после таймаута, который вы заблокируете, пока задача не будет выполнена. Если вы хотите остановиться как можно ближе к отметке 5 минут, вам нужно проверить флаг прерывания, я просто рекомендую вам это сделать с помощью метода Thread.isInterrupted (), который сохраняет состояние прерывания. Если вы хотите просто немедленно остановиться и не нуждаетесь в очистке какого-либо состояния, тогда создайте исключение, которое будет уловлено Будущим и указано вам как ExecutionException.

fut.cancel (true) ничего не делает, поскольку метод invokeAll () уже сделал это для вас.

Если вы не используете коллекцию «Задачи» где-то в другом месте, вам, вероятно, не нужно вызывать clear (). Это не будет источником вашей проблемы, поскольку метод invokeAll () выполняется со списком к моменту вызова clear (). Но если вам нужно начать формирование списка новых задач для выполнения, я предлагаю вам создать новый список задач, а не использовать старый список новых задач.

К сожалению, у меня нет ответа на вашу проблему. Я не вижу здесь достаточной информации, чтобы диагностировать ее. Ничто в приведенном fragmentе кода не указывает на неправильное (только ненужное) использование библиотечных classов / методов. Возможно, если вы включили полную трассировку стека, вместо одной строки.

Поместите fut.cancel(true); в блоке finally

  • Когда и как следует использовать переменную ThreadLocal?
  • Почему параллельный stream не использует все streamи ForkJoinPool?
  • Изящное завершение streamов и исполнителей
  • Как эффективно отображать OpenCV-видео в Qt?
  • Создание блокирующей очереди в .NET?
  • Invoke или BeginInvoke нельзя вызвать в элементе управления до тех пор, пока дескриптор windows не будет создан
  • Сколько streamов может поддерживать Java VM?
  • Синхронизация доступа к SimpleDateFormat
  • Нужно ли защищать доступ для чтения к контейнеру STL в многопоточной среде?
  • Как оценить накладные расходы на переключение streamов?
  • Вопрос о прекращении streamа в .NET.
  • Давайте будем гением компьютера.