Рубрики
Без рубрики

Руководство по библиотеке параллельных коллекторов Java

Узнайте, как использовать параллельные коллекторы, библиотеку коллекторов API Java Stream, предназначенную для преодоления ограничений стандартных параллельных потоков.

Автор оригинала: baeldung.

1. введение

Parallel-collectors – это небольшая библиотека, предоставляющая набор коллекторов API Java Stream, которые позволяют параллельную обработку, в то же время обходя основные недостатки стандартных параллельных потоков.

2. Зависимости Maven

Если мы хотим начать использовать библиотеку, нам нужно добавить одну запись в Maven’s pom.xml файл:


    com.pivovarit
    parallel-collectors
    1.1.0

Или одна строка в файле сборки Gradle:

compile 'com.pivovarit:parallel-collectors:1.1.0'

Новейшую версию можно найти на Maven Central .

3. Предостережения о параллельных потоках

Параллельные потоки были одним из самых ярких моментов Java 8, но они оказались применимы исключительно к обработке тяжелых процессоров.

Причиной этого был тот факт , что Параллельные потоки были внутренне поддержаны общим для JVM ForkJoinPool , который обеспечивал ограниченный параллелизм и использовался всеми параллельными потоками, работающими на одном экземпляре JVM.

Например, представьте, что у нас есть список идентификаторов, и мы хотим использовать их для получения списка пользователей, и что эта операция является дорогостоящей.

Для этого мы могли бы использовать параллельные потоки:

List ids = Arrays.asList(1, 2, 3); 
List results = ids.parallelStream() 
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList()); 

System.out.println(results); // [user-1, user-2, user-3]

И действительно, мы видим, что наблюдается заметное ускорение. Но это становится проблематичным, если мы начинаем выполнять несколько параллельных операций блокировки… параллельно. Это может быстро насытить пул и привести к потенциально огромным задержкам. Вот почему важно создавать переборки, создавая отдельные пулы потоков, чтобы предотвратить влияние несвязанных задач на выполнение друг друга.

Чтобы предоставить пользовательский экземпляр ForkJoinPool , мы могли бы использовать трюк , описанный здесь , но этот подход опирался на недокументированный взлом и был неисправен до JDK10. Мы можем прочитать больше в самом выпуске – [JDK8190974] .

4. Параллельные коллекторы в действии

Параллельные сборщики, как следует из названия, являются просто стандартными сборщиками API потока, которые позволяют выполнять дополнительные операции параллельно на собирать() фаза.

Параллельные коллекторы (которые отражают Коллекторы класс) класс-это фасад, обеспечивающий доступ ко всей функциональности библиотеки.

Если бы мы хотели повторить приведенный выше пример, мы могли бы просто написать:

ExecutorService executor = Executors.newFixedThreadPool(10);

List ids = Arrays.asList(1, 2, 3);

CompletableFuture> results = ids.stream()
  .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

Однако результат тот же, мы смогли предоставить наш пользовательский пул потоков, указать ваш пользовательский уровень параллелизма, и результат был получен в виде экземпляра CompletableFuture без блокировки текущего потока.

Стандартные параллельные потоки, с другой стороны, не могли достичь ни одного из них.

4.1. Параллельные Коллекторы.параллельный Список/Набор()

Как бы интуитивно это ни было, если мы хотим параллельно обрабатывать Поток и собирать результаты в Список или Набор , мы можем просто использовать ParallelCollectors.parallelToList или parallelToSet :

List ids = Arrays.asList(1, 2, 3);

List results = ids.stream()
  .collect(parallelToList(i -> fetchById(i), executor, 4))
  .join();

4.2. Параллельные Коллекторы.параллельные Карте()

Если мы хотим собрать элементы Stream в экземпляр Map , как и в случае с Stream API, нам нужно предоставить два картографа:

List ids = Arrays.asList(1, 2, 3);

Map results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
  .join(); // {1=user-1, 2=user-2, 3=user-3}

Мы также можем предоставить пользовательскую Карту экземпляр Поставщика :

Map results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
  .join();

И пользовательская стратегия разрешения конфликтов:

List ids = Arrays.asList(1, 2, 3);

Map results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
  .join();

4.3. Параллельные Коллекторы.()

Аналогично вышесказанному, мы можем передать наш пользовательский Поставщик коллекции , если мы хотим получить результаты, упакованные в наш пользовательский контейнер:

List results = ids.stream()
  .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
  .join();

4.4. Параллельные Коллекторы.параллельные Потоку()

Если вышесказанного недостаточно, мы действительно можем получить экземпляр Stream и продолжить пользовательскую обработку там:

Map> results = ids.stream()
  .collect(parallelToStream(i -> fetchById(i), executor, 4))
  .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
  .join();

4.5. Параллельные Коллекторы.параллельные()

Это позволяет нам передавать результаты в порядке завершения:

ids.stream()
  .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-3
// user-2

В этом случае мы можем ожидать, что сборщик будет возвращать разные результаты каждый раз, так как мы ввели случайную задержку обработки.

4.6. Параллельные Коллекторы.параллельный Заказ()

Это средство позволяет передавать результаты в потоковом режиме, как и выше, но поддерживает первоначальный порядок:

ids.stream()
  .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-2 
// user-3 

В этом случае сборщик всегда будет поддерживать порядок, но может быть медленнее, чем указано выше.

5. Ограничения

На момент написания статьи параллельные коллекторы не работают с бесконечными потоками даже если используются операции короткого замыкания-это ограничение конструкции, налагаемое внутренними компонентами API потока. Проще говоря, Stream уличные коллекторы как операции без короткого замыкания, поэтому поток должен обработать все вышестоящие элементы перед завершением.

Другое ограничение заключается в том, что операции короткого замыкания не прерывают оставшиеся задачи после короткого замыкания.

6. Заключение

Мы видели, как библиотека parallel-collectors позволяет нам выполнять параллельную обработку с помощью пользовательского API потока Java Collectors и CompletableFutures для использования пользовательских пулов потоков, параллелизма и неблокирующего стиля CompletableFutures.

Как всегда, фрагменты кода доступны на GitHub .

Для дальнейшего чтения см. Библиотеку parallel-collectors на GitHub, блог автора/| и учетную запись автора Twitter .