1. введение
Parallel-collectors – это небольшая библиотека, предоставляющая набор коллекторов API Java Stream, которые позволяют параллельную обработку, в то же время обходя основные недостатки стандартных параллельных потоков.
2. Зависимости Maven
Если мы хотим начать использовать библиотеку, нам нужно добавить одну запись в Maven’s pom.xml файл:
com.pivovarit parallel-collectors 1.1.0
Или одна строка в файле сборки Gradle:
compile 'com.pivovarit:parallel-collectors:1.1.0'
Новейшую версию можно найти на Maven Central .
3. Предостережения о параллельных потоках
Параллельные потоки были одним из самых ярких моментов Java 8, но они оказались применимы исключительно к обработке тяжелых процессоров.
Причиной этого был тот факт , что Параллельные потоки были внутренне поддержаны общим для JVM ForkJoinPool , который обеспечивал ограниченный параллелизм и использовался всеми параллельными потоками, работающими на одном экземпляре JVM.
Например, представьте, что у нас есть список идентификаторов, и мы хотим использовать их для получения списка пользователей, и что эта операция является дорогостоящей.
Для этого мы могли бы использовать параллельные потоки:
Listids = Arrays.asList(1, 2, 3); List results = ids.parallelStream() .map(i -> fetchById(i)) // each operation takes one second .collect(Collectors.toList()); System.out.println(results); // [user-1, user-2, user-3]
И действительно, мы видим, что наблюдается заметное ускорение. Но это становится проблематичным, если мы начинаем выполнять несколько параллельных операций блокировки… параллельно. Это может быстро насытить пул и привести к потенциально огромным задержкам. Вот почему важно создавать переборки, создавая отдельные пулы потоков, чтобы предотвратить влияние несвязанных задач на выполнение друг друга.
Чтобы предоставить пользовательский экземпляр ForkJoinPool , мы могли бы использовать трюк , описанный здесь , но этот подход опирался на недокументированный взлом и был неисправен до JDK10. Мы можем прочитать больше в самом выпуске – [JDK8190974] .
4. Параллельные коллекторы в действии
Параллельные сборщики, как следует из названия, являются просто стандартными сборщиками API потока, которые позволяют выполнять дополнительные операции параллельно на собирать() фаза.
Параллельные коллекторы (которые отражают Коллекторы класс) класс-это фасад, обеспечивающий доступ ко всей функциональности библиотеки.
Если бы мы хотели повторить приведенный выше пример, мы могли бы просто написать:
ExecutorService executor = Executors.newFixedThreadPool(10); Listids = Arrays.asList(1, 2, 3); CompletableFuture > results = ids.stream() .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4)); System.out.println(results.join()); // [user-1, user-2, user-3]
Однако результат тот же, мы смогли предоставить наш пользовательский пул потоков, указать ваш пользовательский уровень параллелизма, и результат был получен в виде экземпляра CompletableFuture без блокировки текущего потока.
Стандартные параллельные потоки, с другой стороны, не могли достичь ни одного из них.
4.1. Параллельные Коллекторы.параллельный Список/Набор()
Как бы интуитивно это ни было, если мы хотим параллельно обрабатывать Поток и собирать результаты в Список или Набор , мы можем просто использовать ParallelCollectors.parallelToList или parallelToSet :
Listids = Arrays.asList(1, 2, 3); List results = ids.stream() .collect(parallelToList(i -> fetchById(i), executor, 4)) .join();
4.2. Параллельные Коллекторы.параллельные Карте()
Если мы хотим собрать элементы Stream в экземпляр Map , как и в случае с Stream API, нам нужно предоставить два картографа:
Listids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4)) .join(); // {1=user-1, 2=user-2, 3=user-3}
Мы также можем предоставить пользовательскую Карту экземпляр Поставщика :
Mapresults = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4)) .join();
И пользовательская стратегия разрешения конфликтов:
Listids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4)) .join();
4.3. Параллельные Коллекторы.()
Аналогично вышесказанному, мы можем передать наш пользовательский Поставщик коллекции , если мы хотим получить результаты, упакованные в наш пользовательский контейнер:
Listresults = ids.stream() .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4)) .join();
4.4. Параллельные Коллекторы.параллельные Потоку()
Если вышесказанного недостаточно, мы действительно можем получить экземпляр Stream и продолжить пользовательскую обработку там:
Map> results = ids.stream() .collect(parallelToStream(i -> fetchById(i), executor, 4)) .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length()))) .join();
4.5. Параллельные Коллекторы.параллельные()
Это позволяет нам передавать результаты в порядке завершения:
ids.stream() .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-3 // user-2
В этом случае мы можем ожидать, что сборщик будет возвращать разные результаты каждый раз, так как мы ввели случайную задержку обработки.
4.6. Параллельные Коллекторы.параллельный Заказ()
Это средство позволяет передавать результаты в потоковом режиме, как и выше, но поддерживает первоначальный порядок:
ids.stream() .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-2 // user-3
В этом случае сборщик всегда будет поддерживать порядок, но может быть медленнее, чем указано выше.
5. Ограничения
На момент написания статьи параллельные коллекторы не работают с бесконечными потоками даже если используются операции короткого замыкания-это ограничение конструкции, налагаемое внутренними компонентами API потока. Проще говоря, Stream уличные коллекторы как операции без короткого замыкания, поэтому поток должен обработать все вышестоящие элементы перед завершением.
Другое ограничение заключается в том, что операции короткого замыкания не прерывают оставшиеся задачи после короткого замыкания.
6. Заключение
Мы видели, как библиотека parallel-collectors позволяет нам выполнять параллельную обработку с помощью пользовательского API потока Java Collectors и CompletableFutures для использования пользовательских пулов потоков, параллелизма и неблокирующего стиля CompletableFutures.
Как всегда, фрагменты кода доступны на GitHub .
Для дальнейшего чтения см. Библиотеку parallel-collectors на GitHub, блог автора/| и учетную запись автора Twitter .