Рубрики
Без рубрики

Учебное пособие по пружинному реактору

Автор оригинала: Dhananjay Singh.

Обзор

В этой статье мы познакомимся с проектом Spring Reactor и его важностью. Идея состоит в том, чтобы воспользоваться спецификацией Реактивных потоков для создания неблокирующих реактивных приложений на JVM.

Используя эти знания, мы создадим простое реактивное приложение и сравним его с традиционным блокирующим приложением.

Реактивные приложения-это “горячая новинка”, из-за которой многие приложения переключаются на эту модель. Вы можете прочитать больше об этом в Реактивный манифест .

Мотивация

Обычные API-интерфейсы блокируют

Современные приложения работают с большим количеством одновременных пользователей и данных. Закон Мура больше не действует так, как раньше. Аппаратные возможности, хотя и увеличиваются, не поспевают за современными приложениями, где производительность очень важна.

Разработчики Java по умолчанию пишут блокирующий код. Это просто то, как был настроен API. Другим примером может быть традиционный подход сервлета ( Tomcat ). Каждый запрос гарантирует новый поток, который ожидает завершения всего фонового процесса, чтобы отправить ответ обратно.

Это означает, что наша логика уровня данных по умолчанию блокирует приложение, так как Потоки праздно ждут ответа. Расточительно не использовать эти Потоки для других целей, пока мы ждем ответа, чтобы вернуться.

Кредит: Кредит:

Примечание: Это может быть проблемой, если у нас ограниченные ресурсы или выполнение процесса занимает слишком много времени.

Асинхронные неподвижные блоки

В Java вы можете писать код асинхронно, используя Обратные вызовы и Фьючерсы . Затем вы можете получить и объединить потоки в какой-то более поздний момент времени и обработать результат. Java 8 представила нам новый класс – CompletableFuture , который значительно облегчает координацию этих действий.

Это работает простым способом – когда заканчивается один процесс, начинается другой. После завершения второго процесса результаты объединяются в третий процесс.

Это значительно облегчает координацию вашего приложения, но в конечном итоге оно все равно блокируется, поскольку создает потоки и ожидает вызова метода .join () .

Кредит: Кредит:

Реактивное программирование

Нам нужно асинхронное и неблокирующее . Группа разработчиков из таких компаний, как Netflix, Pivotal, Red Hat и т.д., Собрались вместе и сошлись на чем-то, что называется Спецификация реактивных потоков .

Project Reactor-это реализация Spring спецификации Reactive, и ей особенно нравится модуль Spring Webflux , хотя вы можете использовать его с другими модулями, такими как RxJava .

Идея состоит в том, чтобы работать Асинхронно с Обратным давлением с использованием Издателей и Подписчиков .

Здесь мы знакомимся с несколькими новыми концепциями! Давайте объясним их по порядку:

  • ИздательИздатель является поставщиком потенциально неограниченного числа элементов.
  • ПодписчикПодписчик слушает этого Издателя, запрашивая новые данные. Иногда его также называют Потребителем .
  • Обратное давление – Способность Подписчика сообщать Издателю, сколько запросов он может обработать за один раз. Таким образом, за поток данных отвечает Подписчик , а не Издатель , поскольку он просто предоставляет данные.

Проект “Реактор” предлагает 2 типа издателей. Они считаются основными строительными блоками Spring Web flux :

  • Flux – это издатель, который производит 0 к N значениям. Это может быть безгранично. Операции, возвращающие несколько элементов, используют этот тип.
  • Mono – это издатель, который производит 0 чтобы 1 ценность. Операции, возвращающие один элемент, используют этот тип.

Разработка Реактивных Приложений

Учитывая все вышесказанное, давайте перейдем к созданию простого веб-приложения и воспользуемся преимуществами этой новой реактивной парадигмы!

Самый простой способ начать со скелетного загрузочного проекта Spring, как всегда, – это использовать Spring Initializr . Выберите предпочитаемую версию Spring Boot и добавьте зависимость “Реактивный веб”. После этого сгенерируйте его как проект Maven, и все готово!

Давайте определим простое POJO – Приветствие :

public class Greeting {
    private String msg;
    // Constructors, getters and setters
}

Определение издателя

Наряду с этим, давайте определим простой контроллер REST с адекватным отображением:

@RestController
public class GreetReactiveController {
    @GetMapping("/greetings")
    public Publisher greetingPublisher() {
        Flux greetingFlux = Flux.generate(sink -> sink.next(new Greeting("Hello"))).take(50);
        return greetingFlux;
    }
}

Вызов Flux .generate() создаст бесконечный поток Приветствия объекта.

Метод take () , как следует из названия, будет принимать только первые 50 значений из потока.

Важно отметить, что возвращаемым типом метода является асинхронный тип Publisher<Приветствие> .

Чтобы проверить эту конечную точку, перейдите в свой браузер по адресу http://localhost:8080/greetings или используйте curl клиент в командной строке – curl localhost:8080/приветствия

Вам будет предложено дать ответ, который выглядит примерно так:

Это не выглядит таким уж большим делом, и мы могли бы просто вернуть Список<Приветствие> для достижения того же визуального результата.

Но опять же, обратите внимание, что мы возвращаем Поток<Приветствие> , который является асинхронным типом, так как это все меняет.

Предположим, у нас был издатель, который вернул более тысячи записей или даже больше. Подумайте о том, что должна делать структура. Ему задан объект типа Приветствие , который он должен преобразовать в JSON для конечного пользователя.

Если бы мы использовали традиционный подход с Spring MVC, эти объекты продолжали бы накапливаться в вашей оперативной памяти, и как только она соберет все, она вернет это клиенту. Это может превысить объем нашей оперативной памяти, а также заблокировать обработку любых других операций в то же время.

Когда мы используем Spring Web flux, вся внутренняя динамика меняется. Платформа начинает подписываться на эти записи от издателя, сериализует каждый элемент и отправляет его обратно клиенту по частям.

Мы делаем все асинхронно, не создавая слишком много потоков и не повторно используя потоки, которые чего-то ждут. Самое приятное то, что вам не нужно делать для этого ничего лишнего. В традиционном Spring MVC мы могли бы добиться того же, вернув AsyncResult , DefferedResult и т. Д. , Чтобы получить некоторую асинхронность, Но внутренне Spring MVC пришлось создать новый поток, который блокируется, так как ему приходится ждать.

События, отправленные Сервером

Другим издателем, который использовался с момента их прибытия, являются События, отправленные сервером .

Эти события позволяют веб-странице получать обновления с сервера в режиме реального времени.

Давайте определим простой реактивный сервер:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher sseGreetings() {
    Flux delayElements = Flux
            .generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
            .delayElements(Duration.ofSeconds(1));
    return delayElements;
}

В качестве альтернативы мы могли бы определить это:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux events() {
    Flux greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
    Flux durationFlux = Flux.interval(Duration.ofSeconds(1));
    return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
}

Эти методы создают значение TEXT_EVENT_STREAM_VALUE , что, по сути, означает, что данные отправляются в виде событий, отправляемых сервером.

Git Essentials

Ознакомьтесь с этим практическим руководством по изучению Git, содержащим лучшие практики и принятые в отрасли стандарты. Прекратите гуглить команды Git и на самом деле изучите это!

Обратите внимание, что в первом примере мы используем Publisher , а во втором примере мы используем Поток . Правильным вопросом было бы:

“Какой тип возврата я должен использовать в таком случае?”

Рекомендуется использовать Flux и Mono над Издателем . Оба этих класса являются реализациями интерфейса Publisher , происходящего из реактивных потоков. Хотя вы можете использовать их взаимозаменяемо, использование реализаций более выразительно и описательно.

В этих двух примерах показаны два способа создания событий, отправленных сервером с задержкой:

  • ..элементы задержки() – Этот метод задерживает каждый элемент потока на заданную длительность
  • .zip() – Мы определяем поток для генерации событий и поток для генерации значений каждую секунду. Соединяя их вместе, мы получаем поток, генерирующий события каждую секунду.

Перейдите к http://localhost:8080/greetings/sse или используйте curl клиент в командной строке, и вы увидите ответ, который выглядит примерно так:

Определение потребителя

Теперь давайте посмотрим на потребительскую сторону этого. Стоит отметить, что вам не нужно иметь реактивного издателя, чтобы использовать реактивное программирование на стороне потребителя:

public class Person {
    private int id;
    private String name;
    // Constructor with getters and setters
}

И тогда у нас есть традиционный RestController с одним отображением:

@RestController
public class PersonController {
    private static List personList = new ArrayList<>();
    static {
        personList.add(new Person(1, "John"));
        personList.add(new Person(2, "Jane"));
        personList.add(new Person(3, "Max"));
        personList.add(new Person(4, "Alex"));
        personList.add(new Person(5, "Aloy"));
        personList.add(new Person(6, "Sarah"));
    }

    @GetMapping("/person/{id}")
    public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
            throws InterruptedException {
        Thread.sleep(delay * 1000);
        return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
    }
}

Мы инициализировали список типа Персона и на основе идентификатора , переданного в наше сопоставление, мы отфильтровываем этого человека с помощью потока.

Вас может встревожить использование Thread.sleep() здесь, хотя он используется только для имитации задержки сети в 2 секунды.

Если вам интересно прочитать больше о потоках Java , мы об этом расскажем!

Давайте продолжим и создадим нашего потребителя. Как и издатель, мы можем легко сделать это с помощью Spring Initializr:

Наше приложение производителя работает на порту 8080 . Теперь предположим, что мы хотим позвонить /человеку/{id} конечной точке 5 раз. Мы знаем, что по умолчанию каждый ответ задерживается на 2 секунды из-за “задержки в сети”.

Давайте сначала сделаем это, используя традиционный подход RestTemplate :

public class CallPersonUsingRestTemplate {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingRestTemplate.class);
    private static RestTemplate restTemplate = new RestTemplate();

    static {
        String baseUrl = "http://localhost:8080";
        restTemplate.setUriTemplateHandler(new DefaultUriBuilderFactory(baseUrl));
    }

    public static void main(String[] args) {
        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            restTemplate.getForObject("/person/{id}", Person.class, i);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }
}

Давайте запустим его:

Как и ожидалось, это заняло чуть более 10 секунд, и по умолчанию Spring MVC работает именно так.

В наши дни и в этом возрасте ждать результата на странице чуть более 10 секунд недопустимо. В этом разница между сохранением клиента/клиента и потерей его из-за слишком долгого ожидания.

Spring Reactor представил новый веб-клиент для выполнения веб-запросов под названием WebClient . По сравнению с RestTemplate этот клиент более функциональен и полностью реагирует. Он включен в зависимость spring-boot-starter-weblux и построен для замены RestTemplate неблокирующим способом.

Давайте перепишем тот же контроллер, на этот раз, используя WebClient :

public class CallPersonUsingWebClient_Step1 {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingWebClient_Step1.class);
    private static String baseUrl = "http://localhost:8080";
    private static WebClient client = WebClient.create(baseUrl);

    public static void main(String[] args) {

        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }

}

Здесь мы создали Веб-клиент , передав базовый файл . Затем в основном методе мы просто вызываем конечную точку.

get() указывает, что мы делаем запрос GET . Мы знаем, что ответом будет один объект, поэтому мы используем Mono , как объяснялось ранее.

В конечном счете, мы попросили Spring сопоставить ответ с Человеком классом:

И ничего не произошло, как и ожидалось.

Это потому, что мы не подписываемся . Все это откладывается. Это асинхронно, но также не запускается, пока мы не вызовем метод .subscribe () . Это обычная проблема для людей, которые новички в Spring Reactor, поэтому следите за этим.

Давайте изменим наш основной метод и добавим подписку:

for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
}

Добавление метода запрашивает у нас желаемый результат:

Запрос отправляется, но метод .subscribe() не сидит и не ждет ответа. Поскольку он не блокируется, он закончился до получения ответа вообще.

Можем ли мы противостоять этому, связав .block() в конце вызовов методов?

for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
}

Результат:

На этот раз мы получили ответ для каждого человека, хотя это заняло более 10 секунд. Это противоречит цели того, чтобы приложение было реактивным.

Способ устранения всех этих проблем прост: мы составляем список типов Mono и ждем, пока все они завершатся, вместо того, чтобы ждать каждого из них:

List> list = Stream.of(1, 2, 3, 4, 5)
    .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
    .collect(Collectors.toList());

Mono.when(list).block();

Результат:

Это то, к чему мы стремимся. На этот раз это заняло чуть более двух секунд, даже при значительном запаздывании сети. Это значительно повышает эффективность нашего приложения и действительно меняет правила игры.

Если вы внимательно посмотрите на потоки, Reactor использует их повторно, а не создает новые. Это действительно важно, если ваше приложение обрабатывает много запросов за короткий промежуток времени.

Вывод

В этой статье мы обсудили необходимость реактивного программирования и его реализацию Spring – реактор Spring.

Затем мы обсудили модуль Spring Webflux, который внутренне использует Reactor, а также такие концепции, как Издатель и Подписчик . После этого мы создали приложение, которое публикует данные в виде реактивного потока и использует их в другом приложении.

Исходный код этого руководства можно найти на Github .