Исполнители Java: как быть уведомленным, без блокировки, когда задача завершается?

Скажем, у меня есть очередь, полная задач, которые мне нужно отправить в службу исполнителя. Я хочу, чтобы они обрабатывались по одному. Самый простой способ, о котором я могу думать, это:

  1. Возьмите задачу из очереди
  2. Отправьте его исполнителю
  3. Вызовите .get на возвращенном Будущем и заблокируйте, пока не появится результат.
  4. Возьмите еще одну задачу из очереди …

Тем не менее, я стараюсь полностью не блокировать. Если у меня 10 000 таких очередей, для которых их задачи обрабатываются по одному, у меня закончится пространство стека, потому что большинство из них будет удерживать блокированные streamи.

Я хотел бы отправить задание и предоставить обратный вызов, который вызывается, когда задача завершена. Я буду использовать это уведомление обратного вызова в качестве флага для отправки следующей задачи. (Функциональные и реактивные джойстики, по-видимому, используют такие неблокирующие алгоритмы, но я не могу понять их код)

Как я могу это сделать с помощью java.util.concurrent JDK, не говоря о моей собственной службе исполнителя?

(очередь, которая кормит меня этими задачами, может сама блокироваться, но это проблема, которая будет решена позже)

    Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомлении о завершении. Затем вызовите его в конце задачи.

    Вы даже можете написать общую оболочку для задач Runnable и отправить их в ExecutorService . Или, см. Ниже механизм, встроенный в Java 8.

     class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } } 

    С CompletableFuture Java 8 включала более сложные средства для компоновки конвейеров, где процессы могут выполняться асинхронно и условно. Вот надуманный, но полный пример уведомления.

     import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(String... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(String msg) { System.out.println("Received message: " + msg); } } class ExampleService { String work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } } - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(String... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(String msg) { System.out.println("Received message: " + msg); } } class ExampleService { String work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } } 

    Используйте API-интерфейс Guava для просмотра в будущем и добавьте обратный вызов. Ср с веб-сайта :

     ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture explosion = service.submit(new Callable() { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } }); 

    В Java 8 вы можете использовать CompletableFuture . Вот пример, который я использовал в своем коде, где я использую его для извлечения пользователей из моей службы пользователей, сопоставления их с моими объектами представления, а затем обновления моего представления или отображения диалогового windows с ошибкой (это приложение графического интерфейса):

      CompletableFuture.supplyAsync( userService::listUsers ).thenApply( this::mapUsersToUserViews ).thenAccept( this::updateView ).exceptionally( throwable -> { showErrorDialogFor(throwable); return null; } ); 

    Он выполняется асинхронно. Я использую два частных метода: mapUsersToUserViews и updateView .

    Вы можете расширить class FutureTask и переопределить метод done() , а затем добавить объект FutureTask в ExecutorService , поэтому метод done() будет вызываться, когда FutureTask завершен немедленно.

    ThreadPoolExecutor также имеет beforeExecute и afterExecute которые вы можете переопределить и использовать. Вот описание из Javadocs от ThreadPoolExecutor .

    Методы крюка

    Этот class предоставляет защищенные переопределяемые методы beforeExecute(java.lang.Thread, java.lang.Runnable) и afterExecute(java.lang.Runnable, java.lang.Throwable) , которые вызывают до и после выполнения каждой задачи. Они могут использоваться для управления средой исполнения; например, ThreadLocals инициализация ThreadLocals , сбор статистики или добавление записей журнала. Кроме того, метод terminated() может быть переопределен для выполнения любой специальной обработки, которая должна быть выполнена после полного прекращения Executor . Если методы перехвата или обратного вызова генерируют исключения, внутренние рабочие streamи могут, в свою очередь, прерываться и прерываться.

    Используйте CountDownLatch .

    Это от java.util.concurrent и это как раз способ дождаться, пока несколько streamов завершит выполнение, прежде чем продолжить.

    Чтобы добиться эффекта обратного вызова, который вам нужно, это требует дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном streamе, который использует CountDownLatch и ждет его, а затем продолжает уведомлять о том, что вам нужно уведомлять. Не существует встроенной поддержки обратного вызова или чего-либо подобного этому эффекту.


    EDIT: теперь, когда я еще больше понимаю ваш вопрос, я думаю, что вы слишком далеко, без необходимости. Если вы берете обычный SingleThreadExecutor , дайте ему все задачи, и он будет выполнять очередь изначально.

    Если вы хотите, чтобы задачи одновременно не выполнялись, используйте SingleThreadedExecutor . Задачи будут обрабатываться в том порядке, в котором они представлены. Вам даже не нужно выполнять задачи, просто отправьте их в exec.

    Чтобы добавить к ответу Мэтта, который помог, вот более скрупулезный пример, показывающий использование обратного вызова.

     private static Primes primes = new Primes(); public static void main(String[] args) throws InterruptedException { getPrimeAsync((p) -> System.out.println("onPrimeListener; p=" + p)); System.out.println("Adios mi amigito"); } public interface OnPrimeListener { void onPrime(int prime); } public static void getPrimeAsync(OnPrimeListener listener) { CompletableFuture.supplyAsync(primes::getNextPrime) .thenApply((prime) -> { System.out.println("getPrimeAsync(); prime=" + prime); if (listener != null) { listener.onPrime(prime); } return prime; }); } 

    Выход:

      getPrimeAsync(); prime=241 onPrimeListener; p=241 Adios mi amigito 

    Простой код для реализации механизма Callback с использованием ExecutorService

     import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(String args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable{ Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } } . import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(String args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable{ Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } } 

    вывод:

     creating service Callable:1:pool-1-thread-1 Call back:1 Callable:3:pool-1-thread-3 Callable:2:pool-1-thread-2 Call back:2 Callable:5:pool-1-thread-5 Call back:5 Call back:3 Callable:4:pool-1-thread-4 Call back:4 

    Ключевые заметки:

    1. Если вы хотите, чтобы задачи процесса последовательно newFixedThreadPool(5) в порядке FIFO, замените newFixedThreadPool(5) на newFixedThreadPool(1)
    2. Если вы хотите обработать следующую задачу после анализа результата callback предыдущей задачи, просто оставьте комментарий ниже строки

       //System.out.println("future status:"+future.get()+":"+future.isDone()); 
    3. Вы можете заменить newFixedThreadPool() одним из

       Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor 

      в зависимости от вашего использования.

    4. Если вы хотите обработать метод обратного вызова асинхронно

      а. Передайте общую задачу ExecutorService or ThreadPoolExecutor to Callable

      б. Преобразование метода Callable/Runnable в Callable/Runnable task

      с. Задайте задачу обратного вызова в ExecutorService or ThreadPoolExecutor

    Это расширение для ответа Паше с помощью ListenableFuture Guava.

    В частности, Futures.transform() возвращает ListenableFuture поэтому их можно использовать для цепочки асинхронных вызовов. Futures.addCallback() возвращает void , поэтому его нельзя использовать для цепочки, но он хорош для обработки успеха / сбоя при завершении async.

     // ListenableFuture1: Open Database ListenableFuture database = service.submit(() -> openDatabase()); // ListenableFuture2: Query Database for Cursor rows ListenableFuture cursor = Futures.transform(database, database -> database.query(table, ...)); // ListenableFuture3: Convert Cursor rows to List ListenableFuture> fooList = Futures.transform(cursor, cursor -> cursorToFooList(cursor)); // Final Callback: Handle the success/errors when final future completes Futures.addCallback(fooList, new FutureCallback>() { public void onSuccess(List foos) { doSomethingWith(foos); } public void onFailure(Throwable thrown) { log.error(thrown); } }); 

    ПРИМЕЧАНИЕ. Помимо цепочки асинхронных задач, Futures.transform() также позволяет вам планировать каждую задачу на отдельном исполнителе (не показано в этом примере).

    Вы можете использовать реализацию Callable таким образом, чтобы

     public class MyAsyncCallable implements Callable { CallbackInterface ci; public MyAsyncCallable(CallbackInterface ci) { this.ci = ci; } public V call() throws Exception { System.out.println("Call of MyCallable invoked"); System.out.println("Result = " + this.ci.doSomething(10, 20)); return (V) "Good job"; } } 

    где CallbackInterface – это нечто очень основное, как

     public interface CallbackInterface { public int doSomething(int a, int b); } 

    и теперь основной class будет выглядеть следующим образом:

     ExecutorService ex = Executors.newFixedThreadPool(2); MyAsyncCallable mac = new MyAsyncCallable((a, b) -> a + b); ex.submit(mac); 
    Давайте будем гением компьютера.