Рубрики
Без рубрики

Руководство по потоку.()

Изучите ключевые понятия операции Stream.reduce() в Java и как использовать ее для обработки последовательных и параллельных потоков.

Автор оригинала: Alejandro Ugarte.

1. Обзор

API потока предоставляет богатый набор промежуточных, сокращающих и терминальных функций, которые также поддерживают распараллеливание.

Более конкретно, операции потока сокращения позволяют нам получить один единственный результат из последовательности элементов , многократно применяя операцию объединения к элементам в последовательности.

В этом уроке мы рассмотрим операцию общего назначения Stream.reduce () /и рассмотрим ее в некоторых конкретных случаях использования.

Дальнейшее чтение:

Суммирование чисел с потоками Java

Введение в потоки Java 8

Руководство по интерфейсу бифункциональности Java

2. Ключевые понятия: Идентичность, Накопитель и Объединитель

Прежде чем мы углубимся в использование операции Stream.reduce () , давайте разберем элементы-участники операции на отдельные блоки. Таким образом, нам будет легче понять роль, которую играет каждый из них.

  • Identity – элемент, который является начальным значением операции сокращения и результатом по умолчанию, если поток пуст
  • Аккумулятор – функция, которая принимает два параметра: частичный результат операции сокращения и следующий элемент потока
  • Объединитель – функция, используемая для объединения частичного результата операции сокращения, когда сокращение распараллеливается или когда существует несоответствие между типами аргументов аккумулятора и типами реализации аккумулятора

3. Использование потока.()

Чтобы лучше понять функциональность элементов identity, accumulator и combiner, давайте рассмотрим некоторые основные примеры:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int result = numbers
  .stream()
  .reduce(0, (subtotal, element) -> subtotal + element);
assertThat(result).isEqualTo(21);

В этом случае Целочисленное значение 0 является идентификатором. Он сохраняет начальное значение операции сокращения, а также результат по умолчанию, когда поток значений Integer пуст.

Аналогично, лямбда-выражение :

subtotal, element -> subtotal + element

является накопителем , так как он принимает частичную сумму значений Integer и следующий элемент в потоке.

Чтобы сделать код еще более кратким, мы можем использовать ссылку на метод вместо лямбда-выражения:

int result = numbers.stream().reduce(0, Integer::sum);
assertThat(result).isEqualTo(21);

Конечно, мы можем использовать операцию reduce() для потоков, содержащих другие типы элементов.

Например, мы можем использовать reduce() для массива элементов String и объединить их в один результат:

List letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters
  .stream()
  .reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");

Аналогично, мы можем переключиться на версию, использующую ссылку на метод:

String result = letters.stream().reduce("", String::concat);
assertThat(result).isEqualTo("abcde");

Давайте используем операцию reduce() для объединения элементов верхнего регистра массива letters :

String result = letters
  .stream()
  .reduce(
    "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase());
assertThat(result).isEqualTo("ABCDE");

Кроме того, мы можем использовать reduce() в распараллеленном потоке (подробнее об этом позже):

List ages = Arrays.asList(25, 30, 45, 28, 32);
int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Когда поток выполняется параллельно, среда выполнения Java разбивает поток на несколько подпотоков. В таких случаях нам нужно использовать функцию для объединения результатов подпотоков в один. Это роль объединителя — в приведенном выше фрагменте это ссылка на метод Integer::sum .

Как ни странно, этот код не будет компилироваться:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35));
int computedAges = 
  users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());

В этом случае у нас есть поток объектов User , а типами аргументов аккумулятора являются Integer и User. Однако реализация аккумулятора представляет собой сумму целых чисел, поэтому компилятор просто не может определить тип параметра user .

Мы можем устранить эту проблему с помощью комбинированного:

int result = users.stream()
  .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
assertThat(result).isEqualTo(65);

Проще говоря, если мы используем последовательные потоки и типы аргументов аккумулятора и типы его реализации совпадают, нам не нужно использовать объединитель.

4. Параллельное сокращение

Как мы узнали ранее, мы можем использовать reduce() в распараллеленных потоках.

Когда мы используем распараллеленные потоки, мы должны убедиться, что reduce() или любые другие агрегированные операции, выполняемые над потоками,:

  • ассоциативный: на результат не влияет порядок операндов
  • невмешательство: операция не влияет на источник данных
  • состояние без состояния и детерминированность: операция не имеет состояния и производит один и тот же вывод для данного входа

Мы должны выполнить все эти условия, чтобы предотвратить непредсказуемые результаты.

Как и ожидалось, операции, выполняемые с распараллеленными потоками , включая reduce () , выполняются параллельно, что позволяет использовать преимущества многоядерных аппаратных архитектур.

По очевидным причинам распараллеленные потоки гораздо более эффективны, чем последовательные аналоги. Даже в этом случае они могут быть излишними, если операции, применяемые к потоку, не являются дорогостоящими или количество элементов в потоке невелико.

Конечно, распараллеленные потоки-это правильный путь, когда нам нужно работать с большими потоками и выполнять дорогостоящие агрегированные операции.

Давайте создадим простой тест JVM (жгут проводов Java Microbenchmark) и сравним соответствующие времена выполнения при использовании операции reduce() в последовательном и распараллеленном потоке:

@State(Scope.Thread)
private final List userList = createUsers();

@Benchmark
public Integer executeReduceOnParallelizedStream() {
    return this.userList
      .parallelStream()
      .reduce(
        0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

@Benchmark
public Integer executeReduceOnSequentialStream() {
    return this.userList
      .stream()
      .reduce(
        0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

В приведенном выше тесте JMH мы сравниваем среднее время выполнения. Мы просто создаем Список , содержащий большое количество Пользовательских объектов. Затем мы вызываем reduce() для последовательного и распараллеленного потока и проверяем, что последний работает быстрее, чем первый (в секундах за операцию).

Это наши контрольные результаты:

Benchmark                                                   Mode  Cnt  Score    Error  Units
JMHStreamReduceBenchMark.executeReduceOnParallelizedStream  avgt    5  0,007 ±  0,001   s/op
JMHStreamReduceBenchMark.executeReduceOnSequentialStream    avgt    5  0,010 ±  0,001   s/op

5. Выбрасывание и обработка исключений При одновременном сокращении

В приведенных выше примерах операция reduce() не вызывает никаких исключений. Но это, конечно, может случиться.

Например, скажем, что нам нужно разделить все элементы потока на заданный коэффициент, а затем суммировать их:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int divider = 2;
int result = numbers.stream().reduce(0, a / divider + b / divider);

Это будет работать до тех пор, пока переменная делитель не равна нулю. Но если он равен нулю, reduce() вызовет исключение ArithmeticException exception: divide by zero.

Мы можем легко поймать исключение и сделать с ним что-то полезное, например, зарегистрировать его, восстановить из него и так далее, в зависимости от варианта использования, используя блок try/catch:

public static int divideListElements(List values, int divider) {
    return values.stream()
      .reduce(0, (a, b) -> {
          try {
              return a / divider + b / divider;
          } catch (ArithmeticException e) {
              LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
          }
          return 0;
      });
}

Хотя этот подход будет работать, мы загрязнили лямбда-выражение с помощью try/catch block . У нас больше нет той чистой однострочки, которая была у нас раньше.

Чтобы устранить эту проблему, мы можем использовать метод рефакторинга функции извлечения и извлечь блок try/catch в отдельный метод :

private static int divide(int value, int factor) {
    int result = 0;
    try {
        result = value / factor;
    } catch (ArithmeticException e) {
        LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
    }
    return result
}

Теперь реализация метода divide List Elements() снова стала чистой и оптимизированной:

public static int divideListElements(List values, int divider) {
    return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider));
}

Предполагая, что divide List Elements() является служебным методом, реализованным абстрактным классом NumberUtils , мы можем создать модульный тест для проверки поведения метода divideListElements() :

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Давайте также протестируем метод divide List Elements () , когда предоставленный Список значений Integer содержит 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Наконец, давайте проверим реализацию метода, когда делитель также равен 0:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Сложные Пользовательские Объекты

Мы также можем использовать Stream.reduce() с пользовательскими объектами,содержащими непримитивные поля. Для этого нам необходимо предоставить соответствующий i плотность , аккумулятор и объединитель для типа данных .

Предположим, что наш Пользователь является частью веб-сайта обзора. Каждый из наших Пользователей может обладать одним Рейтингом , который усредняется по многим Отзывам .

Во-первых, давайте начнем с нашего Обзора объекта.

Каждый Обзор должен содержать простой комментарий и оценку:

public class Review {

    private int points;
    private String review;

    // constructor, getters and setters
}

Затем нам нужно определить наш Рейтинг, который будет содержать наши отзывы вместе с полем баллы . По мере добавления новых отзывов это поле будет соответственно увеличиваться или уменьшаться:

public class Rating {

    double points;
    List reviews = new ArrayList<>();

    public void add(Review review) {
        reviews.add(review);
        computeRating();
    }

    private double computeRating() {
        double totalPoints = 
          reviews.stream().map(Review::getPoints).reduce(0, Integer::sum);
        this.points = totalPoints / reviews.size();
        return this.points;
    }

    public static Rating average(Rating r1, Rating r2) {
        Rating combined = new Rating();
        combined.reviews = new ArrayList<>(r1.reviews);
        combined.reviews.addAll(r2.reviews);
        combined.computeRating();
        return combined;
    }

}

Мы также добавили функцию average для вычисления среднего значения на основе двух входных данных Rating s. Это будет хорошо работать для наших компонентов combiner и accumulator .

Далее, давайте определим список Пользователей , каждый из которых имеет свои собственные наборы отзывов:

User john = new User("John", 30);
john.getRating().add(new Review(5, ""));
john.getRating().add(new Review(3, "not bad"));
User julie = new User("Julie", 35);
john.getRating().add(new Review(4, "great!"));
john.getRating().add(new Review(2, "terrible experience"));
john.getRating().add(new Review(4, ""));
List users = Arrays.asList(john, julie);

Теперь, когда Джон и Джули учтены, давайте использовать Stream.reduce() для вычисления среднего рейтинга для обоих пользователей.

В качестве идентификатора давайте вернем новый Рейтинг , если наш список ввода пуст :

Rating averageRating = users.stream()
  .reduce(new Rating(), 
    (rating, user) -> Rating.average(rating, user.getRating()), 
    Rating::average);

Если мы проведем математику, то обнаружим, что средний балл равен 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Заключение

В этой статье мы узнали, как использовать операцию Stream.reduce () .

Кроме того, мы узнали, как выполнять сокращения последовательных и распараллеленных потоков и как обрабатывать исключения при сокращении.

Как обычно, все примеры кода, показанные в этом руководстве, доступны на GitHub .