Рубрики
Без рубрики

Параллелизм в весеннем веб-потоке

Параллелизм в весеннем веб – потоке зависит от многих факторов. Давайте рассмотрим их более подробно.

Автор оригинала: Kumar Chandrakant.

1. введение

В этом уроке мы рассмотрим параллелизм в реактивных программах, написанных с помощью Spring Web Flux .

Мы начнем с обсуждения параллелизма в отношении реактивного программирования. После этого мы рассмотрим, как Spring Web Flux предлагает абстракции параллелизма в различных библиотеках реактивных серверов.

2. Мотивация для реактивного программирования

Типичное веб-приложение состоит из нескольких сложных взаимодействующих частей . Многие из этих взаимодействий являются блокирующими по своей природе как, например, те, которые связаны с вызовом базы данных для извлечения или обновления данных. Несколько других, однако, являются независимыми и могут выполняться одновременно, возможно, параллельно.

Например, два запроса пользователя к веб-серверу могут обрабатываться разными потоками. На многоядерной платформе это имеет очевидное преимущество с точки зрения общего времени отклика. Следовательно, эта модель параллелизма известна как модель потока на запрос :

На приведенной выше схеме каждый поток обрабатывает по одному запросу за раз.

В то время как параллелизм на основе потоков решает для нас часть проблемы, он ничего не делает для решения того факта, что большинство наших взаимодействий в рамках одного потока все еще блокируются . Кроме того, собственные потоки, которые мы используем для достижения параллелизма в Java, имеют значительные затраты с точки зрения переключения контекста.

Тем временем, по мере того как веб-приложения сталкиваются со все большим количеством запросов, модель поток на запрос начинает не соответствовать ожиданиям .

Следовательно, нам нужна модель параллелизма, которая может помочь нам обрабатывать все больше запросов с относительно меньшим количеством потоков . Это одна из основных мотиваций для принятия реактивного программирования .

3. Параллелизм в реактивном программировании

Реактивное программирование помогает нам структурировать программу с точки зрения потоков данных и распространения изменений через них . Следовательно, в полностью неблокирующей среде это может позволить нам достичь более высокого параллелизма с лучшим использованием ресурсов.

Однако является ли реактивное программирование полным отходом от параллелизма на основе потоков? Хотя это сильное утверждение, реактивное программирование, безусловно, имеет совершенно другой подход к использованию потоков для достижения параллелизма . Итак, фундаментальное различие, которое приносит реактивное программирование, – это асинхронность.

Другими словами, поток программы превращается из последовательности синхронных операций в асинхронный поток событий.

Например, в рамках реактивной модели вызов чтения базы данных не блокирует вызывающий поток во время извлечения данных. Вызов немедленно возвращает издателя, на которого другие могут подписаться . Подписчик может обработать событие после того, как оно произойдет, и может даже в дальнейшем генерировать события самостоятельно:

Прежде всего, реактивное программирование не подчеркивает, какие события потока должны генерироваться и потребляться. Акцент делается, скорее, на структурировании программы как асинхронного потока событий .

Издатель и подписчик здесь не обязательно должны быть частью одного потока. Это помогает нам лучше использовать доступные потоки и, следовательно, повысить общий параллелизм.

4. Цикл событий

Существует несколько моделей программирования, которые описывают реактивный подход к параллелизму .

В этом разделе мы рассмотрим некоторые из них, чтобы понять, как реактивное программирование обеспечивает более высокий параллелизм с меньшим количеством потоков.

Одной из таких моделей реактивного асинхронного программирования для серверов является цикл событий модель :

Выше приведен абстрактный дизайн цикла событий , который представляет идеи реактивного асинхронного программирования:

  • Цикл событий выполняется непрерывно в одном потоке , хотя у нас может быть столько циклов событий , сколько доступно ядер
  • Цикл событий обрабатывает события из очереди событий последовательно и возвращается немедленно после регистрации обратного вызова на платформе
  • платформа может инициировать завершение операции, такой как вызов базы данных или вызов внешней службы
  • То цикл событий может вызвать обратный звонок на завершение операции уведомление и отправка результата исходному вызывающему абоненту

цикл событий модель реализована в ряде платформ, включая Node.js , Нетти и Ngnix . Они предлагают гораздо лучшую масштабируемость, чем традиционные платформы , такие как Apache HTTP Server , Tomcat или JBoss .

5. Реактивное программирование с помощью пружинного веб-потока

Теперь у нас достаточно знаний о реактивном программировании и его модели параллелизма, чтобы изучить эту тему в Spring WebFlux.

Web Flux-это //Spring ‘s веб-фреймворк с реактивным стеком , который был добавлен в версии 5.0.

Давайте рассмотрим серверный стек Spring WebFlux, чтобы понять, как он дополняет традиционный веб-стек Spring:

Как мы видим, Spring Web Flux находится параллельно традиционному веб-фреймворку в Spring и не обязательно заменяет его .

Здесь следует отметить несколько важных моментов:

  • Spring Web Flux расширяет традиционную модель программирования на основе аннотаций с функциональной маршрутизацией
  • Кроме того, он адаптирует базовые среды выполнения HTTP к API реактивных потоков , делая среды выполнения совместимыми
  • Следовательно, он способен поддерживать широкий спектр реактивных сред выполнения, включая контейнеры сервлета 3.1+, такие как Tomcat, Reactor, Netty или Undertow
  • Наконец, он включает в себя Web Client , реактивный и неблокирующий клиент для HTTP-запросов, предлагающий функциональные и свободные API

6. Модель потоковой передачи в поддерживаемых средах выполнения

Как мы уже обсуждали ранее, реактивные программы, как правило, работают всего с несколькими потоками и используют их по максимуму. Однако количество и характер потоков зависят от фактической среды выполнения API реактивного потока, которую мы выбираем.

Чтобы уточнить, Spring Web Flux может адаптироваться к различным средам выполнения с помощью общего API, предоставляемого HttpHandler . Этот API представляет собой простой контракт только с одним методом, который обеспечивает абстракцию над различными серверными API, такими как Reactor Netty, Servlet 3.1 API или Undertow API.

Давайте теперь разберемся в модели потоковой передачи, реализованной в некоторых из них.

В то время как Nettie является сервером по умолчанию в приложении Web Flux, это просто вопрос объявления правильной зависимости для переключения на любой другой поддерживаемый сервер :


    org.springframework.boot
    spring-boot-starter-webflux
    
        
            org.springframework.boot
            spring-boot-starter-reactor-netty
        
    


    org.springframework.boot
    spring-boot-starter-tomcat

Хотя можно наблюдать потоки, созданные в виртуальной машине Java несколькими способами, довольно легко просто вытащить их из самого класса Thread :

Thread.getAllStackTraces()
  .keySet()
  .stream()
  .collect(Collectors.toList());

6.1. Реакторная Нетти

Как мы уже говорили, Reactor Netty является встроенным сервером по умолчанию в стартере Spring Boot Web Flux. Давайте попробуем увидеть потоки, которые Нетти создает по умолчанию. Следовательно, в начале мы не будем добавлять никаких других зависимостей или использовать WebClient. Таким образом, если мы запустим приложение Spring Web Flux, созданное с помощью стартера Spring Boot, мы можем ожидать, что увидим некоторые потоки по умолчанию, которые он создает:

Обратите внимание, что, помимо обычного потока для сервера, Netty порождает кучу рабочих потоков для обработки запросов . Обычно это не более чем доступные ядра процессора. Это вывод на четырехъядерной машине. Мы также увидим кучу потоков домашнего хозяйства, типичных для среды JVM, но они здесь не важны.

Netty использует модель цикла событий для обеспечения масштабируемого параллелизма в реактивной асинхронной манере. Давайте посмотрим, как Netty реализует цикл событий используя Java NIO для обеспечения этой масштабируемости :

Здесь EventLoopGroup управляет одним или несколькими Циклами событий , которые должны выполняться непрерывно . Следовательно, не рекомендуется создавать больше Циклов событий , чем количество доступных ядер.

EventLoopGroup далее назначает Цикл событий каждому вновь созданному Каналу . Таким образом, в течение всего срока службы канала все операции выполняются одним и тем же потоком.

6.2. Apache Tomcat

Spring Web Flux также поддерживается в традиционном контейнере сервлетов, например Apache Tomcat .

WebFlux полагается на API сервлета 3.1 с неблокирующим вводом-выводом . Хотя он использует API сервлета за низкоуровневым адаптером, API сервлета недоступен для прямого использования.

Давайте посмотрим, какие потоки мы ожидаем в приложении Web Flux, работающем на Tomcat:

Количество и тип потоков, которые мы можем там видеть, сильно отличаются от того, что мы наблюдали ранее.

Начнем с того, что Tomcat начинается с большего количества рабочих потоков, которые по умолчанию равны десяти . Конечно, мы также увидим некоторые потоки домашнего хозяйства, типичные для JVM, и контейнер Catalina, которые мы можем игнорировать для этого обсуждения.

Давайте разберемся в архитектуре Tomcat с Java NIO, чтобы соотнести ее с потоками, которые мы видим выше.

Tomcat 5 и далее поддерживает NIO в своем компоненте соединителя, который в первую очередь отвечает за получение запросов .

Другим компонентом Tomcat является компонент контейнера, который отвечает за функции управления контейнерами.

Здесь нас интересует модель потоковой передачи, которую компонент Connector реализует для поддержки NIO. Он состоит из Акцептора , опросника, и Рабочего в составе модуля NioEndpoint :

Tomcat порождает один или несколько потоков для Acceptor , Poller и Worker с обычно пулом потоков, выделенным для Worker .

Хотя подробное обсуждение архитектуры Tomcat выходит за рамки этого руководства, теперь у нас должно быть достаточно информации, чтобы понять потоки, которые мы видели ранее.

7. Модель потоковой передачи в веб-клиенте

Веб-клиент является реактивным HTTP-клиентом, который является частью Spring Web Flux . Мы можем использовать его в любое время, когда нам требуется связь на основе REST, и это позволяет нам создавать приложения, которые являются сквозными | реактивными .

Как мы уже видели ранее, реактивные приложения работают всего с несколькими потоками, и поэтому ни одна часть приложения не может блокировать поток. Следовательно, Веб-клиент играет жизненно важную роль в том, чтобы помочь нам реализовать потенциал веб-потока.

7.1. Использование WebClient

Использование WebClient также довольно просто. Нам не нужно включать какие-либо конкретные зависимости, поскольку это часть Spring Web Flux .

Давайте создадим простую конечную точку REST, которая возвращает Моно:

@GetMapping("/index")
public Mono getIndex() {
    return Mono.just("Hello World!");
}

Затем мы будем использовать WebClient для вызова этой конечной точки REST и реактивного использования данных:

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .doOnNext(s -> printThreads());

Здесь мы также печатаем потоки, созданные с помощью метода, который мы обсуждали ранее.

7.2. Понимание модели резьбы

Итак, как работает модель потоковой передачи в случае WebClient ?

Что ж, неудивительно, что Веб-клиент также реализует параллелизм с использованием модели цикла событий . Конечно, он полагается на базовую среду выполнения для обеспечения необходимой инфраструктуры.

Если мы запускаем WebClient на реакторе Netty, он разделяет цикл событий, который Netty использует для сервера . Следовательно, в этом случае мы можем не заметить большой разницы в создаваемых потоках.

Однако Веб-клиент также поддерживается в контейнере сервлета 3.1+, таком как Jetty, но способ его работы там отличается .

Если мы сравним потоки , созданные в приложении Web Flux под управлением Jetty с WebClient и без него, мы заметим несколько дополнительных потоков.

Здесь/| Веб-клиент должен создать свой цикл событий . Таким образом, мы можем видеть фиксированное количество потоков обработки, которые создает этот цикл событий:

В некоторых случаях наличие отдельного пула потоков для клиента и сервера может обеспечить лучшую производительность . Хотя это не поведение по умолчанию для Netty, всегда можно объявить выделенный пул потоков для Веб-клиента , если это необходимо.

Мы увидим, как это возможно, в следующем разделе.

8. Модель потоковой передачи в библиотеках доступа к данным

Как мы видели ранее, даже простое приложение обычно состоит из нескольких частей, которые необходимо соединить.

Типичные примеры этих частей включают базы данных и брокеры сообщений. Существующие библиотеки для подключения ко многим из них все еще блокируются, но это быстро меняется.

В настоящее время существует несколько баз данных, которые предлагают реактивные библиотеки для подключения . Многие из этих библиотек доступны в Spring Data , в то время как мы можем использовать и другие напрямую.

Модель потоковой передачи, которую используют эти библиотеки, представляет для нас особый интерес.

8.1. Весенние данные MongoDB

Spring Data MongoDB обеспечивает поддержку реактивного репозитория для MongoDB, построенного поверх драйвера реактивных потоков MongoDB//. Наиболее примечательно, что этот драйвер полностью реализует API реактивных потоков для обеспечения асинхронной обработки потока с неблокирующим обратным давлением .

Настройка поддержки реактивного репозитория для MongoDB в приложении Spring Boot так же проста, как добавление зависимости:


    org.springframework.boot
    spring-boot-starter-data-mongodb-reactive

Это позволит нам создать репозиторий и использовать его для выполнения некоторых основных операций с MongoDB неблокирующим способом:

public interface PersonRepository extends ReactiveMongoRepository {
}
.....
personRepository.findAll().doOnComplete(this::printThreads);

Итак, какие потоки мы можем ожидать увидеть, когда запустим это приложение на сервере Netty?

Что ж, неудивительно, что мы не увидим большой разницы, поскольку a Spring Data reactive repository использует тот же цикл событий, который доступен для сервера.

8.2. Реактор Кафки

Весна все еще находится в процессе создания полноценной поддержки реактивной Кафки. Тем не менее, у нас есть варианты, доступные за пределами Spring.

Reactor Kafka – это реактивный API для Кафки на основе Reactor . Reactor Kafka позволяет публиковать и использовать сообщения с использованием функциональных API, а также с неблокирующим обратным давлением .

Во-первых, нам нужно добавить необходимую зависимость в наше приложение, чтобы начать использовать Reactor Kafka:


    io.projectreactor.kafka
    reactor-kafka
    1.2.2.RELEASE

Это должно позволить нам создавать сообщения Кафке неблокирующим способом:

// producerProps: Map of Standard Kafka Producer Configurations
SenderOptions senderOptions = SenderOptions.create(producerProps);
KafkaSender sender =  KafkaSender.create(senderOptions);
Flux> outboundFlux = Flux
  .range(1, 10)
  .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();

Точно так же мы должны быть в состоянии потреблять сообщения от Кафки также неблокирующим образом:

// consumerProps: Map of Standard Kafka Consumer Configurations
ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver receiver = KafkaReceiver.create(receiverOptions);
Flux> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads)

Это довольно просто и понятно.

Мы подписываемся на тему реактивный тест в Кафке и получаем Поток сообщений.

Интересная вещь для нас-это потоки, которые создаются :

Мы видим несколько потоков, которые не типичны для сервера Netty .

Это указывает на то, что Reactor Kafka управляет собственным пулом потоков с несколькими рабочими потоками, которые участвуют исключительно в обработке сообщений Kafka. Конечно, мы увидим кучу других потоков, связанных с Netty и JVM, которые мы можем игнорировать.

Производители Кафки используют отдельный сетевой поток для отправки запросов брокеру. Кроме того, они доставляют ответы приложению на однопоточном объединенном планировщике .

С другой стороны, у потребителя Кафки есть один поток на группу потребителей – он блокирует прослушивание входящих сообщений. Затем входящие сообщения планируется обработать в другом пуле потоков.

9. Параметры планирования в веб-потоке

До сих пор мы видели, что реактивное программирование действительно сияет в совершенно неблокирующей среде всего с несколькими потоками . Но это также означает, что, если действительно есть блокирующая часть, это приведет к гораздо худшей производительности. Это связано с тем, что операция блокировки может полностью заморозить цикл событий.

Итак, как мы обрабатываем длительные процессы или блокирующие операции в реактивном программировании?

Честно говоря, лучшим вариантом было бы просто избегать их. Однако это не всегда возможно, и нам может потребоваться специальная стратегия планирования для этих частей нашего приложения .

Spring Web Flux предлагает механизм переключения обработки в другой пул потоков между цепочкой потоков данных . Это может обеспечить нам точный контроль над стратегией планирования, которую мы хотим для определенных задач. Конечно, WebFlux может предложить это на основе абстракций пула потоков, известных как планировщики, доступных в базовых реактивных библиотеках.

9.1. Реактор

В Reactor класс Scheduler определяет модель выполнения , а также место выполнения .

Класс Schedulers предоставляет ряд контекстов выполнения, таких как immediate , single , elastic и parallel|/.

Они предоставляют различные типы пулов потоков, которые могут быть полезны для различных заданий. Более того, мы всегда можем создать свой собственный График с уже существующим ExecutorService .

В то время как Планировщики предоставляют нам несколько контекстов выполнения, Reactor также предоставляет нам различные способы переключения контекста выполнения . Это методы опубликовать и подписаться на .

Мы можем использовать publishing с Планировщиком в любом месте цепочки, при этом Планировщик влияет на все последующие операторы.

Хотя мы также можем использовать subscribeOn с Планировщиком в любом месте цепочки, это повлияет только на контекст источника излучения.

Если мы вспомним, WebClient на Netty использует тот же цикл событий , созданный для сервера в качестве поведения по умолчанию. Однако у нас могут быть веские причины для создания выделенного пула потоков для веб-клиента.

Давайте посмотрим, как мы можем достичь этого в Reaktor, который является реактивной библиотекой по умолчанию в WebFlux:

Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .publishOn(scheduler)
  .doOnNext(s -> printThreads());

Ранее мы не наблюдали никакой разницы в потоках, созданных на Netty с WebClient или без него . Однако, если мы сейчас запустим приведенный выше код, мы увидим, что создается несколько новых потоков :

Здесь мы можем видеть потоки, созданные как часть нашего ограниченного пула эластичных потоков . Именно здесь публикуются ответы от Веб-клиента после подписки.

Это оставляет основной пул потоков для обработки запросов сервера.

9.2. RxJava

Поведение по умолчанию в RxJava не сильно отличается от поведения реактора .

Observable и цепочка операторов , которые мы применяем к нему, выполняют свою работу – и уведомляют наблюдателей – в том же потоке, где была вызвана подписка. Кроме того, RxJava , как и Reactor, предлагает способы введения в цепочку префиксных или пользовательских стратегий планирования.

RxJava также имеет класс Планировщиков , который предлагает ряд моделей выполнения для Наблюдаемой цепочки . К ним относятся новый поток , немедленный , батут , io , вычисление и тест . Конечно, это также позволяет нам определить Планировщик | из Java Исполнителя .

Кроме того, RxJava также предлагает два метода расширения для достижения этой цели , subscribeOn и observeOn .

Метод subscribe изменяет поведение по умолчанию, указывая другой Планировщик , на котором должен работать Observable .

С другой стороны, метод observeOn указывает другой планировщик, который Observable может использовать для отправки уведомлений наблюдателям.

Как мы уже обсуждали ранее, Spring Web Flux по умолчанию использует Reactor в качестве своей реактивной библиотеки. Но, поскольку он полностью совместим с API реактивных потоков, можно переключиться на другую реализацию реактивных потоков, такую как RxJava (для RxJava 1.x с адаптером реактивных потоков).

Нам нужно явно добавить зависимость:


    io.reactivex.rxjava2
    rxjava
    2.2.19

Затем мы можем начать использовать типы RxJava, такие как Observable , в нашем приложении вместе с конкретными планировщиками RxJava |:

io.reactivex.Observable
    .fromIterable(Arrays.asList("Tom", "Sawyer"))
    .map(s -> s.toUpperCase())
    .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
    .doOnComplete(this::printThreads);

В результате, если мы запустим это приложение, помимо обычных потоков, связанных с Netty и JVM , мы увидим несколько потоков, связанных с нашим планировщиком RxJava |:

10. Заключение

В этой статье мы исследовали предпосылки реактивного программирования в контексте параллелизма.

Мы наблюдали разницу в модели параллелизма в традиционном и реактивном программировании. Это позволило нам изучить модель параллелизма в Spring Web Flux и ее использование в модели потоков для ее достижения.

Далее мы исследовали модель потоковой передачи в Web Flux в сочетании с различными библиотеками HttpRuntime и reactive.

Мы также обсудили, чем отличается модель потоков при использовании WebClient или библиотеки доступа к данным.

Наконец, мы коснулись вариантов управления стратегией планирования в нашей реактивной программе в WebFlux.

Как всегда, исходный код этой статьи можно найти на GitHub .