Как прервать BlockingQueue, который блокирует take ()?

У меня есть class, который берет объекты из BlockingQueue и обрабатывает их, вызывая метод take() в непрерывном цикле. В какой-то момент я знаю, что в очередь не будет добавлено больше объектов. Как прервать метод take() чтобы он блокировал блокировку?

Вот class, который обрабатывает объекты:

 public class MyObjHandler implements Runnable { private final BlockingQueue queue; public class MyObjHandler(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) { MyObj obj = queue.take(); // process obj here // ... } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } 

И вот метод, который использует этот class для обработки объектов:

 public void testHandler() { BlockingQueue queue = new ArrayBlockingQueue(100); MyObjectHandler handler = new MyObjectHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler // to stop waiting for more objects? } 

Если прерывание streamа не является вариантом, другое – разместить объект «маркер» или «команда» в очереди, который будет распознан MyObjHandler как таковой и вырваться из цикла.

 BlockingQueue queue = new ArrayBlockingQueue(100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt(); 

Однако, если вы это сделаете, stream может быть прерван, пока в очереди все еще есть элементы, ожидающие обработки. Возможно, вы захотите рассмотреть вопрос об использовании poll а не take , что позволит streamу обработки отключиться и завершить работу, когда он ждал какое-то время без нового ввода.

Очень поздно, но надеюсь, что это тоже помогает другим, поскольку я столкнулся с подобной проблемой и использовал подход poll предложенный Эриксоном выше, с некоторыми незначительными изменениями,

 class MyObjHandler implements Runnable { private final BlockingQueue queue; public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL public MyObjHandler(BlockingQueue queue) { this.queue = queue; Finished = false; } @Override public void run() { while (true) { try { MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return { // process obj here // ... } if(Finished && queue.isEmpty()) return; } catch (InterruptedException e) { return; } } } } public void testHandler() { BlockingQueue queue = new ArrayBlockingQueue(100); MyObjHandler handler = new MyObjHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler to stop waiting for more objects? handler.Finished = true; //THIS TELLS HIM //If you need you can wait for the termination otherwise remove join myThread.join(); } 

Это позволило решить обе проблемы

  1. Пометить BlockingQueue так, чтобы он знал, что ему больше не нужно ждать элементов
  2. Не прерывалось между ними, так что блоки обработки заканчиваются только тогда, когда обрабатываются все элементы в очереди, и нет элементов, которые необходимо добавить

Прерывать stream:

 thread.interrupt() 

Или не прерывай, это противно.

  public class MyQueue extends ArrayBlockingQueue { private static final long serialVersionUID = 1L; private boolean done = false; public ParserQueue(int capacity) { super(capacity); } public void done() { done = true; } public boolean isDone() { return done; } /** * May return null if producer ends the production after consumer * has entered the element-await state. */ public T take() throws InterruptedException { T el; while ((el = super.poll()) == null && !done) { synchronized (this) { wait(); } } return el; } } 
  1. когда производитель помещает объект в очередь, вызовите queue.notify() , если он закончится, вызовите queue.done()
  2. loop while (! queue.isDone () ||! queue.isEmpty ())
  3. test take () возвращаемое значение для null
  • Параллельность: Atomic и volatile в модели памяти C ++ 11
  • Несинхронизированные статические методы streamобезопасны, если они не изменяют переменные статического classа?
  • Как эта `this` ссылка на внешний class исчезает из-за публикации экземпляра внутреннего classа?
  • Может ли современное оборудование x86 не хранить один байт в памяти?
  • Синхронизация против блокировки
  • Простые числа Eratoshenes быстрее последовательны, чем одновременно?
  • Гарантии гарантий безопасности
  • Синхронизация конструктора в Java
  • Давайте будем гением компьютера.