SwingWorker, done () выполняется до завершения вызовов ()

Я некоторое время работаю с SwingWorker, и у меня появилось странное поведение, по крайней мере, для меня. Я ясно понимаю, что из-за причин производительности несколько призывов к методу publish () коллируются в одном вызове. Это имеет смысл для меня, и я подозреваю, что SwingWorker держит какую-то очередь для обработки всех вызовов.

Согласно руководству и API, когда SwingWorker завершает свое выполнение, либо doInBackground () заканчивается нормально, либо рабочий stream отменяется извне, тогда вызывается метод done () . Все идет нормально.

Но у меня есть пример (как показано в учебниках), где есть вызовы метода process() выполненные после done() метода done() . Поскольку оба метода выполняются в Thread Dispatch Thread, я бы ожидал, что done() будет выполнен после завершения всех вызовов process() . Другими словами:

Ожидаемое:

 Writing... Writing... Stopped! 

Результат:

 Writing... Stopped! Writing... 

Образец кода

 import java.awt.BorderLayout; import java.awt.Dimension; import java.awt.Graphics; import java.awt.event.ActionEvent; import java.util.List; import javax.swing.AbstractAction; import javax.swing.Action; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; public class Demo { private SwingWorker worker; private JTextArea textArea; private Action startAction, stopAction; private void createAndShowGui() { startAction = new AbstractAction("Start writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.startWriting(); this.setEnabled(false); stopAction.setEnabled(true); } }; stopAction = new AbstractAction("Stop writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.stopWriting(); this.setEnabled(false); startAction.setEnabled(true); } }; JPanel buttonsPanel = new JPanel(); buttonsPanel.add(new JButton(startAction)); buttonsPanel.add(new JButton(stopAction)); textArea = new JTextArea(30, 50); JScrollPane scrollPane = new JScrollPane(textArea); JFrame frame = new JFrame("Test"); frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); frame.add(scrollPane); frame.add(buttonsPanel, BorderLayout.SOUTH); frame.pack(); frame.setLocationRelativeTo(null); frame.setVisible(true); } private void startWriting() { stopWriting(); worker = new SwingWorker() { @Override protected Void doInBackground() throws Exception { while(!isCancelled()) { publish("Writing...\n"); } return null; } @Override protected void process(List chunks) { String string = chunks.get(chunks.size() - 1); textArea.append(string); } @Override protected void done() { textArea.append("Stopped!\n"); } }; worker.execute(); } private void stopWriting() { if(worker != null && !worker.isCancelled()) { worker.cancel(true); } } public static void main(String[] args) { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { new Demo().createAndShowGui(); } }); } } 

КОРОТКИЙ ОТВЕТ:

Это происходит потому, что publish () не выполняет прямое планирование process , он устанавливает таймер, который будет запускать планирование блока process () в EDT после DELAY . Поэтому, когда работник отменен, все еще есть таймер, ожидающий запланировать процесс () с данными последнего опубликования. Причина использования таймера заключается в реализации оптимизации, когда один процесс может выполняться с объединенными данными нескольких изданий.

ДОЛГОЙ ОТВЕТ:

Давайте посмотрим, как publish () и cancel взаимодействуют друг с другом, для этого давайте погрузиться в какой-то исходный код.

Сначала легкая часть, cancel(true) :

 public final boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } 

Это отмена завершает вызов следующего кода:

 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) // <----- break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); // <----- } releaseShared(0); done(); // <----- return true; } 

Состояние SwingWorker установлено в CANCELLED , stream прерывается и done() , но это не работает SwingWorker, а future done (), которое указывается при создании экземпляра в конструкторе SwingWorker:

 future = new FutureTask(callable) { @Override protected void done() { doneEDT(); // <----- setState(StateValue.DONE); } }; 

И код doneEDT() :

 private void doneEDT() { Runnable doDone = new Runnable() { public void run() { done(); // <----- } }; if (SwingUtilities.isEventDispatchThread()) { doDone.run(); // <----- } else { doSubmit.add(doDone); } } 

Что вызывает прямое действие SwingWorkers done() если мы находимся в EDT, который является нашим делом. В этот момент SwingWorker должен остановиться, больше не нужно publish() , это достаточно просто продемонстрировать со следующей модификацией:

 while(!isCancelled()) { textArea.append("Calling publish\n"); publish("Writing...\n"); } 

Однако мы все еще получаем сообщение «Написание ...» от process (). Итак, давайте посмотрим, как называется процесс (). Исходный код для publish(...)

 protected final void publish(V... chunks) { synchronized (this) { if (doProcess == null) { doProcess = new AccumulativeRunnable() { @Override public void run(List args) { process(args); // <----- } @Override protected void submit() { doSubmit.add(this); // <----- } }; } } doProcess.add(chunks); // <----- } 

Мы видим, что run() Runnable doProcess - это тот, кто заканчивает вызов process(args) , но этот код просто вызывает doProcess.add(chunks) не doProcess.run() и есть doSubmit . Давайте посмотрим doProcess.add(chunks) .

 public final synchronized void add(T... args) { boolean isSubmitted = true; if (arguments == null) { isSubmitted = false; arguments = new ArrayList(); } Collections.addAll(arguments, args); // <----- if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed submit(); // <----- } } 

Итак, что publish() самом деле делает, добавляет куски в некоторые внутренние arguments ArrayList и вызывает submit() . Мы только что увидели, что представляют только вызовы doSubmit.add(this) , который является тем же самым методом add , поскольку оба doProcess и doSubmit расширяют AccumulativeRunnable , однако на этот раз вокруг V является Runnable вместо String как в doProcess . Таким образом, кусок - это runnable, который вызывает process(args) . Однако вызов submit() - это совершенно другой метод, определенный в classе doSubmit :

 private static class DoSubmitAccumulativeRunnable extends AccumulativeRunnable implements ActionListener { private final static int DELAY = (int) (1000 / 30); @Override protected void run(List args) { for (Runnable runnable : args) { runnable.run(); } } @Override protected void submit() { Timer timer = new Timer(DELAY, this); // <----- timer.setRepeats(false); timer.start(); } public void actionPerformed(ActionEvent event) { run(); // <----- } } 

Он создает таймер, который запускает actionPerformed код один раз после DELAY miliseconds. После того, как событие будет run() код будет помечен в EDT, который вызовет внутренний run() который заканчивается вызовом run(flush()) doProcess и, таким образом, выполняется process(chunk) , где chunk - это сброшенные данные arguments ArrayList. Я пропустил некоторые детали, цепочка вызовов «run» выглядит так:

  • doSubmit.run ()
  • doSubmit.run (flush ()) // Фактически цикл runnables, но будет иметь только один (*)
  • doProcess.run ()
  • doProcess.run (заподлицо ())
  • Процесс (fragment)

(*) isSubmited и flush() (который сбрасывает это логическое значение) делают так, что дополнительные вызовы для публикации не добавляют doProcess runnables для вызова в doSubmit.run (flush ()), однако их данные не игнорируются. Таким образом, выполняется один процесс для любого количества изданий, называемых в течение срока действия таймера.

В общем, то, что publish("Writing...") , является планирование вызова process(chunk) в EDT после DELAY. Это объясняет, почему даже после того, как мы отменили stream и больше не публикуется, все еще выполняется одно выполнение процесса, так как в тот момент, когда мы отменяем рабочего, существует (с большой вероятностью) таймер, который будет планировать process() после done() уже Запланированное.

Почему этот таймер используется вместо планирования процесса () в EDT с помощью invokeLater(doProcess) ? Чтобы реализовать оптимизацию производительности, описанную в документах :

Поскольку метод процесса вызывается асинхронно в Thread Dispatch Thread, при запуске метода процесса может возникнуть несколько вызовов метода публикации. Для целей производительности все эти вызовы объединены в один вызов с помощью конкатенированных аргументов. Например:

  publish("1"); publish("2", "3"); publish("4", "5", "6"); might result in: process("1", "2", "3", "4", "5", "6") 

Теперь мы знаем, что это работает, потому что все публикации, которые происходят в интервале DELAY, добавляют их args в эту внутреннюю переменную, мы видели arguments и process(chunk) будет выполняться со всеми этими данными за один раз.

ЭТО БЫСТРО? Временное решение?

Трудно сказать, если это ошибка или нет. Может быть, имеет смысл обрабатывать данные, которые опубликовал фоновый stream, поскольку работа действительно выполнена, и вам может быть интересно получить обновленный графический интерфейс с такой же информацией, как вы можете (если это то, что делает process() , например). И тогда, возможно, не имеет смысла, если done() требует, чтобы все обработанные данные и / или вызов process () после того, как done () создал несоответствия данных / GUI.

Существует очевидное обходное решение, если вы не хотите, чтобы какой-либо новый процесс () выполнялся после выполнения (), просто проверьте, отменен ли рабочий в методе process тоже!

 @Override protected void process(List chunks) { if (isCancelled()) return; String string = chunks.get(chunks.size() - 1); textArea.append(string); } 

Сложнее сделать make () выполнимым после последнего процесса (), например, сделать может просто использовать также таймер, который будет планировать фактическую работу done () после> DELAY. Хотя я не могу думать, что это будет обычным делом, поскольку, если вы отменили это, не должно быть важно пропустить еще один процесс (), когда мы знаем, что мы фактически отменяем исполнение всех будущих.

Прочитав превосходный ответ от DSquare, он пришел к выводу, что потребуется некоторое подclassификация, я придумал эту идею для всех, кто должен убедиться, что все опубликованные fragmentы были обработаны в EDT, прежде чем двигаться дальше.

NB Я попытался написать его на Java, а не на Jython (мой язык выбора и официально лучший язык в мире), но это немного сложно, потому что, например, publish является final , поэтому вам придется изобретать другой метод называть его, а также потому, что вы должны (зевать) параметризовать все с помощью дженериков в Java.

Этот код должен быть понятен любому человеку Java: просто чтобы помочь, с self.publication_counter.get() , это оценивается как False когда результат равен 0.

 # this is how you say Worker... is a subclass of SwingWorker in Python/Jython class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ): # __init__ is the constructor method in Python/Jython def __init__( self ): # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object self.publication_counter = java.util.concurrent.atomic.AtomicInteger() def await_processing_of_all_chunks( self ): while self.publication_counter.get(): time.sleep( 0.001 ) # fully functional override of the Java method def process( self, chunks ): for chunk in chunks: pass # DO SOMETHING WITH EACH CHUNK # decrement the counter by the number of chunks received # NB do this AFTER dealing with the chunks self.publication_counter.addAndGet( - len( chunks ) ) # fully functional override of the Java method def publish( self, *chunks ): # increment the counter by the number of chunks received # NB do this BEFORE publishing the chunks self.publication_counter.addAndGet( len( chunks )) self.super__publish( chunks ) 

Таким образом, в коде вызова вы добавляете что-то вроде:

  engine.update_xliff_task.get() engine.update_xliff_task.await_processing_of_all_chunks() 

PS использование while предложения, подобного этому (т. Е. Техника опроса), едва ли изящно. Я просмотрел доступные classы java.util.concurrent такие как CountDownLatch и Phaser (оба с методами блокировки streamов), но я не думаю, что это было бы подходящим для этой цели …

позже

Я был заинтересован в этом, чтобы настроить подходящий class параллелизма (написанный на Java, найденный на сайте Apache) под названием CounterLatch . Их версия останавливает stream при await() если достигается значение счетчика AtomicLong . Моя версия здесь позволяет вам либо сделать это, либо наоборот: сказать «подождать, пока счетчик не достигнет определенного значения, прежде чем поднимать защелку»:

NB использование AtomicLong для signal и AtomicBoolean для released : потому что в исходной Java они используют ключевое слово volatile . Я думаю, что использование атомных classов достигнет той же цели.

 class CounterLatch(): def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ): self.count = java.util.concurrent.atomic.AtomicLong( initial ) self.signal = java.util.concurrent.atomic.AtomicLong( wait_value ) class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ): def tryAcquireShared( sync_self, arg ): if lift_on_reached: return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1 else: return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1 def tryReleaseShared( self, args ): return True self.sync = Sync() self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False def await( self, *args ): if args: assert len( args ) == 2 assert type( args[ 0 ] ) is int timeout = args[ 0 ] assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit unit = args[ 1 ] return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) else: self.sync.acquireSharedInterruptibly( 1 ) def count_relative( self, n ): previous = self.count.addAndGet( n ) if previous == self.signal.get(): self.sync.releaseShared( 0 ) return previous 

Итак, теперь мой код выглядит так:

В конструкторе SwingWorker:

 self.publication_counter_latch = CounterLatch() 

В SW.publish:

 self.publication_counter_latch.count_relative( len( chunks ) ) self.super__publish( chunks ) 

В streamе, ожидающем завершения обработки fragmentа:

 worker.publication_counter_latch.await() 
  • java swingworker thread для обновления основного Gui
  • Как заставить пример SwingWorker работать правильно?
  • Проблемы с добавлением JTextArea
  • Ожидание нескольких SwingWorkers
  • Не удается получить ArrayIndexOutOfBoundsException из будущего И SwingWorker, если stream запущен. Исполнитель
  • Как отменить выполнение SwingWorker?
  • SwingWorker в Java
  • SwingWorker в другом методе SwingWorker
  • SwingWorker не отвечает
  • SwingWorker не обновляет JProgressBar без Thread.sleep () в пользовательской диалоговой панели
  • Может ли индикатор выполнения работать в classе за пределами основного?
  • Давайте будем гением компьютера.