Рубрики
Без рубрики

Реализация эффективного разделителя перетасовки для Java Stream API

Смотрите, как реализовать эффективный случайный разделитель в Java

Автор оригинала: Grzegorz Piwowarek.

Сортировка экземпляра потока проста и включает в себя всего один вызов метода API – добиться обратного не так просто.

В этой статье мы увидим, как перетасовать поток на Java – охотно и лениво, используя фабрики сборщиков потоков и пользовательские разделители.

Нетерпеливый Коллекционер Перетасовки

Одно из наиболее прагматичных решений вышеуказанной проблемы уже было описано Хайнцем в этой статье .

В основном это включает в себя инкапсуляцию сложной операции сбора всего потока в список, сбор#перемешивание в нем и преобразование в поток:

public static  Collector> toEagerShuffledStream() {
    return Collectors.collectingAndThen(
      toList(),
      list -> {
          Collections.shuffle(list);
          return list.stream();
      });
}

Это решение будет оптимальным, если мы хотим обработать все элементы потока в случайном порядке, но оно может откусить, если мы захотим обработать только небольшое их подмножество – это все потому, что вся коллекция заранее перемешивается, даже если мы запрашиваем только один элемент.

Давайте взглянем на простой тест и результаты, которые он дал:

@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {

    private List source;

    @Param({"1", "10", "100", "1000", "10000", "10000"})
    public int limit;

    @Param({"100000"})
    public int size;

    @Setup(Level.Iteration)
    public void setUp() {
        source = IntStream.range(0, size)
          .boxed()
          .map(Object::toString)
          .collect(Collectors.toList());
    }

    @Benchmark
    public List eager() {
        return source.stream()
          .collect(toEagerShuffledStream())
          .limit(limit)
          .collect(Collectors.toList());
    }
            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s

Как мы видим, он довольно хорошо масштабируется вместе с количеством элементов, потребляемых из результирующего потока, жаль, что абсолютное значение не так впечатляет для относительно небольших чисел – в таких ситуациях предварительная перетасовка всей коллекции оказывается довольно расточительной.

Давайте посмотрим, что мы можем с этим сделать.

Ленивый Сборщик Перетасовки

Чтобы сэкономить драгоценные циклы процессора, вместо предварительной перетасовки всей коллекции, мы можем просто выбрать количество элементов, соответствующих восходящему спросу.

Для этого нам нужно реализовать пользовательский разделитель, который позволит нам перебирать объекты в случайном порядке, а затем мы сможем создать экземпляр потока с помощью вспомогательного метода из класса StreamSupport:

public class RandomSpliterator implements Spliterator {

    // ...

    public static  Collector> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toList(),
          list -> StreamSupport.stream(
            new ShuffledSpliterator<>(list), false));
    }
}

Мы не можем избежать оценки всего потока, даже если мы хотим выбрать один случайный элемент (что означает, что нет поддержки бесконечных последовательностей), поэтому совершенно нормально инициировать наш разделитель случайных чисел со списком, но есть загвоздка…

Если конкретная реализация списка не поддерживает произвольный доступ в режиме постоянного времени, это решение может оказаться намного медленнее, чем подход eagle. Чтобы защититься от этого сценария, мы можем выполнить простую проверку при создании экземпляра разделителя:

private RandomSpliterator(
  List source, Supplier random) {
    if (source.isEmpty()) { ... } // throw
    this.source = source instanceof RandomAccess 
      ? source 
      : new ArrayList<>(source);
    this.random = random.get();
}

Создание нового экземпляра ArrayList является дорогостоящим, но незначительным по сравнению со стоимостью, создаваемой реализациями, которые не обеспечивают O(1) произвольный доступ.

И теперь мы можем переопределить самый важный метод – попробуйте Advance().

В этом случае все довольно просто – на каждой итерации нам нужно случайным образом выбирать и удалять случайный элемент из исходной коллекции.

Мы не можем беспокоиться о изменении источника, так как мы не публикуем Случайный разделитель, а только коллектор, основанный на нем:

@Override
public boolean tryAdvance(Consumer action) {
    int remaining = source.size();
    action.accept(source.remove(random.nextInt(remaining)));
    return remaining - 1 > 0;
}

Помимо этого, нам необходимо реализовать три других метода:

@Override
public Spliterator trySplit() {
    return null; // to indicate that split is not possible
}

@Override
public long estimateSize() {
    return source.size();
}

@Override
public int characteristics() {
    return SIZED;
}

И теперь мы пробуем это и видим, что это действительно работает:

IntStream.range(0, 10).boxed()
  .collect(toLazyShuffledStream())
  .forEach(System.out::println);

И результат:

3
4
8
1
7
6
5
0
2
9

Соображения Производительности

В этой реализации мы заменили N обменов элементами массива на M поисков/удалений, где:

N – размер коллекции M – количество выбранных элементов Как правило, один поиск/удаление из списка ArrayList является более дорогостоящей операцией, чем замена одного элемента, что делает это решение не таким масштабируемым, но значительно более эффективным при относительно низких значениях M.

Давайте теперь посмотрим, как это решение сравнивается с подходом, продемонстрированным в начале(оба рассчитаны для коллекции, содержащей 100_000 объектов).:

            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
lazy              1  thrpt    5  1530.763 ±  72.096  ops/s
lazy             10  thrpt    5  1462.305 ±  23.860  ops/s
lazy            100  thrpt    5   823.212 ± 119.771  ops/s
lazy           1000  thrpt    5   166.786 ±  16.306  ops/s
lazy          10000  thrpt    5    19.475 ±   4.052  ops/s
lazy         100000  thrpt    5     4.097 ±   0.416  ops/s
Давайте теперь посмотрим, как это решение сравнивается с подходом, продемонстрированным в начале(оба рассчитаны для коллекции, содержащей 100_000 объектов).:

Как мы видим, это решение превосходит первое, если количество обработанных элементов потока относительно невелико, но по мере увеличения соотношения обработанный/размер коллекции пропускная способность резко падает.

Это все из – за дополнительных накладных расходов, возникающих при удалении элементов из списка ArrayList, содержащих оставшиеся объекты-каждое удаление требует сдвига внутреннего массива на один, используя относительно дорогой метод System#arraycopy.

Мы можем заметить аналогичную закономерность для гораздо больших коллекций (1_000_000 элементов).:

      (limit)    (size)   Mode  Cnt  Score   Err  Units
eager       1  10000000  thrpt    5  0.915        ops/s
eager      10  10000000  thrpt    5  0.783        ops/s
eager     100  10000000  thrpt    5  0.965        ops/s
eager    1000  10000000  thrpt    5  0.936        ops/s
eager   10000  10000000  thrpt    5  0.860        ops/s
lazy        1  10000000  thrpt    5  4.338        ops/s
lazy       10  10000000  thrpt    5  3.149        ops/s
lazy      100  10000000  thrpt    5  2.060        ops/s
lazy     1000  10000000  thrpt    5  0.370        ops/s
lazy    10000  10000000  thrpt    5  0.05         ops/s
Мы можем заметить аналогичную закономерность для гораздо больших коллекций (1_000_000 элементов).:

… и гораздо меньшие (128 элементов, обратите внимание на масштаб!):

   (limit)    (size)   Mode  Cnt       Score   Error  Units
eager        2     128    thrpt    5  246439.459          ops/s
eager        4     128    thrpt    5  333866.936          ops/s
eager        8     128    thrpt    5  340296.188          ops/s
eager       16     128    thrpt    5  345533.673          ops/s
eager       32     128    thrpt    5  231725.156          ops/s
eager       64     128    thrpt    5  314324.265          ops/s
eager      128     128    thrpt    5  270451.992          ops/s
lazy         2     128    thrpt    5  765989.718          ops/s
lazy         4     128    thrpt    5  659421.041          ops/s
lazy         8     128    thrpt    5  652685.515          ops/s
lazy        16     128    thrpt    5  470346.570          ops/s
lazy        32     128    thrpt    5  324174.691          ops/s
lazy        64     128    thrpt    5  186472.090          ops/s
lazy       128     128    thrpt    5  108105.699          ops/s
и гораздо меньшие (128 элементов, обратите внимание на масштаб!):

Но можем ли мы сделать что-то лучше, чем это?

Дальнейшее Повышение Производительности

К сожалению, масштабируемость существующего решения весьма разочаровывает. Давайте попробуем улучшить его, но прежде чем мы это сделаем, мы должны сначала измерить:

Давайте попробуем улучшить его, но прежде чем мы это сделаем, мы должны сначала измерить:

Как и ожидалось, Arraylist#remove оказывается одной из горячих точек – другими словами, процессор тратит заметное количество времени на удаление объектов из списка ArrayList.

Это почему? Удаление из списка массивов включает удаление элемента из базового массива. Загвоздка в том, что размеры массивов в Java невозможно изменить – каждое удаление запускает создание нового массива меньшего размера:

private void fastRemove(Object[] es, int i) {
    modCount++;
    final int newSize;
    if ((newSize = size - 1) > i)
        System.arraycopy(es, i + 1, es, i, newSize - i);
    es[size = newSize] = null;
}

Что мы можем с этим поделать? Избегайте удаления элементов из списка массивов.

Для этого мы могли бы хранить оставшиеся элементы в массиве и отслеживать его размер отдельно:

public class ImprovedRandomSpliterator implements Spliterator {

    private final Random random;
    private final T[] source;
    private int size;

    private ImprovedRandomSpliterator(
      List source, Supplier random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException(...);
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }
}

К счастью, мы можем избежать проблем с параллелизмом, поскольку экземпляры этого разделителя не должны совместно использоваться потоками.

И теперь всякий раз, когда мы пытаемся удалить элемент, нам не нужно фактически создавать новый уменьшенный массив. Вместо этого мы можем уменьшить наш трекер размеров и игнорировать оставшуюся часть массива.

Но прямо перед этим нам нужно поменять последний элемент на возвращаемый элемент:

@Override
public boolean tryAdvance(Consumer action) {
    int nextIdx = random.nextInt(size);
    int lastIdx = size - 1;

    action.accept(source[nextIdx]);
    source[nextIdx] = source[lastIdx];
    source[lastIdx] = null; // let object be GCed
    return --size > 0;
}

Если мы опишем его сейчас, то увидим, что дорогостоящий звонок пропал:

Если мы опишем его сейчас, то увидим, что дорогостоящий звонок пропал:

Мы готовы повторить тесты и сравнить:

               (limit)  (size)   Mode  Cnt     Score     Error  Units
eager                1  100000  thrpt    3   456.811 ±  20.585  ops/s
eager               10  100000  thrpt    3   469.635 ±  23.281  ops/s
eager              100  100000  thrpt    3   466.486 ±  68.820  ops/s
eager             1000  100000  thrpt    3   454.459 ±  13.103  ops/s
eager            10000  100000  thrpt    3   443.640 ±  96.929  ops/s
eager           100000  100000  thrpt    3   335.134 ±  21.944  ops/s
lazy                 1  100000  thrpt    3  1587.536 ± 389.128  ops/s
lazy                10  100000  thrpt    3  1452.855 ± 406.879  ops/s
lazy               100  100000  thrpt    3   814.978 ± 242.077  ops/s
lazy              1000  100000  thrpt    3   167.825 ± 129.559  ops/s
lazy             10000  100000  thrpt    3    19.782 ±   8.596  ops/s
lazy            100000  100000  thrpt    3     3.970 ±   0.408  ops/s
lazy_improved        1  100000  thrpt    3  1509.264 ± 170.423  ops/s
lazy_improved       10  100000  thrpt    3  1512.150 ± 143.927  ops/s
lazy_improved      100  100000  thrpt    3  1463.093 ± 593.370  ops/s
lazy_improved     1000  100000  thrpt    3  1451.007 ±  58.948  ops/s
lazy_improved    10000  100000  thrpt    3  1148.581 ± 232.218  ops/s
lazy_improved   100000  100000  thrpt    3   383.022 ±  97.082  ops/s
Мы готовы повторить тесты и сравнить:

Как вы можете видеть, в итоге мы получили реализацию, которая намного более устойчива с точки зрения производительности к количеству элементов, к которым мы стремимся.

На самом деле, улучшенная реализация работает немного лучше, чем реализация на основе коллекций#shuffle, даже в пессимистичном сценарии!

Полный Пример …также можно найти на GitHub.

package com.pivovarit.stream;

import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ImprovedRandomSpliterator implements Spliterator {

    private final Random random;
    private final T[] source;
    private int size;

    ImprovedRandomSpliterator(List source, Supplier random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }

    @Override
    public boolean tryAdvance(Consumer action) {
        int nextIdx = random.nextInt(size);
        int lastIdx = size - 1;

        action.accept(source[nextIdx]);
        source[nextIdx] = source[lastIdx];
        source[lastIdx] = null; // let object be GCed
        return --size > 0;
    }

    @Override
    public Spliterator trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return source.length;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}
package com.pivovarit.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toCollection;

public final class RandomCollectors {

    private RandomCollectors() {
    }

    public static  Collector> toImprovedLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static  Collector> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static  Collector> toEagerShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> {
              Collections.shuffle(list);
              return list.stream();
          });
    }
}

Оригинал: “https://www.codementor.io/@pivovarit/implementing-an-efficient-shuffled-spliterator-for-java-stream-api-r1rgn19u3”