Рубрики
Без рубрики

Пользовательские Пулы Потоков В Параллельных Потоках Java 8

Краткое введение в пользовательские пулы потоков и их использование в параллельных потоках Java 8.

Автор оригинала: baeldung.

1. Обзор

Java 8 представила концепцию S treams как эффективный способ выполнения массовых операций с данными. И параллельные Потоки могут быть получены в средах, поддерживающих параллелизм.

Эти потоки могут обеспечить повышенную производительность – за счет многопоточных накладных расходов.

В этом кратком руководстве мы рассмотрим одно из самых больших ограничений Stream API и посмотрим, как заставить параллельный поток работать с пользовательским потоковым экземпляром, в качестве альтернативы – есть библиотека, которая обрабатывает это .

2. Параллельный Поток

Давайте начнем с простого примера – вызов метода parallelStream для любого из типов Коллекции , который вернет, возможно, параллельный Поток :

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
    List aList = new ArrayList<>();
    Stream parallelStream = aList.parallelStream();
        
    assertTrue(parallelStream.isParallel());
}

Обработка по умолчанию, выполняемая в таком потоке , использует ForkJoinPool.commonPool (), пул потоков, общий для всего приложения.

3. Пользовательский Пул Потоков

Мы действительно можем передать обычай Пул потоков при обработке течение .

В следующем примере можно использовать параллельный поток , используя пользовательский Пул потоков для вычисления суммы длинных значений от 1 до 1 000 000 включительно:

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
    
    long firstNum = 1;
    long lastNum = 1_000_000;

    List aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
 
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

Мы использовали конструктор ForkJoinPool с уровнем параллелизма 4. Для определения оптимального значения для различных сред требуется некоторое экспериментирование, но хорошим эмпирическим правилом является простой выбор числа, основанного на количестве ядер вашего процессора.

Далее мы обработали содержимое параллельного Потока , суммировав их в вызове reduce .

Этот простой пример может не продемонстрировать полную полезность использования пользовательского пула потоков, но преимущества становятся очевидными в ситуациях, когда мы не хотим связывать общий пул потоков с длительными задачами, такими как обработка данных из сетевого источника, или общий пул потоков используется другими компонентами приложения.

Если мы запустим описанный выше метод тестирования, он пройдет. Пока все идет хорошо.

Однако, если мы создадим экземпляр класса ForkJoinPool в обычном методе таким же образом, как и в тестовом методе, это может привести к ошибке OutOfMemoryError .

Далее давайте подробнее рассмотрим причину утечки памяти.

4. Остерегайтесь утечки памяти

Как мы уже говорили ранее, общий пул потоков по умолчанию используется всем приложением. Общий пул потоков-это статический Пул потоков экземпляр.

Таким образом, при использовании пула потоков по умолчанию утечки памяти не происходит.

Теперь давайте рассмотрим наш метод тестирования. В методе тестирования мы создали объект ForkJoinPool. Когда метод тестирования будет завершен, объект customThreadPool не будет разыменован и собран мусор — вместо этого он будет ждать назначения новых задач .

То есть каждый раз, когда мы вызываем метод тестирования, будет создаваться новый пользовательский пул потоков объект, который не будет выпущен.

Решение проблемы довольно простое: выключите пользовательский пул потоков объект после выполнения метода:

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. Заключение

Мы кратко рассмотрели, как запустить параллельный поток с помощью пользовательского пула потоков . В правильной среде и при правильном использовании уровня параллелизма в определенных ситуациях можно добиться повышения производительности.

Если мы создадим пользовательский Пул потоков , мы должны иметь в виду, чтобы вызвать его метод shutdown () , чтобы избежать утечки памяти.

Полные примеры кода, на которые ссылается эта статья, можно найти на GitHub .