1. Обзор
С появлением Spring WebFlux мы получили еще один мощный инструмент для написания реактивных , неблокирующих приложений. Хотя использование этой технологии теперь намного проще, чем раньше, отладка реактивных последовательностей в Spring WebFlux может быть довольно громоздкой .
В этом кратком руководстве мы увидим, как легко регистрировать события в асинхронных последовательностях и как избежать некоторых простых ошибок.
2. Зависимость Maven
Давайте добавим зависимость Spring WebFlux в наш проект, чтобы мы могли создавать реактивные потоки:
org.springframework.boot spring-boot-starter-webflux
Мы можем получить последнюю версию spring-boot-starter-web flux зависимость от Maven Central.
3. Создание реактивного потока
Для начала давайте создадим реактивный поток с помощью Flux и используем метод log() для включения ведения журнала:
FluxreactiveStream = Flux.range(1, 5).log();
Далее мы подпишемся на него в соответствии с потребительскими ценностями:
reactiveStream.subscribe();
4. Протоколирование Реактивного потока
После запуска вышеуказанного приложения мы видим наш регистратор в действии:
2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 2018-11-11 22:37:04 INFO | request(unbounded) 2018-11-11 22:37:04 INFO | onNext(1) 2018-11-11 22:37:04 INFO | onNext(2) 2018-11-11 22:37:04 INFO | onNext(3) 2018-11-11 22:37:04 INFO | onNext(4) 2018-11-11 22:37:04 INFO | onNext(5) 2018-11-11 22:37:04 INFO | onComplete()
Мы видим каждое событие, которое произошло в нашем потоке. Было выдано пять значений, а затем поток закрылся с помощью события onComplete () .
5. Расширенный Сценарий Ведения журнала
Мы можем изменить наше приложение, чтобы увидеть более интересный сценарий. Давайте добавим take() к Flux , который будет инструктировать поток предоставлять только определенное количество событий:
FluxreactiveStream = Flux.range(1, 5).log().take(3);
После выполнения кода мы увидим следующий вывод:
2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 2018-11-11 22:45:35 INFO | request(unbounded) 2018-11-11 22:45:35 INFO | onNext(1) 2018-11-11 22:45:35 INFO | onNext(2) 2018-11-11 22:45:35 INFO | onNext(3) 2018-11-11 22:45:35 INFO | cancel()
Как мы видим, take() вызвал отмену потока после трех событий.
Размещение log() в вашем потоке имеет решающее значение . Давайте посмотрим, как размещение log() после take() приведет к другому результату:
FluxreactiveStream = Flux.range(1, 5).take(3).log();
И выход:
2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber) 2018-11-11 22:49:23 INFO | request(unbounded) 2018-11-11 22:49:23 INFO | onNext(1) 2018-11-11 22:49:23 INFO | onNext(2) 2018-11-11 22:49:23 INFO | onNext(3) 2018-11-11 22:49:23 INFO | onComplete()
Как мы видим, изменение точки наблюдения изменило результат. Теперь поток произвел три события, но вместо cancel(), мы видим incomplete() . Это происходит потому, что мы наблюдаем результат использования take() вместо того, что было запрошено этим методом.
6. Заключение
В этой краткой статье мы рассмотрели, как регистрировать реактивные потоки с помощью встроенного метода log () .
И, как всегда, исходный код для приведенного выше примера можно найти на GitHub .