Рубрики
Без рубрики

Утилита параллелизма Java с инструментами

Узнайте, как реализовать неблокирующие параллельные структуры данных с помощью библиотеки инструментов

Автор оригинала: baeldung.

1. Обзор

В этом уроке мы представим библиотеку Tools (Инструменты параллелизма Java).

Проще говоря, это обеспечивает ряд полезных структур данных, подходящих для работы в многопоточной среде.

2. Неблокирующие алгоритмы

Традиционно многопоточный код, работающий в изменяемом общем состоянии, использует блокировки для обеспечения согласованности данных и публикации (изменения, внесенные одним потоком, которые видны другому).

Такой подход имеет ряд недостатков:

  • потоки могут быть заблокированы при попытке получить блокировку, не продвигаясь вперед до тех пор, пока не будет завершена операция другого потока – это эффективно предотвращает параллелизм
  • чем тяжелее конфликт блокировок, тем больше времени JVM тратит на планирование потоков, управление конфликтами и очередями ожидающих потоков и тем меньше реальной работы он выполняет
  • взаимоблокировки возможны, если задействовано более одной блокировки, и они получены/освобождены в неправильном порядке
  • возможна инверсия приоритета опасность – поток с высоким приоритетом блокируется в попытке получить блокировку, удерживаемую потоком с низким приоритетом
  • большую часть времени используются крупнозернистые замки, что сильно вредит параллелизму-мелкозернистая блокировка требует более тщательного проектирования, увеличивает накладные расходы на блокировку и более подвержена ошибкам

Альтернативой является использование неблокирующего алгоритма, т. Е. алгоритма, в котором сбой или приостановка любого потока не может привести к сбою или приостановке другого потока .

Неблокирующий алгоритм не блокируется , если хотя бы один из задействованных потоков гарантированно будет выполняться в течение произвольного периода времени, т. Е. Взаимоблокировки не могут возникнуть во время обработки.

Кроме того, эти алгоритмы не требуют ожидания , если также есть гарантированный прогресс для каждого потока.

Вот неблокирующий Стек пример из превосходного Параллелизма Java на практике книги; он определяет базовое состояние:

public class ConcurrentStack {

    AtomicReference> top = new AtomicReference>();

    private static class Node  {
        public E item;
        public Node next;

        // standard constructor
    }
}

А также несколько методов API:

public void push(E item){
    Node newHead = new Node(item);
    Node oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node oldHead;
    Node newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

Мы видим, что алгоритм использует мелкозернистые инструкции compare-and-swap ( CAS ) и является без блокировки (даже если несколько потоков одновременно вызывают top.compareAndSet () , один из них гарантированно будет успешным), но не без ожидания , поскольку нет никакой гарантии, что CAS в конечном итоге преуспеет для какого-либо конкретного потока.

3. Зависимость

Во-первых, давайте добавим зависимость Ctools в ваш pom.xml :


    org.jctools
    jctools-core
    2.1.2

Обратите внимание, что последняя доступная версия доступна на Maven Central .

4. Очереди JCTools

Библиотека предлагает несколько очередей для использования в многопоточной среде, т. Е. один или несколько потоков записываются в очередь, а один или несколько потоков считываются из нее потокобезопасным способом без блокировки.

Общим интерфейсом для всех реализаций Queue является org.jctools.очереди.Очередь передачи сообщений .

4.1. Типы очередей

Все очереди могут быть классифицированы в соответствии с их политикой производителя/потребителя:

  • один производитель, один потребитель – такие классы называются с использованием префикса Spsc , например SpscArrayQueue
  • один производитель, несколько потребителей – используйте Spmc префикс, например SpmcArrayQueue
  • несколько производителей, один потребитель – используйте Mpsc префикс, например MpscArrayQueue
  • несколько производителей, несколько потребителей – используйте Mpmc префикс, например MpmcArrayQueue

Важно отметить, что нет внутренних проверок политики, т. Е. очередь может молча функционировать неправильно в случае неправильного использования .

Например, приведенный ниже тест заполняет очередь с одним производителем из двух потоков и проходит, даже если потребитель не гарантированно увидит данные от разных производителей:

SpscArrayQueue queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

4.2. Реализация очередей

Обобщая приведенные выше классификации, вот список очередей JCTools:

  • SpscArrayQueue один производитель, один потребитель, использует массив внутри, ограниченная емкость
  • SpscLinkedQueue один производитель, один потребитель, использует связанный список внутри, несвязанная емкость
  • SpscChunkedArrayQueue один производитель, один потребитель, начинается с начальной мощности и растет до максимальной мощности
  • SpscGrowableArrayQueue один производитель, один потребитель, начинается с начальной емкости и растет до максимальной емкости. Это тот же контракт , что и SpscChunkedArrayQueue , единственное отличие-управление внутренними блоками. Рекомендуется использовать SpscChunkedArrayQueue , поскольку он имеет упрощенную реализацию
  • SpscUnboundedArrayQueue один производитель, один потребитель, использует массив внутри, несвязанная емкость
  • SpmcArrayQueue один производитель, несколько потребителей, использует массив внутри, ограниченная емкость
  • MpscArrayQueue несколько производителей, один потребитель, использует массив внутри, ограниченная емкость
  • MpscLinkedQueue несколько производителей, один потребитель, использует связанный список внутри, несвязанная емкость
  • MpmcArrayQueue несколько производителей, несколько потребителей, использует массив внутри, ограниченная емкость

4.3. Атомарные очереди

Все очереди, упомянутые в предыдущем разделе, используют sun.misc.Unsafe . Однако с появлением Java 9 и JEP-260 этот API становится недоступным по умолчанию.

Таким образом, существуют альтернативные очереди, которые используют java.util.concurrent.atomic.AtomicLongFieldUpdater (публичный API, менее производительный) вместо sun.misc.Unsafe .

Они генерируются из приведенных выше очередей, и между их именами вставлено слово Atomic , например SpscChunkedAtomicArrayQueue или MpmcAtomicArrayQueue .

Рекомендуется использовать “обычные” очереди, если это возможно, и прибегать к Атомарные очереди только в средах, где sun.misc.Небезопасно запрещено/неэффективно, как HotSpot Java9+ и JRockit.

4.4. Емкость

Все очереди JCTools также могут иметь максимальную емкость или быть несвязанными. Когда очередь заполнена и ограничена емкостью, она перестает принимать новые элементы.

В следующем примере мы:

  • заполните очередь
  • убедитесь, что после этого он перестанет принимать новые элементы
  • слейте из него воду и убедитесь, что после этого можно добавить больше элементов

Обратите внимание, что несколько операторов кода отбрасываются для удобства чтения. Полную реализацию можно найти в на GitHub :

SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    assertThat(queue.offer(queue.capacity())).isFalse();
    startConsuming.countDown();
    awakeProducer.await();
    assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. Другие инструменты Структуры данных

JCTools также предлагает несколько структур данных без очереди.

Все они перечислены ниже:

  • NonBlockingHashMap альтернатива без блокировки ConcurrentHashMap с улучшенными свойствами масштабирования и, как правило, более низкими затратами на мутации. Он реализован через sun.misc.Unsafe , поэтому не рекомендуется использовать этот класс в среде HotSpot Java 9+ или JRockit
  • NonBlockingHashMapLong как NonBlockingHashMap но использует примитивные длинные ключи
  • NonBlockingHashSet простая оболочка вокруг NonBlockingHashMap |/как у JDK java.util.Сборники.newSetFromMap() NonBlockingIdentityHashMap
  • like NonBlockingHashMap но сравнивает ключи по идентификатору. NonBlockingSetInt
  • многопоточный набор битовых векторов, реализованный в виде массива примитивов longs . Работает неэффективно в случае бесшумного автобокса

6. Тестирование производительности

Давайте использовать JMH для сравнения JDK ArrayBlockingQueue против Производительность очереди JCTools. JMH-это микро-бенчмарк с открытым исходным кодом от гуру Sun/Oracle JVM, который защищает нас от неопределенности алгоритмов оптимизации компилятора/jvm). Пожалуйста, не стесняйтесь получить более подробную информацию об этом в этой статье .

Обратите внимание, что в приведенном ниже фрагменте кода отсутствует несколько операторов, чтобы улучшить читаемость. Пожалуйста, найдите полный исходный код на GitHub:

public class MpmcBenchmark {

    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // intentionally left blank
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && queue.poll() == null) {
            // intentionally left blank
        }
    }
}

Результаты (за исключением 95-го процентиля, наносекунды на операцию):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

Мы видим, что | MpmcArrayQueue работает немного лучше, чем MpmcAtomicArrayQueue и ArrayBlockingQueue медленнее в два раза.

7. Недостатки использования JCTools

Использование JCTools имеет важный недостаток – невозможно обеспечить правильное использование библиотечных классов. Например, рассмотрим ситуацию, когда мы начинаем использовать MpscArrayQueue в нашем большом и зрелом проекте (обратите внимание, что должен быть один потребитель).

К сожалению, поскольку проект большой, существует вероятность того, что кто-то допустит ошибку программирования или конфигурации, и очередь теперь считывается из нескольких потоков. Система, кажется, работает, как и раньше, но теперь есть вероятность, что потребители пропустят некоторые сообщения. Это реальная проблема, которая может иметь большое влияние и которую очень трудно отладить.

В идеале должна быть возможность запустить систему с определенным системным свойством, которое заставляет инструменты обеспечивать политику доступа к потокам. Например, она может быть включена в локальных/тестовых/промежуточных средах (но не в рабочей среде). К сожалению, JCTools не предоставляет такого свойства.

Еще одно соображение заключается в том, что, хотя мы обеспечили, что JCTools значительно быстрее, чем аналог JDK, это не означает, что наше приложение набирает такую же скорость, как и при использовании пользовательских реализаций очередей. Большинство приложений не обмениваются большим количеством объектов между потоками и в основном связаны с вводом-выводом.

8. Заключение

Теперь у нас есть базовое представление о классах утилит, предлагаемых JCTools, и мы увидели, насколько хорошо они работают по сравнению с аналогами JDK при большой нагрузке.

В заключение, стоит использовать библиотеку только в том случае, если мы обмениваемся большим количеством объектов между потоками, и даже в этом случае необходимо быть очень осторожным, чтобы сохранить политику доступа к потокам.

Как всегда, полный исходный код для приведенных выше примеров можно найти на GitHub .