Автор оригинала: Alejandro Ugarte.
1. Обзор
API потока предоставляет богатый набор промежуточных, сокращающих и терминальных функций, которые также поддерживают распараллеливание.
Более конкретно, операции потока сокращения позволяют нам получить один единственный результат из последовательности элементов , многократно применяя операцию объединения к элементам в последовательности.
В этом уроке мы рассмотрим операцию общего назначения Stream.reduce () /и рассмотрим ее в некоторых конкретных случаях использования.
Дальнейшее чтение:
Суммирование чисел с потоками Java
Введение в потоки Java 8
Руководство по интерфейсу бифункциональности Java
2. Ключевые понятия: Идентичность, Накопитель и Объединитель
Прежде чем мы углубимся в использование операции Stream.reduce () , давайте разберем элементы-участники операции на отдельные блоки. Таким образом, нам будет легче понять роль, которую играет каждый из них.
- Identity – элемент, который является начальным значением операции сокращения и результатом по умолчанию, если поток пуст
- Аккумулятор – функция, которая принимает два параметра: частичный результат операции сокращения и следующий элемент потока
- Объединитель – функция, используемая для объединения частичного результата операции сокращения, когда сокращение распараллеливается или когда существует несоответствие между типами аргументов аккумулятора и типами реализации аккумулятора
3. Использование потока.()
Чтобы лучше понять функциональность элементов identity, accumulator и combiner, давайте рассмотрим некоторые основные примеры:
Listnumbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);
В этом случае Целочисленное значение 0 является идентификатором. Он сохраняет начальное значение операции сокращения, а также результат по умолчанию, когда поток значений Integer пуст.
Аналогично, лямбда-выражение :
subtotal, element -> subtotal + element
является накопителем , так как он принимает частичную сумму значений Integer и следующий элемент в потоке.
Чтобы сделать код еще более кратким, мы можем использовать ссылку на метод вместо лямбда-выражения:
int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);
Конечно, мы можем использовать операцию reduce() для потоков, содержащих другие типы элементов.
Например, мы можем использовать reduce() для массива элементов String и объединить их в один результат:
Listletters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");
Аналогично, мы можем переключиться на версию, использующую ссылку на метод:
String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");
Давайте используем операцию reduce() для объединения элементов верхнего регистра массива letters :
String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");
Кроме того, мы можем использовать reduce() в распараллеленном потоке (подробнее об этом позже):
Listages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);
Когда поток выполняется параллельно, среда выполнения Java разбивает поток на несколько подпотоков. В таких случаях нам нужно использовать функцию для объединения результатов подпотоков в один. Это роль объединителя — в приведенном выше фрагменте это ссылка на метод Integer::sum .
Как ни странно, этот код не будет компилироваться:
Listusers = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());
В этом случае у нас есть поток объектов User , а типами аргументов аккумулятора являются Integer и User. Однако реализация аккумулятора представляет собой сумму целых чисел, поэтому компилятор просто не может определить тип параметра user .
Мы можем устранить эту проблему с помощью комбинированного:
int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);
Проще говоря, если мы используем последовательные потоки и типы аргументов аккумулятора и типы его реализации совпадают, нам не нужно использовать объединитель.
4. Параллельное сокращение
Как мы узнали ранее, мы можем использовать reduce() в распараллеленных потоках.
Когда мы используем распараллеленные потоки, мы должны убедиться, что reduce() или любые другие агрегированные операции, выполняемые над потоками,:
- ассоциативный: на результат не влияет порядок операндов
- невмешательство: операция не влияет на источник данных
- состояние без состояния и детерминированность: операция не имеет состояния и производит один и тот же вывод для данного входа
Мы должны выполнить все эти условия, чтобы предотвратить непредсказуемые результаты.
Как и ожидалось, операции, выполняемые с распараллеленными потоками , включая reduce () , выполняются параллельно, что позволяет использовать преимущества многоядерных аппаратных архитектур.
По очевидным причинам распараллеленные потоки гораздо более эффективны, чем последовательные аналоги. Даже в этом случае они могут быть излишними, если операции, применяемые к потоку, не являются дорогостоящими или количество элементов в потоке невелико.
Конечно, распараллеленные потоки-это правильный путь, когда нам нужно работать с большими потоками и выполнять дорогостоящие агрегированные операции.
Давайте создадим простой тест JVM (жгут проводов Java Microbenchmark) и сравним соответствующие времена выполнения при использовании операции reduce() в последовательном и распараллеленном потоке:
@State(Scope.Thread) private final ListuserList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); }
В приведенном выше тесте JMH мы сравниваем среднее время выполнения. Мы просто создаем Список , содержащий большое количество Пользовательских объектов. Затем мы вызываем reduce() для последовательного и распараллеленного потока и проверяем, что последний работает быстрее, чем первый (в секундах за операцию).
Это наши контрольные результаты:
Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op
5. Выбрасывание и обработка исключений При одновременном сокращении
В приведенных выше примерах операция reduce() не вызывает никаких исключений. Но это, конечно, может случиться.
Например, скажем, что нам нужно разделить все элементы потока на заданный коэффициент, а затем суммировать их:
Listnumbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider);
Это будет работать до тех пор, пока переменная делитель не равна нулю. Но если он равен нулю, reduce() вызовет исключение ArithmeticException exception: divide by zero.
Мы можем легко поймать исключение и сделать с ним что-то полезное, например, зарегистрировать его, восстановить из него и так далее, в зависимости от варианта использования, используя блок try/catch:
public static int divideListElements(Listvalues, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }
Хотя этот подход будет работать, мы загрязнили лямбда-выражение с помощью try/catch block . У нас больше нет той чистой однострочки, которая была у нас раньше.
Чтобы устранить эту проблему, мы можем использовать метод рефакторинга функции извлечения и извлечь блок try/catch в отдельный метод :
private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result }
Теперь реализация метода divide List Elements() снова стала чистой и оптимизированной:
public static int divideListElements(Listvalues, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); }
Предполагая, что divide List Elements() является служебным методом, реализованным абстрактным классом NumberUtils , мы можем создать модульный тест для проверки поведения метода divideListElements() :
Listnumbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Давайте также протестируем метод divide List Elements () , когда предоставленный Список значений Integer содержит 0:
Listnumbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Наконец, давайте проверим реализацию метода, когда делитель также равен 0:
Listnumbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);
6. Сложные Пользовательские Объекты
Мы также можем использовать Stream.reduce() с пользовательскими объектами,содержащими непримитивные поля. Для этого нам необходимо предоставить соответствующий i плотность , аккумулятор и объединитель для типа данных .
Предположим, что наш Пользователь является частью веб-сайта обзора. Каждый из наших Пользователей может обладать одним Рейтингом , который усредняется по многим Отзывам .
Во-первых, давайте начнем с нашего Обзора объекта.
Каждый Обзор должен содержать простой комментарий и оценку:
public class Review { private int points; private String review; // constructor, getters and setters }
Затем нам нужно определить наш Рейтинг, который будет содержать наши отзывы вместе с полем баллы . По мере добавления новых отзывов это поле будет соответственно увеличиваться или уменьшаться:
public class Rating { double points; Listreviews = new ArrayList<>(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList<>(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }
Мы также добавили функцию average для вычисления среднего значения на основе двух входных данных Rating s. Это будет хорошо работать для наших компонентов combiner и accumulator .
Далее, давайте определим список Пользователей , каждый из которых имеет свои собственные наборы отзывов:
User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); Listusers = Arrays.asList(john, julie);
Теперь, когда Джон и Джули учтены, давайте использовать Stream.reduce() для вычисления среднего рейтинга для обоих пользователей.
В качестве идентификатора давайте вернем новый Рейтинг , если наш список ввода пуст :
Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);
Если мы проведем математику, то обнаружим, что средний балл равен 3,6:
assertThat(averageRating.getPoints()).isEqualTo(3.6);
7. Заключение
В этой статье мы узнали, как использовать операцию Stream.reduce () .
Кроме того, мы узнали, как выполнять сокращения последовательных и распараллеленных потоков и как обрабатывать исключения при сокращении.
Как обычно, все примеры кода, показанные в этом руководстве, доступны на GitHub .