Рубрики
Без рубрики

Руководство по RejectedExecutionHandler

Узнайте о том, что происходит, когда пул потоков не может принимать больше задач, и как контролировать это, применяя политики насыщения

Автор оригинала: Ali Dehghani.

1. Обзор

Фреймворк исполнителя в Java – это попытка отделить отправку задачи от выполнения задачи. Хотя этот подход очень хорошо абстрагирует детали выполнения задачи, иногда нам все равно нужно настроить его для еще более оптимального выполнения.

В этом уроке мы увидим, что происходит, когда пул потоков не может принимать больше задач. Затем мы узнаем, как контролировать этот угловой случай, применяя соответствующие политики насыщения.

2. Повторный просмотр пулов потоков

На следующей диаграмме показано, как служба исполнителя работает внутри компании:

Вот что происходит когда мы отправляем новую задачу исполнителю :

  1. Если один из потоков доступен, он обрабатывает задачу.
  2. В противном случае исполнитель добавляет новую задачу в свою очередь.
  3. Когда поток завершает текущую задачу, он выбирает другую из очереди.

2.1. ThreadPoolExecutor

Большинство реализаций исполнителей используют хорошо известный ThreadPoolExecutor в качестве базовой реализации. Поэтому, чтобы лучше понять, как работает очередь задач, мы должны более подробно рассмотреть ее конструктор:

public ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue workQueue,
  RejectedExecutionHandler handler
)

2.2. Размер Основного Пула

Параметр corePoolSize определяет начальный размер пула потоков. Обычно исполнитель следит за тем, чтобы пул потоков содержал как минимум corePoolSize количество потоков.

Однако можно иметь меньше потоков, если мы включим параметр allowCoreThreadTimeOut .

2.3. Максимальный Размер Бассейна

Предположим, что все основные потоки заняты выполнением нескольких задач. В результате исполнитель ставит новые задачи в очередь до тех пор, пока они не получат возможность быть обработанными позже.

Когда эта очередь заполнится, исполнитель может добавить больше потоков в пул потоков. | maximumPoolSize устанавливает верхнюю границу количества потоков, которые потенциально может содержать пул потоков.

Когда эти потоки остаются простаивающими в течение некоторого времени, исполнитель может удалить их из пула. Следовательно, размер пула может уменьшиться до его основного размера.

2.4. Организация очередей

Как мы видели ранее, когда все основные потоки заняты, исполнитель добавляет новые задачи в очередь. Существует три различных подхода к организации очередей :

  • Неограниченная очередь : Очередь может содержать неограниченное количество задач. Поскольку эта очередь никогда не заполняется, исполнитель игнорирует максимальный размер. Исполнители fixed size и single thread используют этот подход.
  • Ограниченная очередь : Как следует из названия, очередь может содержать только ограниченное количество задач. В результате пул потоков будет расти при заполнении ограниченной очереди.
  • Синхронная передача : Удивительно, но эта очередь не может содержать никаких задач! При таком подходе мы можем поставить задачу в очередь тогда и только тогда, когда есть другой поток, выбирающий ту же задачу на другой стороне в то же время . Исполнитель кэшированного пула потоков использует этот подход внутренне.

Давайте предположим следующий сценарий, когда мы используем либо ограниченную очередь, либо синхронную передачу:

  • Все основные потоки заняты
  • Внутренняя очередь заполняется
  • Пул потоков увеличивается до максимально возможного размера, и все эти потоки также заняты

Что происходит, когда появляется новая задача?

3. Политика насыщения

Когда все потоки заняты, и внутренняя очередь заполняется, исполнитель становится насыщенным.

Исполнители могут выполнять предопределенные действия, как только они достигнут насыщения. Эти действия известны как Политики насыщения. Мы можем изменить политику насыщения исполнителя, передав экземпляр RejectedExecutionHandler его конструктору.

К счастью, Java предоставляет несколько встроенных реализаций для этого класса, каждая из которых охватывает конкретный случай использования. В следующих разделах мы подробно рассмотрим эти политики.

3.1. Политика отмены

Политика по умолчанию-это политика прерывания . Политика прерывания вызывает у исполнителя исключение RejectedExecutionException :

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.AbortPolicy());

executor.execute(() -> waitFor(250));

assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
  .isInstanceOf(RejectedExecutionException.class);

Поскольку выполнение первой задачи занимает много времени, исполнитель отклоняет вторую задачу.

3.2. Политика работы с вызывающими абонентами

Вместо асинхронного выполнения задачи в другом потоке эта политика заставляет вызывающий поток выполнять задачу :

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.CallerRunsPolicy());

executor.execute(() -> waitFor(250));

long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;

assertThat(blockedDuration).isGreaterThanOrEqualTo(500);

После отправки первой задачи исполнитель больше не может принимать новые задачи. Поэтому вызывающий поток блокируется до тех пор, пока не вернется вторая задача.

Политика caller-runs позволяет легко реализовать простую форму регулирования . То есть медленный потребитель может замедлить быстрого производителя, чтобы контролировать поток отправки задач.

3.3. Политика отмены

Политика отбрасывания автоматически отбрасывает новую задачу, когда она не может ее отправить :

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.DiscardPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));

assertThat(queue.poll(200, MILLISECONDS)).isNull();

Здесь вторая задача публикует простое сообщение в очередь. Поскольку он никогда не получает возможности выполнить, очередь остается пустой, даже если мы блокируем ее в течение некоторого времени.

3.4. Отказ от Самой Старой Политики

discardoldestpolicy сначала удаляет задачу из головы очереди, а затем повторно отправляет новую задачу :

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new ThreadPoolExecutor.DiscardOldestPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).containsExactlyInAnyOrder("Second", "Third");

На этот раз мы используем ограниченную очередь, которая может содержать только две задачи. Вот что происходит, когда мы отправляем эти четыре задачи:

  • Первые задачи захватывают один поток в течение 100 миллисекунд
  • Исполнитель успешно ставит в очередь вторую и третью задачи
  • Когда появляется четвертая задача, discardoldestpolicy удаляет самую старую задачу, чтобы освободить место для этой новой

Очереди discardoldestpolicy и priority плохо сочетаются друг с другом. Поскольку глава приоритетной очереди имеет самый высокий приоритет, мы можем просто потерять самую важную задачу .

3.5. Пользовательская политика

Также можно предоставить пользовательскую политику насыщения, просто реализовав интерфейс RejectedExecutionHandler :

class GrowPolicy implements RejectedExecutionHandler {

    private final Lock lock = new ReentrantLock();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        lock.lock();
        try {
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
        } finally {
            lock.unlock();
        }

        executor.submit(r);
    }
}

В этом примере, когда исполнитель становится насыщенным, мы увеличиваем максимальный размер пула на единицу, а затем повторно отправляем ту же задачу:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new GrowPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).contains("First", "Second", "Third");

Как и ожидалось, все четыре задачи выполнены.

3.6. Выключение

В дополнение к перегруженным исполнителям политики насыщения также применяются ко всем исполнителям, которые были закрыты :

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

То же самое верно для всех исполнителей, которые находятся в середине завершения работы:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

4. Заключение

В этом уроке, во-первых, мы достаточно быстро обновили информацию о пулах потоков в Java. Затем, представив насыщенных исполнителей, мы узнали, как и когда применять различные политики насыщения.

Как обычно, пример кода доступен на GitHub .