Как прервать BlockingQueue, который блокирует take ()?
У меня есть class, который берет объекты из BlockingQueue
и обрабатывает их, вызывая метод take()
в непрерывном цикле. В какой-то момент я знаю, что в очередь не будет добавлено больше объектов. Как прервать метод take()
чтобы он блокировал блокировку?
Вот class, который обрабатывает объекты:
public class MyObjHandler implements Runnable { private final BlockingQueue queue; public class MyObjHandler(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) { MyObj obj = queue.take(); // process obj here // ... } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
И вот метод, который использует этот class для обработки объектов:
- Правильный способ синхронизации ArrayList в java
- Когда я должен использовать CompletionService над ExecutorService?
- JavaFX2: Могу ли я приостановить фоновое задание / службу?
- Как использовать свойство CancellationToken?
- Как вы запрашиваете pthread, чтобы узнать, все ли работает?
public void testHandler() { BlockingQueue queue = new ArrayBlockingQueue(100); MyObjectHandler handler = new MyObjectHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler // to stop waiting for more objects? }
- Связаны ли статические переменные между streamами?
- Когда и как следует использовать переменную ThreadLocal?
- Можем ли мы иметь условия гонки в однопоточной программе?
- Что такое алгоритм JVM Scheduling?
- Что будет использоваться для обмена данными между streamами, выполняются на одном ядре с HT?
- Что такое состояние гонки?
- Связаны ли в LinkedBlockingQueue методы удаления и удаления?
- На какой уровень блокирует MongoDB запись? (или: что это означает «за соединение»,
Если прерывание streamа не является вариантом, другое – разместить объект «маркер» или «команда» в очереди, который будет распознан MyObjHandler как таковой и вырваться из цикла.
BlockingQueue queue = new ArrayBlockingQueue (100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt();
Однако, если вы это сделаете, stream может быть прерван, пока в очереди все еще есть элементы, ожидающие обработки. Возможно, вы захотите рассмотреть вопрос об использовании poll
а не take
, что позволит streamу обработки отключиться и завершить работу, когда он ждал какое-то время без нового ввода.
Очень поздно, но надеюсь, что это тоже помогает другим, поскольку я столкнулся с подобной проблемой и использовал подход poll
предложенный Эриксоном выше, с некоторыми незначительными изменениями,
class MyObjHandler implements Runnable { private final BlockingQueue queue; public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL public MyObjHandler(BlockingQueue queue) { this.queue = queue; Finished = false; } @Override public void run() { while (true) { try { MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return { // process obj here // ... } if(Finished && queue.isEmpty()) return; } catch (InterruptedException e) { return; } } } } public void testHandler() { BlockingQueue queue = new ArrayBlockingQueue (100); MyObjHandler handler = new MyObjHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler to stop waiting for more objects? handler.Finished = true; //THIS TELLS HIM //If you need you can wait for the termination otherwise remove join myThread.join(); }
Это позволило решить обе проблемы
- Пометить
BlockingQueue
так, чтобы он знал, что ему больше не нужно ждать элементов - Не прерывалось между ними, так что блоки обработки заканчиваются только тогда, когда обрабатываются все элементы в очереди, и нет элементов, которые необходимо добавить
Прерывать stream:
thread.interrupt()
Или не прерывай, это противно.
public class MyQueue extends ArrayBlockingQueue { private static final long serialVersionUID = 1L; private boolean done = false; public ParserQueue(int capacity) { super(capacity); } public void done() { done = true; } public boolean isDone() { return done; } /** * May return null if producer ends the production after consumer * has entered the element-await state. */ public T take() throws InterruptedException { T el; while ((el = super.poll()) == null && !done) { synchronized (this) { wait(); } } return el; } }
- когда производитель помещает объект в очередь, вызовите
queue.notify()
, если он закончится, вызовитеqueue.done()
- loop while (! queue.isDone () ||! queue.isEmpty ())
- test take () возвращаемое значение для null