1. Обзор
В этом всеобъемлющем учебнике мы рассмотрим практическое использование потоков Java 8 от создания до параллельного выполнения.
Чтобы понять этот материал, читатели должны иметь базовые знания Java 8 (лямбда-выражения, Необязательные, ссылки на методы) и потокового API. Чтобы лучше ознакомиться с этими темами, пожалуйста, ознакомьтесь с нашими предыдущими статьями: Новые функции в Java 8 и Введение в потоки Java 8 .
Дальнейшее чтение:
Лямбда-выражения и функциональные интерфейсы: Советы и рекомендации
Руководство по сборщикам Java 8
2. Создание потока
Существует множество способов создания экземпляра потока из разных источников. После создания экземпляр не будет изменять свой источник, поэтому позволяет создавать несколько экземпляров из одного источника.
2.1. Пустой поток
Мы должны использовать метод empty() в случае создания пустого потока:
StreamstreamEmpty = Stream.empty();
Мы часто используем метод empty() при создании, чтобы избежать возврата null для потоков без элемента:
public StreamstreamOf(List list) { return list == null || list.isEmpty() ? Stream.empty() : list.stream(); }
2.2. Поток сбора
Мы также можем создать поток любого типа Collection ( Collection, List, Set ):
Collectioncollection = Arrays.asList("a", "b", "c"); Stream streamOfCollection = collection.stream();
2.3. Поток массива
Массив также может быть источником потока:
StreamstreamOfArray = Stream.of("a", "b", "c");
Мы также можем создать поток из существующего массива или его части:
String[] arr = new String[]{"a", "b", "c"}; StreamstreamOfArrayFull = Arrays.stream(arr); Stream streamOfArrayPart = Arrays.stream(arr, 1, 3);
2.4. Stream.builder()
При использовании builder нужный тип должен быть дополнительно указан в правой части инструкции, в противном случае метод build() создаст экземпляр Stream:
StreamstreamBuilder = Stream. builder().add("a").add("b").add("c").build();
2.5. Поток.генерировать()
Метод generate() принимает Supplier для генерации элементов. Поскольку результирующий поток бесконечен, разработчик должен указать желаемый размер, или метод generate() будет работать до тех пор, пока не достигнет предела памяти:
StreamstreamGenerated = Stream.generate(() -> "element").limit(10);
Приведенный выше код создает последовательность из десяти строк со значением “элемент.”
2.6. Поток.итерация()
Другим способом создания бесконечного потока является использование метода iterate() :
StreamstreamIterated = Stream.iterate(40, n -> n + 2).limit(20);
Первым элементом результирующего потока является первый параметр метода iterate () . При создании каждого следующего элемента указанная функция применяется к предыдущему элементу. В приведенном выше примере вторым элементом будет 42.
2.7. Поток примитивов
Java 8 предлагает возможность создавать потоки из трех примитивных типов: int, long и double. Поскольку Stream является универсальным интерфейсом, и нет возможности использовать примитивы в качестве параметра типа с универсальными, были созданы три новых специальных интерфейса: IntStream, LongStream, DoubleStream.
Использование новых интерфейсов облегчает ненужный автоматический бокс, что позволяет повысить производительность:
IntStream intStream = IntStream.range(1, 3); LongStream longStream = LongStream.rangeClosed(1, 3);
Метод range(int startInclusive, int endExclusive) создает упорядоченный поток от первого параметра ко второму параметру. Он увеличивает значение последующих элементов с шагом, равным 1. Результат не включает в себя последний параметр, это просто верхняя граница последовательности.
Метод range Closed(int startInclusive, int endInclusive) делает то же самое, только с одним отличием, второй элемент включен. Мы можем использовать эти два метода для генерации любого из трех типов потоков примитивов.
Начиная с Java 8, класс Random предоставляет широкий спектр методов для генерации потоков примитивов. Например, следующий код создает Двойной поток, который содержит три элемента:
Random random = new Random(); DoubleStream doubleStream = random.doubles(3);
2.8. Поток строк
Мы также можем использовать String в качестве источника для создания потока с помощью метода chars() класса String . Поскольку в JDK нет интерфейса для CharStream , мы используем IntStream для представления потока символов.
IntStream streamOfChars = "abc".chars();
В следующем примере строка | разбивается на подстроки в соответствии с указанным регулярным выражением :
StreamstreamOfString = Pattern.compile(", ").splitAsStream("a, b, c");
2.9. Поток файлов
Кроме того, класс Java NIO Files позволяет нам генерировать Stream текстового файла с помощью метода lines () . Каждая строка текста становится элементом потока:
Path path = Paths.get("C:\\file.txt"); StreamstreamOfStrings = Files.lines(path); Stream streamWithCharset = Files.lines(path, Charset.forName("UTF-8"));
Кодировка | может быть указана в качестве аргумента метода lines () .
3. Ссылка на поток
Мы можем создать экземпляр потока и иметь доступную ссылку на него, если вызываются только промежуточные операции. Выполнение терминальной операции делает поток недоступным .
Чтобы продемонстрировать это, мы на некоторое время забудем, что наилучшей практикой является цепочка последовательности операций. Кроме того, это ненужная многословность, технически следующий код действителен:
Streamstream = Stream.of("a", "b", "c").filter(element -> element.contains("b")); Optional anyElement = stream.findAny();
Однако попытка повторно использовать ту же ссылку после вызова операции терминала вызовет исключение IllegalStateException:
OptionalfirstElement = stream.findFirst();
Поскольку исключение IllegalStateException является исключением RuntimeException , компилятор не будет сигнализировать о проблеме. Поэтому очень важно помнить, что Java 8 потоки не могут быть использованы повторно.
Такое поведение логично. Мы разработали потоки для применения конечной последовательности операций к источнику элементов в функциональном стиле, а не для хранения элементов.
Поэтому, чтобы предыдущий код работал правильно, необходимо внести некоторые изменения:
Listelements = Stream.of("a", "b", "c").filter(element -> element.contains("b")) .collect(Collectors.toList()); Optional anyElement = elements.stream().findAny(); Optional firstElement = elements.stream().findFirst();
4. Трубопровод потока
Чтобы выполнить последовательность операций над элементами источника данных и агрегировать их результаты, нам нужны три части: источник , промежуточная операция(ы) и терминальная операция.
Промежуточные операции возвращают новый измененный поток. Например, чтобы создать новый поток существующего без нескольких элементов, следует использовать метод skip() :
StreamonceModifiedStream = Stream.of("abcd", "bbcd", "cbcd").skip(1);
Если нам нужно более одной модификации, мы можем связать промежуточные операции в цепочку. Предположим, что нам также нужно заменить каждый элемент текущего потока подстрокой из первых нескольких символов. Мы можем сделать это, связав методы skip() и map() :
StreamtwiceModifiedStream = stream.skip(1).map(element -> element.substring(0, 3));
Как мы видим, метод map() принимает лямбда-выражение в качестве параметра. Если мы хотим узнать больше о лямбда-выражениях, мы можем взглянуть на наш учебник Лямбда-выражения и функциональные интерфейсы: советы и рекомендации .
Поток сам по себе ничего не стоит; пользователя интересует результат работы терминала, который может быть значением какого-либо типа или действием, применяемым к каждому элементу потока. Мы можем использовать только одну терминальную операцию на поток.
Правильный и наиболее удобный способ использования потоков-это конвейер потока , который представляет собой цепочку источника потока, промежуточных операций и терминальной операции:
Listlist = Arrays.asList("abc1", "abc2", "abc3"); long size = list.stream().skip(1) .map(element -> element.substring(0, 3)).sorted().count();
5. Ленивый вызов
Промежуточные операции ленивы. Это означает, что они будут вызываться только в том случае, если это необходимо для выполнения операции терминала.
Например, давайте вызовем метод was Called() , который увеличивает внутренний счетчик каждый раз, когда он вызывается:
private long counter; private void wasCalled() { counter++; }
Теперь давайте вызовем метод был вызван () из операции filter() :
Listlist = Arrays.asList("abc1", "abc2", "abc3"); counter = 0; Stream stream = list.stream().filter(element -> { wasCalled(); return element.contains("2"); });
Поскольку у нас есть источник из трех элементов, мы можем предположить, что метод filter() будет вызван три раза, а значение переменной counter будет равно 3. Однако запуск этого кода не изменяет счетчик вообще, он по-прежнему равен нулю, поэтому метод filter() даже не был вызван ни разу. Причина, по которой отсутствует работа терминала.
Давайте немного перепишем этот код, добавив операцию map() и терминальную операцию findFirst(). Мы также добавим возможность отслеживать порядок вызовов методов с помощью ведения журнала:
Optionalstream = list.stream().filter(element -> { log.info("filter() was called"); return element.contains("2"); }).map(element -> { log.info("map() was called"); return element.toUpperCase(); }).findFirst();
Результирующий журнал показывает, что мы дважды вызывали метод filter() и один раз метод map () . Это связано с тем, что конвейер выполняется вертикально. В нашем примере первый элемент потока не удовлетворял предикату фильтра. Затем мы вызвали метод filter() для второго элемента, который прошел фильтр. Не вызывая filter() для третьего элемента, мы спустились по конвейеру к методу map () .
Операция find First() удовлетворяет только одному элементу. Таким образом, в этом конкретном примере ленивый вызов позволил нам избежать двух вызовов методов, одного для filter() и одного для map().
6. Порядок исполнения
С точки зрения производительности правильный порядок является одним из наиболее важных аспектов операций цепочки в потоковом конвейере:
long size = list.stream().map(element -> { wasCalled(); return element.substring(0, 3); }).skip(2).count();
Выполнение этого кода увеличит значение счетчика на три. Это означает, что мы вызывали метод map() потока три раза, но значение size равно единице. Таким образом, результирующий поток содержит только один элемент, и мы выполнили дорогостоящие операции map() без причины два раза из трех.
Если мы изменим порядок skip() и map() методов , счетчик увеличится только на один. Поэтому мы вызовем метод map() только один раз:
long size = list.stream().skip(2).map(element -> { wasCalled(); return element.substring(0, 3); }).count();
Это приводит нас к следующему правилу: промежуточные операции, которые уменьшают размер потока, должны быть помещены перед операциями, которые применяются к каждому элементу. Поэтому нам нужно сохранить такие методы, как s kip (), filter (), и distinct() в верхней части нашего конвейера потока.
7. Сокращение потока
API имеет множество терминальных операций, которые агрегируют поток в тип или примитив: count (), max (), min (), и sum(). Однако эти операции работают в соответствии с предопределенной реализацией. Итак, что , если разработчику нужно настроить механизм сокращения потока? Есть два метода, которые позволяют нам сделать это, методы reduce () | и collect () .
7.1. Метод reduce()
Существует три варианта этого метода, которые отличаются своими сигнатурами и типами возвращаемых данных. Они могут иметь следующие параметры:
identity – начальное значение для аккумулятора или значение по умолчанию, если поток пуст и накапливать нечего
аккумулятор – функция, задающая логику агрегирования элементов. Поскольку накопитель создает новое значение для каждого шага уменьшения, количество новых значений равно размеру потока, и только последнее значение полезно. Это не очень хорошо для производительности.
объединитель – функция, которая агрегирует результаты работы накопителя. Мы вызываем объединитель только в параллельном режиме, чтобы уменьшить результаты накопителей из разных потоков.
Теперь давайте рассмотрим эти три метода в действии:
OptionalInt reduced = IntStream.range(1, 4).reduce((a, b) -> a + b);
уменьшено = 6 (1 + 2 + 3)
int reducedTwoParams = IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
уменьшено Два Параметра (10 + 1 + 2 + 3)
int reducedParams = Stream.of(1, 2, 3) .reduce(10, (a, b) -> a + b, (a, b) -> { log.info("combiner was called"); return a + b; });
Результат будет таким же, как и в предыдущем примере (16), и входа в систему не будет, что означает, что объединитель не был вызван. Чтобы выполнить комбинированную работу, поток должен быть параллельным:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream() .reduce(10, (a, b) -> a + b, (a, b) -> { log.info("combiner was called"); return a + b; });
Результат здесь другой (36), и объединитель вызывался дважды. Здесь сокращение работает по следующему алгоритму: накопитель запускался три раза, добавляя каждый элемент потока в identity . Эти действия осуществляются параллельно. В результате у них есть (10 +; 10 +; 10 +;). Теперь объединитель может объединить эти три результата. Для этого требуется две итерации (12 +; 25 +).
7.2. Метод collect()
Сокращение потока также может быть выполнено с помощью другой терминальной операции, метода collect () . Он принимает аргумент типа Collector, который определяет механизм сокращения. Для большинства распространенных операций уже созданы предопределенные сборщики. Доступ к ним можно получить с помощью типа Коллекторы .
В этом разделе мы будем использовать следующий Список в качестве источника для всех потоков:
ListproductList = Arrays.asList(new Product(23, "potatoes"), new Product(14, "orange"), new Product(13, "lemon"), new Product(23, "bread"), new Product(13, "sugar"));
Преобразование потока в Коллекция ( Коллекция, Список или Набор ):
ListcollectorCollection = productList.stream().map(Product::getName).collect(Collectors.toList());
Сокращение до Строка :
String listToString = productList.stream().map(Product::getName) .collect(Collectors.joining(", ", "[", "]"));
Метод joiner() может иметь от одного до трех параметров (разделитель, префикс, суффикс). Самое удобное в использовании joiner() заключается в том, что разработчику не нужно проверять, достигает ли поток своего конца, чтобы применить суффикс, а не применять разделитель. Коллекционер позаботится об этом.
Обработка среднего значения всех числовых элементов потока:
double averagePrice = productList.stream() .collect(Collectors.averagingInt(Product::getPrice));
Обработка суммы всех числовых элементов потока:
int summingPrice = productList.stream() .collect(Collectors.summingInt(Product::getPrice));
Методы averaging X X(), summingXX() и summarizingXX() могут работать с примитивами ( int, long, double ) и с их классами-оболочками ( Integer, Long, Double ). Еще одной мощной особенностью этих методов является обеспечение отображения. В результате разработчику не нужно использовать дополнительную операцию map() перед методом collect () .
Сбор статистической информации об элементах потока:
IntSummaryStatistics statistics = productList.stream() .collect(Collectors.summarizingInt(Product::getPrice));
Используя полученный экземпляр типа IntSummaryStatistics , разработчик может создать статистический отчет, применив метод toString () . Результатом будет Строка общая для этого “IntSummaryStatistics{count=5,,,,200000,}.”
Также легко извлечь из этого объекта отдельные значения для count, sum, min, и average , применив методы getCount(), getSum(), getMin(), getAverage(), и getMax(). Все эти значения могут быть извлечены из одного конвейера.
Группировка элементов потока в соответствии с заданной функцией:
Map> collectorMapOfLists = productList.stream() .collect(Collectors.groupingBy(Product::getPrice));
В приведенном выше примере поток был сокращен до Map , который группирует все продукты по их цене.
Разделение элементов потока на группы в соответствии с некоторым предикатом:
Map> mapPartioned = productList.stream() .collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
Подталкивание коллектора для выполнения дополнительного преобразования:
SetunmodifiableSet = productList.stream() .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
В данном конкретном случае коллектор преобразовал поток в Set , а затем создал из него неизменяемый Set .
Пользовательский коллектор:
Если по какой-то причине должен быть создан пользовательский коллектор, самый простой и наименее подробный способ сделать это-использовать метод of() типа Collector.
Collector> toLinkedList = Collector.of(LinkedList::new, LinkedList::add, (first, second) -> { first.addAll(second); return first; }); LinkedList linkedListOfPersons = productList.stream().collect(toLinkedList);
В этом примере экземпляр Collector был сокращен до LinkedList .
8. Параллельные потоки
До Java 8 распараллеливание было сложным. Появление ExecutorService и forkJoin | немного упростило жизнь разработчика, но все же стоило помнить, как создать конкретного исполнителя, как его запустить и так далее. Java 8 представила способ выполнения параллелизма в функциональном стиле.
API позволяет создавать параллельные потоки, которые выполняют операции в параллельном режиме. Когда источником потока является Коллекция или массив , это может быть достигнуто с помощью метода parallelStream() :
StreamstreamOfCollection = productList.parallelStream(); boolean isParallel = streamOfCollection.isParallel(); boolean bigPrice = streamOfCollection .map(product -> product.getPrice() * 12) .anyMatch(price -> price > 200);
Если источником потока является что-то иное , чем Коллекция или массив , следует использовать метод parallel() :
IntStream intStreamParallel = IntStream.range(1, 150).parallel(); boolean isParallel = intStreamParallel.isParallel();
Под капотом Stream API автоматически использует фреймворк Fork Join для параллельного выполнения операций. По умолчанию будет использоваться общий пул потоков, и нет возможности (по крайней мере, на данный момент) назначить ему какой-либо пользовательский пул потоков. Это можно преодолеть с помощью пользовательского набора параллельных коллекторов.
При использовании потоков в параллельном режиме избегайте блокировки операций. Также лучше использовать параллельный режим, когда для выполнения задач требуется одинаковое количество времени. Если одна задача длится намного дольше, чем другая, это может замедлить весь рабочий процесс приложения.
Поток в параллельном режиме может быть преобразован обратно в последовательный режим с помощью метода sequential() :
IntStream intStreamSequential = intStreamParallel.sequential(); boolean isParallel = intStreamSequential.isParallel();
9. Заключение
Stream API-это мощный, но простой в понимании набор инструментов для обработки последовательности элементов. При правильном использовании он позволяет нам сократить огромное количество шаблонного кода, создавать более читаемые программы и повышать производительность приложения.
В большинстве примеров кода, показанных в этой статье, мы оставили потоки неиспользуемыми (мы не применяли метод close() или терминальную операцию). В реальном приложении не оставляйте созданный поток неиспользуемым, так как это приведет к утечке памяти.
Полные примеры кода, сопровождающие эту статью, доступны на GitHub.