Рубрики
Без рубрики

Регистрация реактивной последовательности

Узнайте, как регистрировать реактивные потоки.

Автор оригинала: baeldung.

1. Обзор

С появлением Spring WebFlux мы получили еще один мощный инструмент для написания реактивных , неблокирующих приложений. Хотя использование этой технологии теперь намного проще, чем раньше, отладка реактивных последовательностей в Spring WebFlux может быть довольно громоздкой .

В этом кратком руководстве мы увидим, как легко регистрировать события в асинхронных последовательностях и как избежать некоторых простых ошибок.

2. Зависимость Maven

Давайте добавим зависимость Spring WebFlux в наш проект, чтобы мы могли создавать реактивные потоки:


    org.springframework.boot
    spring-boot-starter-webflux

Мы можем получить последнюю версию spring-boot-starter-web flux зависимость от Maven Central.

3. Создание реактивного потока

Для начала давайте создадим реактивный поток с помощью Flux и используем метод log() для включения ведения журнала:

Flux reactiveStream = Flux.range(1, 5).log();

Далее мы подпишемся на него в соответствии с потребительскими ценностями:

reactiveStream.subscribe();

4. Протоколирование Реактивного потока

После запуска вышеуказанного приложения мы видим наш регистратор в действии:

2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

Мы видим каждое событие, которое произошло в нашем потоке. Было выдано пять значений, а затем поток закрылся с помощью события onComplete () .

5. Расширенный Сценарий Ведения журнала

Мы можем изменить наше приложение, чтобы увидеть более интересный сценарий. Давайте добавим take() к Flux , который будет инструктировать поток предоставлять только определенное количество событий:

Flux reactiveStream = Flux.range(1, 5).log().take(3);

После выполнения кода мы увидим следующий вывод:

2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

Как мы видим, take() вызвал отмену потока после трех событий.

Размещение log() в вашем потоке имеет решающее значение . Давайте посмотрим, как размещение log() после take() приведет к другому результату:

Flux reactiveStream = Flux.range(1, 5).take(3).log();

И выход:

2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

Как мы видим, изменение точки наблюдения изменило результат. Теперь поток произвел три события, но вместо cancel(), мы видим incomplete() . Это происходит потому, что мы наблюдаем результат использования take() вместо того, что было запрошено этим методом.

6. Заключение

В этой краткой статье мы рассмотрели, как регистрировать реактивные потоки с помощью встроенного метода log () .

И, как всегда, исходный код для приведенного выше примера можно найти на GitHub .