1. Обзор
Платформа ExecutorService упрощает обработку задач в нескольких потоках. Мы приведем примеры некоторых сценариев, в которых мы ждем, пока потоки завершат свое выполнение.
Кроме того, мы покажем, как изящно завершить работу ExecutorService и дождаться завершения выполнения уже запущенных потоков.
2. После Завершения работы Исполнителя
При использовании исполнителя мы можем отключить его, вызвав методы shutdown () или shutdownNow () . Хотя он не будет ждать, пока все потоки перестанут выполняться.
Ожидание завершения выполнения существующих потоков может быть достигнуто с помощью Ожидание() метод.
Это блокирует поток до тех пор, пока все задачи не завершат свое выполнение или не будет достигнут указанный тайм-аут:
public void awaitTerminationAfterShutdown(ExecutorService threadPool) { threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); } } catch (InterruptedException ex) { threadPool.shutdownNow(); Thread.currentThread().interrupt(); } }
3. Использование обратного отсчета
Далее давайте рассмотрим другой подход к решению этой проблемы – использование CountDownLatch для сигнала о завершении задачи.
Мы можем инициализировать его значением, представляющим количество раз, когда он может быть уменьшен, прежде чем все потоки, вызвавшие метод await () , будут уведомлены.
Например, если нам нужно, чтобы текущий поток ждал завершения выполнения другого потока N , мы можем инициализировать защелку с помощью N :
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { WORKER_THREAD_POOL.submit(() -> { try { // ... latch.countDown(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // wait for the latch to be decremented by the two remaining threads latch.await();
4. Использование invokeAll()
Первый подход, который мы можем использовать для запуска потоков, – это метод invokeAll () . Метод возвращает список Будущих объектов после завершения всех задач или истечения времени ожидания .
Кроме того, мы должны отметить, что порядок возвращаемых Будущих объектов совпадает со списком предоставленных Вызываемых объектов:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10); List> callables = Arrays.asList( new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000)); long startProcessingTime = System.currentTimeMillis(); List > futures = WORKER_THREAD_POOL.invokeAll(callables); awaitTerminationAfterShutdown(WORKER_THREAD_POOL); long totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue(totalProcessingTime >= 3000); String firstThreadResponse = futures.get(0).get(); assertTrue("fast thread".equals(firstThreadResponse)); String secondThreadResponse = futures.get(1).get(); assertTrue("slow thread".equals(secondThreadResponse));
5. Использование ExecutorCompletionService
Другой подход к запуску нескольких потоков заключается в использовании ExecutorCompletionService. Он использует предоставленный ExecutorService для выполнения задач.
Одним из отличий от invokeAll() является порядок, в котором возвращаются фьючерсы, представляющие выполненные задачи. ExecutorCompletionService использует очередь для хранения результатов в порядке их завершения , в то время как invokeAll() возвращает список, имеющий тот же последовательный порядок, что и созданный итератором для данного списка задач:
CompletionServiceservice = new ExecutorCompletionService<>(WORKER_THREAD_POOL); List > callables = Arrays.asList( new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000)); for (Callable callable : callables) { service.submit(callable); }
Доступ к результатам можно получить с помощью метода take() :
long startProcessingTime = System.currentTimeMillis(); Futurefuture = service.take(); String firstThreadResponse = future.get(); long totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse)); assertTrue(totalProcessingTime >= 100 && totalProcessingTime < 1000); LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds"); future = service.take(); String secondThreadResponse = future.get(); totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue( "Last response should be from the slow thread", "slow thread".equals(secondThreadResponse)); assertTrue( totalProcessingTime >= 3000 && totalProcessingTime < 4000); LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds"); awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
6. Заключение
В зависимости от варианта использования у нас есть различные варианты ожидания завершения выполнения потоков.
A Обратный отсчет полезно, когда нам нужен механизм для уведомления одного или нескольких потоков о завершении набора операций, выполняемых другими потоками.
ExecutorCompletionService это полезно, когда нам нужно как можно скорее получить доступ к результату задачи и другим подходам, когда мы хотим дождаться завершения всех запущенных задач.
Исходный код статьи доступен на GitHub .