Как заставить ThreadPoolExecutor увеличить streamи до max перед очередью?

Некоторое время я был разочарован поведением ThreadPoolExecutor по умолчанию, которое поддерживает ExecutorService streamов ExecutorService которые многие из нас используют. Чтобы процитировать Javadocs:

Если существует больше, чем corePoolSize, но меньше streamов MaximumPoolSize, новый stream будет создан только в том случае, если очередь заполнена .

Это означает, что если вы определите пул streamов со следующим кодом, он никогда не запустит второй stream, потому что LinkedBlockingQueue неограничен.

 ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/, TimeUnit.SECONDS, new LinkedBlockingQueue(/* unlimited queue */)); 

Только если у вас есть ограниченная очередь, и очередь заполнена , все streamи над начальным номером запускаются. Я подозреваю, что большое количество младших программистов, многопоточных Java, не знают об этом поведении ThreadPoolExecutor .

Теперь у меня есть конкретный вариант использования, когда это не оптимально. Я ищу способы, не написав свой собственный class TPE, чтобы обойти это.

Мои требования касаются веб-службы, которая делает обратные вызовы для возможной ненадежной третьей стороны.

  • Я не хочу делать обратный звонок синхронно с веб-запросом, поэтому я хочу использовать пул streamов.
  • Я обычно получаю пару из них в минуту, поэтому я не хочу иметь newFixedThreadPool(...) с большим количеством streamов, которые в основном неактивны.
  • Каждый раз я получаю всплеск этого трафика, и я хочу увеличить количество streamов до некоторого максимального значения (скажем, 50).
  • Мне нужно сделать все возможное, чтобы сделать все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные из них выше 50. Я не хочу подавлять остальную часть моего веб-сервера с помощью newCachedThreadPool() .

Как я могу обойти это ограничение в ThreadPoolExecutor где очередь должна быть ограничена и заполнена до того, как начнется больше streamов? Как я могу заставить его запускать больше streamов перед задачами в очереди?

Редактировать:

@Flavio делает хороший вывод об использовании ThreadPoolExecutor.allowCoreThreadTimeOut(true) чтобы иметь тайм-аут ядра и выйти из него. Я подумал об этом, но мне все еще нужна функция core-threads. Я не хотел, чтобы количество streamов в пуле упало ниже размера ядра, если это возможно.

    Как я могу обойти это ограничение в ThreadPoolExecutor где очередь должна быть ограничена и полна, прежде чем запускается больше streamов.

    По-моему, я наконец нашел несколько элегантное (возможно, немного взломанное) решение этого ограничения с помощью ThreadPoolExecutor . Это связано с расширением LinkedBlockingQueue чтобы он возвращал false для queue.offer(...) когда уже queue.offer(...) некоторые задачи. Если текущие streamи не справляются с поставленными задачами, TPE добавит дополнительные streamи. Если пул уже находится в максимальных streamах, тогда вызывается RejectedExecutionHandler . Это обработчик, который затем put(...) в очередь.

    Конечно, странно писать очередь, где offer(...) может возвращать false и put() никогда не блокирует, так что это часть взлома. Но это хорошо работает с использованием TPE очереди, поэтому я не вижу никаких проблем с этим.

    Вот код:

     // extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue queue = new LinkedBlockingQueue() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { /* * Offer it to the queue if there is 0 items already queued, else * return false so the TPE will add another thread. If we return false * and max threads have been reached then the RejectedExecutionHandler * will be called which will do the put into the queue. */ if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { /* * This does the actual put into the queue. Once the max threads * have been reached, the tasks will then queue up. */ executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); 

    С помощью этого механизма, когда я отправляю задания в очередь, ThreadPoolExecutor будет:

    1. Сначала измените количество streamов до размера ядра (здесь 1).
    2. Предложите его в очередь. Если очередь пуста, она будет поставлена ​​в очередь для обработки существующими streamами.
    3. Если в очереди уже есть 1 или более элементов, offer(...) вернет false.
    4. Если возвращается false, увеличьте количество streamов в пуле, пока они не достигнут максимального числа (здесь 50).
    5. Если в макс. То он вызывает RejectedExecutionHandler
    6. Затем RejectedExecutionHandler помещает задачу в очередь, обрабатываемую первым доступным streamом в порядке FIFO.

    Хотя в моем примере код выше, очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите пропускную способность 1000 к LinkedBlockingQueue то она будет:

    1. масштабировать streamи до макс.
    2. затем перейдите в очередь до 1000 задач
    3. затем заблокируйте вызывающего абонента, пока пространство не станет доступным для очереди.

    Кроме того, если вам действительно нужно использовать offer(...) в RejectedExecutionHandler вы можете использовать метод offer(E, long, TimeUnit) вместо Long.MAX_VALUE в качестве таймаута.

    Редактировать:

    Я изменил метод моего offer(...) переопределить для обратной связи Ральфа. Это только увеличит количество streamов в пуле, если они не будут поддерживать нагрузку.

    Редактировать:

    Другой настройкой этого ответа может быть фактически спросить TPE, если есть незанятые streamи, и только оковать элемент, если это так. Вам нужно будет сделать настоящий class для этого и добавить ourQueue.setThreadPoolExecutor(tpe); метод на нем.

    Тогда ваш метод offer(...) может выглядеть примерно так:

    1. Проверьте, есть ли tpe.getPoolSize() == tpe.getMaximumPoolSize() в этом случае просто вызовите super.offer(...) .
    2. tpe.getPoolSize() > tpe.getActiveCount() если tpe.getPoolSize() > tpe.getActiveCount() то вызовите super.offer(...) так как кажется, что это неработающие streamи.
    3. В противном случае верните false чтобы разветвить другой stream.

    Может быть, это:

     int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; } 

    Обратите внимание, что методы get на TPE являются дорогими, поскольку они получают доступ к volatile полям или (в случае getActiveCount() ) блокируют TPE и getActiveCount() список нитей. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи или другой вилке, когда был простаивающий stream.

    Задайте размер ядра и максимальный размер с тем же значением и разрешите удаление основных streamов из пула с помощью allowCoreThreadTimeOut(true) .

    У меня уже есть два других ответа на этот вопрос, но я подозреваю, что это лучший.

    Он основан на методе принятого в настоящее время ответа , а именно:

    1. Переопределите метод offer() метода queue для (иногда) возврата false,
    2. что заставляет ThreadPoolExecutor либо создавать новый stream, либо отклонять задачу, и
    3. установите RejectedExecutionHandler на фактическую очередь задачи при отказе.

    Проблема заключается в том, что offer() должно возвращать false. Принимаемый в настоящее время ответ возвращает false, когда на очереди есть несколько задач, но, как я указал в моем комментарии, это вызывает нежелательные последствия. Альтернативно, если вы всегда возвращаете false, вы будете продолжать создавать новые streamи, даже если в очереди ждут streamи.

    Решение – использовать Java 7 LinkedTransferQueue и offer() call tryTransfer() . Когда есть потребительский stream ожидания, задача просто будет передана этому streamу. В противном случае offer() вернет false, и ThreadPoolExecutor создаст новый stream.

      BlockingQueue queue = new LinkedTransferQueue() { @Override public boolean offer(Runnable e) { return tryTransfer(e); } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); 

    Примечание. Теперь я предпочитаю и рекомендую другой ответ .

    Вот версия, которая мне кажется гораздо более простой: увеличьте corePoolSize (вплоть до предела MaximumPoolSize) всякий раз, когда выполняется новая задача, а затем уменьшите corePoolSize (вплоть до предела указанного пользователем «размер пула ядра») всякий раз, когда задача завершена.

    Другими словами, отслеживайте количество запущенных или запущенных задач и убедитесь, что corePoolSize равен числу задач, если он находится между указанным пользователем «размером пула пула» и максимальным размером пакета.

     public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { private int userSpecifiedCorePoolSize; private int taskCount; public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); userSpecifiedCorePoolSize = corePoolSize; } @Override public void execute(Runnable runnable) { synchronized (this) { taskCount++; setCorePoolSizeToTaskCountWithinBounds(); } super.execute(runnable); } @Override protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); synchronized (this) { taskCount--; setCorePoolSizeToTaskCountWithinBounds(); } } private void setCorePoolSizeToTaskCountWithinBounds() { int threads = taskCount; if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize; if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize(); setCorePoolSize(threads); } } 

    Как написано, class не поддерживает изменение указанного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через remove() или purge() .

    У нас есть подclass ThreadPoolExecutor который выполняет дополнительные ThreadPoolExecutor и overrides.

     public void execute(Runnable command) { super.execute(command); final int poolSize = getPoolSize(); if (poolSize < getMaximumPoolSize()) { if (getQueue().size() > creationThreshold) { synchronized (this) { setCorePoolSize(poolSize + 1); setCorePoolSize(poolSize); } } } } 

    возможно, это тоже помогает, но, к сожалению, ваши взгляды более вычурны …

    Рекомендуемый ответ разрешает только один (1) проблемы с пулом streamов JDK:

    1. Пулы streamов JDK смещены в сторону очередей. Поэтому вместо того, чтобы создавать новый stream, они будут ставить в очередь задачу. Только если очередь достигает своего предела, пул streamов создаст новый stream.

    2. Понижение нагрузки не происходит, когда нагрузка ослабевает. Например, если у нас есть пакет заданий, попадающих в пул, который приводит к тому, что пул переходит на максимальный уровень, за которым следует легкая загрузка максимум 2 заданий за раз, пул будет использовать все streamи для обслуживания легкой нагрузки, предотвращающей выход из streamа. (потребуется только 2 streamа …)

    Недовольный поведением выше, я пошел вперед и реализовал пул, чтобы преодолеть недостатки выше.

    Для решения проблемы 2) Использование Lifo scheduling решает проблему. Эта идея была представлена ​​Бен Маурером на конференции ACM Application 2015: Systems @ Facebook

    Так родилась новая реализация:

    LifoThreadPoolExecutorSQP

    Пока эта реализация улучшает выполнение асинхронного исполнения для ZEL .

    Реализация является функцией спина, позволяющей снизить накладные расходы на коммутаторе, что обеспечивает превосходную производительность для определенных случаев использования.

    Надеюсь, поможет…

    PS: JDK Fork Join Pool реализует ExecutorService и работает как «обычный» пул streamов, реализация выполняется, он использует планирование streamов LIFO, однако нет контроля над размером внутренней очереди, тайм-аутом выхода на пенсию …

    Примечание. Теперь я предпочитаю и рекомендую другой ответ .

    У меня есть другое предложение, следуя первоначальной идее изменения очереди, чтобы вернуть false. В этом случае все задачи могут входить в очередь, но всякий раз, когда задача выставляется в очередь после execute() , мы выполняем ее с помощью задачи no-op sentinel, которую очередь отклоняет, вызывая появление нового streamа, который будет выполнять no-op за которым сразу следует что-то из очереди.

    Поскольку рабочие streamи могут LinkedBlockingQueue для новой задачи, возможно, что задача может попасть в очередь, даже если есть ansible stream. Чтобы избежать появления новых streamов, даже когда есть streamи, нам нужно отслеживать, сколько streamов ожидает новых задач в очереди, и только порождать новый stream, когда в очереди больше задач, чем ожидающих streamов.

     final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } }; final AtomicInteger waitingThreads = new AtomicInteger(0); BlockingQueue queue = new LinkedBlockingQueue() { @Override public boolean offer(Runnable e) { // offer returning false will cause the executor to spawn a new thread if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get(); else return super.offer(e); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.poll(timeout, unit); } finally { waitingThreads.decrementAndGet(); } } @Override public Runnable take() throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.take(); } finally { waitingThreads.decrementAndGet(); } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) { @Override public void execute(Runnable command) { super.execute(command); if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP); } }; threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r == SENTINEL_NO_OP) return; else throw new RejectedExecutionException(); } }); 

    Лучшее решение, о котором я могу думать, это расширить.

    ThreadPoolExecutor предлагает несколько методов hook: beforeExecute и afterExecute . В своем расширении вы можете поддерживать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то вызывает submit , вы можете попытаться поместить запрос в ограниченную очередь. Если вы встретили исключение, вы просто ставите задачу в очередь переполнения. Затем вы можете использовать hook afterExecute чтобы увидеть, есть ли что-либо в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о вещах в своей ограниченной очереди и автоматически вытащит из этой неограниченной очереди, как позволяет время.

    Похоже, что больше работы, чем ваше решение, но, по крайней мере, это не связано с непредвиденным поведением очередей. Я также думаю, что есть лучший способ проверить статус очереди и streamов, а не полагаться на исключения, которые довольно медленно бросать.

    Давайте будем гением компьютера.