1. Обзор
В этом уроке мы представим библиотеку Tools (Инструменты параллелизма Java).
Проще говоря, это обеспечивает ряд полезных структур данных, подходящих для работы в многопоточной среде.
2. Неблокирующие алгоритмы
Традиционно многопоточный код, работающий в изменяемом общем состоянии, использует блокировки для обеспечения согласованности данных и публикации (изменения, внесенные одним потоком, которые видны другому).
Такой подход имеет ряд недостатков:
- потоки могут быть заблокированы при попытке получить блокировку, не продвигаясь вперед до тех пор, пока не будет завершена операция другого потока – это эффективно предотвращает параллелизм
- чем тяжелее конфликт блокировок, тем больше времени JVM тратит на планирование потоков, управление конфликтами и очередями ожидающих потоков и тем меньше реальной работы он выполняет
- взаимоблокировки возможны, если задействовано более одной блокировки, и они получены/освобождены в неправильном порядке
- возможна инверсия приоритета опасность – поток с высоким приоритетом блокируется в попытке получить блокировку, удерживаемую потоком с низким приоритетом
- большую часть времени используются крупнозернистые замки, что сильно вредит параллелизму-мелкозернистая блокировка требует более тщательного проектирования, увеличивает накладные расходы на блокировку и более подвержена ошибкам
Альтернативой является использование неблокирующего алгоритма, т. Е. алгоритма, в котором сбой или приостановка любого потока не может привести к сбою или приостановке другого потока .
Неблокирующий алгоритм не блокируется , если хотя бы один из задействованных потоков гарантированно будет выполняться в течение произвольного периода времени, т. Е. Взаимоблокировки не могут возникнуть во время обработки.
Кроме того, эти алгоритмы не требуют ожидания , если также есть гарантированный прогресс для каждого потока.
Вот неблокирующий Стек пример из превосходного Параллелизма Java на практике книги; он определяет базовое состояние:
public class ConcurrentStack{ AtomicReference > top = new AtomicReference >(); private static class Node { public E item; public Node next; // standard constructor } }
А также несколько методов API:
public void push(E item){ NodenewHead = new Node (item); Node oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }
Мы видим, что алгоритм использует мелкозернистые инструкции compare-and-swap ( CAS ) и является без блокировки (даже если несколько потоков одновременно вызывают top.compareAndSet () , один из них гарантированно будет успешным), но не без ожидания , поскольку нет никакой гарантии, что CAS в конечном итоге преуспеет для какого-либо конкретного потока.
3. Зависимость
Во-первых, давайте добавим зависимость Ctools в ваш pom.xml :
org.jctools jctools-core 2.1.2
Обратите внимание, что последняя доступная версия доступна на Maven Central .
4. Очереди JCTools
Библиотека предлагает несколько очередей для использования в многопоточной среде, т. Е. один или несколько потоков записываются в очередь, а один или несколько потоков считываются из нее потокобезопасным способом без блокировки.
Общим интерфейсом для всех реализаций Queue является org.jctools.очереди.Очередь передачи сообщений .
4.1. Типы очередей
Все очереди могут быть классифицированы в соответствии с их политикой производителя/потребителя:
- один производитель, один потребитель – такие классы называются с использованием префикса Spsc , например SpscArrayQueue
- один производитель, несколько потребителей – используйте Spmc префикс, например SpmcArrayQueue
- несколько производителей, один потребитель – используйте Mpsc префикс, например MpscArrayQueue
- несколько производителей, несколько потребителей – используйте Mpmc префикс, например MpmcArrayQueue
Важно отметить, что нет внутренних проверок политики, т. Е. очередь может молча функционировать неправильно в случае неправильного использования .
Например, приведенный ниже тест заполняет очередь с одним производителем из двух потоков и проходит, даже если потребитель не гарантированно увидит данные от разных производителей:
SpscArrayQueuequeue = new SpscArrayQueue<>(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set fromQueue = new HashSet<>(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);
4.2. Реализация очередей
Обобщая приведенные выше классификации, вот список очередей JCTools:
- SpscArrayQueue – один производитель, один потребитель, использует массив внутри, ограниченная емкость
- SpscLinkedQueue – один производитель, один потребитель, использует связанный список внутри, несвязанная емкость
- SpscChunkedArrayQueue – один производитель, один потребитель, начинается с начальной мощности и растет до максимальной мощности
- SpscGrowableArrayQueue – один производитель, один потребитель, начинается с начальной емкости и растет до максимальной емкости. Это тот же контракт , что и SpscChunkedArrayQueue , единственное отличие-управление внутренними блоками. Рекомендуется использовать SpscChunkedArrayQueue , поскольку он имеет упрощенную реализацию
- SpscUnboundedArrayQueue – один производитель, один потребитель, использует массив внутри, несвязанная емкость
- SpmcArrayQueue – один производитель, несколько потребителей, использует массив внутри, ограниченная емкость
- MpscArrayQueue – несколько производителей, один потребитель, использует массив внутри, ограниченная емкость
- MpscLinkedQueue – несколько производителей, один потребитель, использует связанный список внутри, несвязанная емкость
- MpmcArrayQueue – несколько производителей, несколько потребителей, использует массив внутри, ограниченная емкость
4.3. Атомарные очереди
Все очереди, упомянутые в предыдущем разделе, используют sun.misc.Unsafe . Однако с появлением Java 9 и JEP-260 этот API становится недоступным по умолчанию.
Таким образом, существуют альтернативные очереди, которые используют java.util.concurrent.atomic.AtomicLongFieldUpdater (публичный API, менее производительный) вместо sun.misc.Unsafe .
Они генерируются из приведенных выше очередей, и между их именами вставлено слово Atomic , например SpscChunkedAtomicArrayQueue или MpmcAtomicArrayQueue .
Рекомендуется использовать “обычные” очереди, если это возможно, и прибегать к Атомарные очереди только в средах, где sun.misc.Небезопасно запрещено/неэффективно, как HotSpot Java9+ и JRockit.
4.4. Емкость
Все очереди JCTools также могут иметь максимальную емкость или быть несвязанными. Когда очередь заполнена и ограничена емкостью, она перестает принимать новые элементы.
В следующем примере мы:
- заполните очередь
- убедитесь, что после этого он перестанет принимать новые элементы
- слейте из него воду и убедитесь, что после этого можно добавить больше элементов
Обратите внимание, что несколько операторов кода отбрасываются для удобства чтения. Полную реализацию можно найти в на GitHub :
SpscChunkedArrayQueuequeue = new SpscChunkedArrayQueue<>(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set fromQueue = new HashSet<>(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));
5. Другие инструменты Структуры данных
JCTools также предлагает несколько структур данных без очереди.
Все они перечислены ниже:
- NonBlockingHashMap – альтернатива без блокировки ConcurrentHashMap с улучшенными свойствами масштабирования и, как правило, более низкими затратами на мутации. Он реализован через sun.misc.Unsafe , поэтому не рекомендуется использовать этот класс в среде HotSpot Java 9+ или JRockit
- NonBlockingHashMapLong – как NonBlockingHashMap но использует примитивные длинные ключи
- NonBlockingHashSet – простая оболочка вокруг NonBlockingHashMap |/как у JDK java.util.Сборники.newSetFromMap() NonBlockingIdentityHashMap
- – like NonBlockingHashMap но сравнивает ключи по идентификатору. NonBlockingSetInt
- – многопоточный набор битовых векторов, реализованный в виде массива примитивов longs . Работает неэффективно в случае бесшумного автобокса
6. Тестирование производительности
Давайте использовать JMH для сравнения JDK ArrayBlockingQueue против Производительность очереди JCTools. JMH-это микро-бенчмарк с открытым исходным кодом от гуру Sun/Oracle JVM, который защищает нас от неопределенности алгоритмов оптимизации компилятора/jvm). Пожалуйста, не стесняйтесь получить более подробную информацию об этом в этой статье .
Обратите внимание, что в приведенном ниже фрагменте кода отсутствует несколько операторов, чтобы улучшить читаемость. Пожалуйста, найдите полный исходный код на GitHub:
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queuequeue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }
Результаты (за исключением 95-го процентиля, наносекунды на операцию):
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
Мы видим, что | MpmcArrayQueue работает немного лучше, чем MpmcAtomicArrayQueue и ArrayBlockingQueue медленнее в два раза.
7. Недостатки использования JCTools
Использование JCTools имеет важный недостаток – невозможно обеспечить правильное использование библиотечных классов. Например, рассмотрим ситуацию, когда мы начинаем использовать MpscArrayQueue в нашем большом и зрелом проекте (обратите внимание, что должен быть один потребитель).
К сожалению, поскольку проект большой, существует вероятность того, что кто-то допустит ошибку программирования или конфигурации, и очередь теперь считывается из нескольких потоков. Система, кажется, работает, как и раньше, но теперь есть вероятность, что потребители пропустят некоторые сообщения. Это реальная проблема, которая может иметь большое влияние и которую очень трудно отладить.
В идеале должна быть возможность запустить систему с определенным системным свойством, которое заставляет инструменты обеспечивать политику доступа к потокам. Например, она может быть включена в локальных/тестовых/промежуточных средах (но не в рабочей среде). К сожалению, JCTools не предоставляет такого свойства.
Еще одно соображение заключается в том, что, хотя мы обеспечили, что JCTools значительно быстрее, чем аналог JDK, это не означает, что наше приложение набирает такую же скорость, как и при использовании пользовательских реализаций очередей. Большинство приложений не обмениваются большим количеством объектов между потоками и в основном связаны с вводом-выводом.
8. Заключение
Теперь у нас есть базовое представление о классах утилит, предлагаемых JCTools, и мы увидели, насколько хорошо они работают по сравнению с аналогами JDK при большой нагрузке.
В заключение, стоит использовать библиотеку только в том случае, если мы обмениваемся большим количеством объектов между потоками, и даже в этом случае необходимо быть очень осторожным, чтобы сохранить политику доступа к потокам.
Как всегда, полный исходный код для приведенных выше примеров можно найти на GitHub .