Изящное завершение streamов и исполнителей

Следующий fragment кода пытается совместить это.

Код всегда навешивается и проверяет, есть ли какие-либо ожидающие обработки запросы. Если он есть, он создает новый stream для обработки запроса и отправляет его исполнителю. Как только все streamи выполнены, он спит в течение 60 секунд и снова проверяет ожидающие запросы.

public static void main(String a[]){ //variables init code omitted ExecutorService service = Executors.newFixedThreadPool(15); ExecutorCompletionService comp = new ExecutorCompletionService(service); while(true){ List pending = service.findPendingRequests(); int noPending = pending.size(); if (noPending > 0) { for (AppRequest req : pending) { Callable worker = new RequestThread(something, req); comp.submit(worker); } } for (int i = 0; i < noPending; i++) { try { Future f = comp.take(); long name; try { name = f.get(); LOGGER.debug(name + " got completed"); } catch (ExecutionException e) { LOGGER.error(e.toString()); } } catch (InterruptedException e) { LOGGER.error(e.toString()); } } TimeUnit.SECONDS.sleep(60); } } 

Мой вопрос – это большая часть обработки, которую эти streamи обрабатывают с базой данных. И эта программа будет работать на машине Windows. Что происходит с этими streamами, когда кто-то пытается отключить или выйти из машины? Как изящно закрыть запущенные streamи, а также исполнитель.?

Типичное упорядоченное закрытие ExecutorService может выглядеть примерно так:

 final ExecutorService executor; Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { executor.shutdown(); if (!executor.awaitTermination(SHUTDOWN_TIME)) { //optional * Logger.log("Executor did not terminate in the specified time."); //optional * List droppedTasks = executor.shutdownNow(); //optional ** Logger.log("Executor was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); //optional ** } } }); 

* Вы можете зарегистрировать, что у исполнителя все еще были задачи для обработки после ожидания времени, которое вы готовы ждать.
** Вы можете попытаться заставить рабочие streamи исполнителя отказаться от своих текущих задач и обеспечить, чтобы они не запускали ни одного из оставшихся.

Обратите внимание, что решение выше будет работать, когда пользователь выдает прерывание в ваш java процесс или когда ваш ExecutorService содержит только streamи демона. Если вместо этого ExecutorService содержит не-демонные streamи, которые еще не завершены, JVM не будет пытаться завершить работу, и поэтому крючки отключения не будут вызываться.

Если вы пытаетесь завершить процесс как часть дискретного жизненного цикла приложения (а не службы), то код завершения не должен быть помещен внутри крюка выключения, но в соответствующем месте, где программа предназначена для завершения.

В книге « Java Concurrency in Practice » говорится:

7.4. Закрытие JVM

JVM может закрываться либо упорядоченным, либо резким образом. После завершения последнего «нормального» (nondaemon) streamа происходит упорядоченное завершение работы, кто-то вызывает System.exit или другие средства для платформы (например, отправка SIGINT или нажатие Ctrl-C). […]

7.4.1. Крючки для выключения

При упорядоченном завершении работы JVM сначала запускает все зарегистрированные крючки отключения. Крюки остановки – это нестационарные streamи, зарегистрированные в Runtime.addShutdownHook. JVM не дает гарантий по заказу, в котором запускаются крючки остановки. Если какие-либо streamи приложений (daemon или nondaemon) все еще работают во время выключения, они продолжают работать одновременно с процессом останова. Когда все завершающие блокировки завершены, JVM может выбрать запуск финализаторов, если runFinalizersOnExit истинно, а затем останавливается. JVM не пытается остановить или прервать любые streamи приложений, которые все еще работают во время отключения; они внезапно прекращаются, когда JVM в конечном итоге останавливается. Если завершающие блокировки или финализаторы не завершены, тогда процесс упорядоченного выключения «зависает», и JVM должен быть отключен внезапно. […]

Важными битами являются: «JVM не пытается остановить или прервать любые streamи приложений, которые все еще работают во время отключения, они резко прекращаются, когда JVM в конечном итоге останавливается». поэтому я предполагаю, что соединение с БД внезапно прекратится, если нет завершающих крючков, чтобы сделать изящную очистку (если вы используете фреймворки, они обычно предоставляют такие прерывания остановки). По моему опыту, сеанс работы с БД может оставаться до тех пор, пока он не будет отключен БД и т. Д., Когда приложение. заканчивается без таких крючков.

Поскольку добавление крюка выключения для явного вызова shutdown() не сработало для меня, я нашел простое решение в Guava Google: com.google.common.util.concurrent.MoreExecutors.getExitingExecutorService .

Вы можете либо вызвать shutdown() в ExecutorService :

Вызывает упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты.

или вы можете вызвать shutdownNow() :

Попытка остановить все активное выполнение задач, приостанавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.

Нет никаких гарантий, кроме попыток максимального усилия прекратить обработку, активно выполняющую задачи. Например, типичные реализации будут отменены через Thread.interrupt (), поэтому любая задача, которая не отвечает на прерывания, может никогда не прекратиться.

Какой из них вы называете, зависит от того, насколько сильно вы хотите, чтобы он остановился …

У меня была аналогичная проблема, я использую, чтобы получить ошибку, как

oacloader.WebappClassLoaderBase :: Веб-приложение [ROOT], похоже, запустило stream с именем [pool-2-thread-1], но не смог его остановить. Вероятно, это приведет к утечке памяти. Stack trace of thread: sun.misc.Unsafe.park (собственный метод) java.util.concurrent.locks.LockSupport.park (LockSupport.java:175) java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject.await (AbstractQueuedSynchronizer. Java: 2039)

Скорректированный код

 private ThreadPoolExecutor executorPool; @PostConstruct public void init() { log.debug("Initializing ThreadPoolExecutor"); executorPool = new ThreadPoolExecutor(1, 3, 1, TimeUnit.SECONDS, new ArrayBlockingQueue(1)); } @PreDestroy public void destroy() { log.debug("Shuting down ThreadPoolExecutor"); executorPool.shutdown(); } 
  • назначение ссылок атомарно, поэтому почему Interlocked.Exchange (ref Object, Object) необходимо?
  • Что такое алгоритм JVM Scheduling?
  • Как вы запрашиваете pthread, чтобы узнать, все ли работает?
  • ThreadStart с параметрами
  • Почему Thread.isInterrupted () всегда возвращает false?
  • Возврат значения из streamа?
  • Запуск нескольких streamов пользовательского интерфейса
  • C ++ Threads, std :: system_error - операция не разрешена?
  • Является ли доступ к переменной в C # атомной операцией?
  • Если я синхронизировал два метода в одном classе, они могут работать одновременно?
  • Как я могу обернуть метод, чтобы я мог убить его выполнение, если он превышает указанный тайм-аут?
  • Давайте будем гением компьютера.