Рубрики
Без рубрики

Противодавление в реактивных системах

В середине января я провел лекцию на Kotlin.amsterdam, основанную на моем посте о переходе с императивного на реактивный… С тегами java, kotlin, reactive, противодавление.

В середине января я провел доклад на Kotlin.amsterdam на основе моего поста Переход от императивного к реактивному (приложение для весенней загрузки). Поскольку это была встреча на Kotlin, я продемонстрировал код Kotlin и добавил шаг, перенеся кодовую базу на сопрограммы. Во время вопросов и ответов кто-то спросил, реализовано ли в сопрограммах противодавление. Признаюсь, я не был уверен в ответе, поэтому провел небольшое исследование.

Этот пост содержит информацию о противодавлении в целом и о том, как RxJava (v3), Project Reactor и Сопрограммы Kotlin справятся с этим.

Что такое противодавление?

Противодавление (или противодавление) – это сопротивление или сила, противодействующие желаемому потоку жидкости по трубам, что приводит к потере на трение и падению давления.

Термин противодавление является неправильным, поскольку давление является скалярной величиной, поэтому оно имеет величину, но не направление.

Википедия

В программном обеспечении противодавление имеет немного схожее, но все же другое значение: учитывая быстрый производитель данных и медленного потребителя данных, противодавление – это механизм, который “отталкивает” производителя, чтобы он не был перегружен данными.

Независимо от того, основано ли оно на reactivestreams.org или Java java.util.concurrent. Поток , Реактивные потоки обеспечивают четыре строительных блока

  1. Издатель , который испускает элементы
  2. Подписчик , который реагирует когда элементы получены
  3. Подписка , которая связывает Издателя и Подписчик
  4. И Процессор

Вот диаграмма классов:

Подписка лежит в основе обратного давления с помощью метода request() .

Технические характеристики довольно просты:

Подписчик ДОЛЖЕН подать сигнал о запросе через Subscription.request(long n) для получения на следующем сигналы.

Цель этого правила состоит в том, чтобы установить, что ответственность за принятие решения о том, когда и сколько элементов он может и желает получать, лежит на Подписчике. Чтобы избежать изменения порядка сигналов, вызванного методами повторной подписки, настоятельно РЕКОМЕНДУЕТСЯ для синхронных реализаций подписчиков вызывать методы подписки в самом конце любой обработки сигнала. Рекомендуется, чтобы Подписчики запрашивали верхний предел того, что они могут обработать, поскольку запрос только одного элемента за раз приводит к изначально неэффективному протоколу “стоп-и-ожидание” .

Спецификации реактивных потоков для JVM

Технические характеристики реактивных потоков довольно солидны. Они также поставляются с TCK на основе Java .

Но определение того, как управлять элементами, отправленными производителем, которые не могут быть обработаны ниже по потоку, выходит за рамки спецификаций. Хотя проблема довольно проста, возможны различные решения. Каждый реактивный фреймворк предоставляет несколько вариантов, поэтому давайте рассмотрим их по очереди.

Противодавление в RxJava 3

RxJava v3 предоставляет несколько базовых классов:

Текучий Поток из 0..N элементов. Он поддерживает реактивные потоки и противодавление.
Наблюдаемый Поток из 0..N элементов. Он не поддерживает противодавление.
Одиночный Поток ровно: 1 элемент или ошибка
Может быть Поток, содержащий либо: нет элементов, ровно один элемент, либо ошибку
Завершаемый Поток, в котором нет элемента, кроме: либо завершения, либо сигнала об ошибке

Среди этих классов Flowable – это единственный класс, который реализует Реактивные потоки и противодавление. Тем не менее, обеспечение обратного давления – не единственная проблема. Как говорится в Rxjava wiki:

Противодавление не устраняет проблему перепроизводства Наблюдаемого или недостаточного потребления Абонента. Это просто перемещает проблему вверх по цепочке операторов к точке, где с ней можно справиться лучше.

Реактивное противодавление тяги – это не магия

Чтобы справиться с этим, RxJava предлагает две основные стратегии для обработки “перепроизводимых” товаров:

  1. Храните элементы в буфере

    Обратите внимание, что если вы не зададите верхнюю границу буфера, это может привести к Ошибка OutOfMemoryError .

  2. Отбрасывайте предметы

На следующей диаграмме представлены различные методы, которые реализуют эти стратегии:

Обратите внимание, что при обратном давлении Последний оператор аналогичен использованию onBackpressureBuffer(1) :

Обратите внимание, что я взял приведенные выше мраморные диаграммы из Rxjava wiki.

По сравнению с другими фреймворками, RxJava предлагает методы для отправки сигнала исключения переполнения после отправки всех элементов. Они позволяют потребителю получать товары и по-прежнему получать уведомления о том, что производитель удалил товары.

Противодавление в проектном реакторе

Стратегии, предлагаемые Project Reactor, аналогичны стратегиям RxJava.

Однако API-интерфейсы имеют некоторые незначительные различия. Например, Project Reactor предлагает удобный метод для создания исключения, если производитель переполняется:

var stream = Stream.generate(Math::random);

// RxJava
Flowable.fromStream(stream)        // 1
        .onBackpressureBuffer(0);  // 2

// Project Reactor
Flux.fromStream(stream)            // 1
    .onBackpressureError();        // 2
  1. Создайте реактивный поток
  2. Выбрасывать, если производитель переполняется

Вот диаграмма классов Flux , которая подчеркивает возможности обратного давления:

По сравнению с другими фреймворками, Project Reactor предлагает методы для установки TTL для буферизованных элементов, чтобы предотвратить их переполнение.

Противодавление в сопрограммах

Сопрограммы действительно предлагают те же возможности буферизации и удаления. Базовым классом в сопрограммах является Flow .

Вы можете использовать такие классы, как этот:

flow {                              // 1
  while (true) emit(Math.random())  // 2
}.buffer(10)                        // 3
  1. Создать Поток какой контент определяется следующим блоком
  2. Определите Поток содержимое
  3. Установите емкость буфера равной 10

Вывод

В целом, сопрограммы RxJava, Project Reactor и Kotlin предоставляют возможности противодавления. Все они справляются с производителем, который работает быстрее, чем его подписчик, предлагая две стратегии: либо буферизовать элементы, либо отбрасывать их.

Спасибо, мой друг Олег Докука за его любезный отзыв.

Чтобы идти дальше:

Первоначально опубликовано на Фанат Java 14 марта th 2021

Оригинал: “https://dev.to/nfrankel/backpressure-in-reactive-systems-3370”