Рубрики
Без рубрики

ExecutorService – Ожидание завершения потоков

Узнайте, как использовать ExecutorService в различных сценариях для ожидания завершения выполнения потоков.

Автор оригинала: baeldung.

1. Обзор

Платформа ExecutorService упрощает обработку задач в нескольких потоках. Мы приведем примеры некоторых сценариев, в которых мы ждем, пока потоки завершат свое выполнение.

Кроме того, мы покажем, как изящно завершить работу ExecutorService и дождаться завершения выполнения уже запущенных потоков.

2. После Завершения работы Исполнителя

При использовании исполнителя мы можем отключить его, вызвав методы shutdown () или shutdownNow () . Хотя он не будет ждать, пока все потоки перестанут выполняться.

Ожидание завершения выполнения существующих потоков может быть достигнуто с помощью Ожидание() метод.

Это блокирует поток до тех пор, пока все задачи не завершат свое выполнение или не будет достигнут указанный тайм-аут:

public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();
        }
    } catch (InterruptedException ex) {
        threadPool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

3. Использование обратного отсчета

Далее давайте рассмотрим другой подход к решению этой проблемы – использование CountDownLatch для сигнала о завершении задачи.

Мы можем инициализировать его значением, представляющим количество раз, когда он может быть уменьшен, прежде чем все потоки, вызвавшие метод await () , будут уведомлены.

Например, если нам нужно, чтобы текущий поток ждал завершения выполнения другого потока N , мы можем инициализировать защелку с помощью N :

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // ...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

4. Использование invokeAll()

Первый подход, который мы можем использовать для запуска потоков, – это метод invokeAll () . Метод возвращает список Будущих объектов после завершения всех задач или истечения времени ожидания .

Кроме того, мы должны отметить, что порядок возвращаемых Будущих объектов совпадает со списком предоставленных Вызываемых объектов:

ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
 
assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();
 
assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5. Использование ExecutorCompletionService

Другой подход к запуску нескольких потоков заключается в использовании ExecutorCompletionService. Он использует предоставленный ExecutorService для выполнения задач.

Одним из отличий от invokeAll() является порядок, в котором возвращаются фьючерсы, представляющие выполненные задачи. ExecutorCompletionService использует очередь для хранения результатов в порядке их завершения , в то время как invokeAll() возвращает список, имеющий тот же последовательный порядок, что и созданный итератором для данного списка задач:

CompletionService service
  = new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

for (Callable callable : callables) {
    service.submit(callable);
}

Доступ к результатам можно получить с помощью метода take() :

long startProcessingTime = System.currentTimeMillis();

Future future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread", 
  "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
  && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue(
  "Last response should be from the slow thread", 
  "slow thread".equals(secondThreadResponse));
assertTrue(
  totalProcessingTime >= 3000
  && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6. Заключение

В зависимости от варианта использования у нас есть различные варианты ожидания завершения выполнения потоков.

A Обратный отсчет полезно, когда нам нужен механизм для уведомления одного или нескольких потоков о завершении набора операций, выполняемых другими потоками.

ExecutorCompletionService это полезно, когда нам нужно как можно скорее получить доступ к результату задачи и другим подходам, когда мы хотим дождаться завершения всех запущенных задач.

Исходный код статьи доступен на GitHub .