Рубрики
Без рубрики

Отладка реактивных потоков в Java

Узнайте, как эффективно и правильно отлаживать реактивные потоки в Spring 5.

Автор оригинала: Ger Roza.

1. Обзор

Отладка реактивных потоков, вероятно, является одной из основных проблем, с которыми нам придется столкнуться, как только мы начнем использовать эти структуры данных.

И учитывая, что реактивные потоки набирают популярность в последние годы, неплохо бы знать, как мы можем эффективно выполнять эту задачу.

Давайте начнем с настройки проекта с использованием реактивного стека, чтобы понять, почему это часто вызывает проблемы.

2. Сценарий с ошибками

Мы хотим смоделировать реальный сценарий, в котором запущено несколько асинхронных процессов и где мы ввели некоторые дефекты в код, которые в конечном итоге вызовут исключения.

Чтобы понять общую картину, мы упомянем , что наше приложение будет потреблять и обрабатывать потоки простых объектов Foo , которые содержат только id , formattedName и quantity поле. Для получения более подробной информации, пожалуйста, посмотрите о проекте здесь .

2.1. Анализ выходных данных журнала

Теперь давайте рассмотрим фрагмент кода и выходные данные, которые он генерирует, когда появляется необработанная ошибка:

public void processFoo(Flux flux) {
    flux = FooNameHelper.concatFooName(flux);
    flux = FooNameHelper.substringFooName(flux);
    flux = FooReporter.reportResult(flux);
    flux.subscribe();
}

public void processFooInAnotherScenario(Flux flux) {
    flux = FooNameHelper.substringFooName(flux);
    flux = FooQuantityHelper.divideFooQuantity(flux);
    flux.subscribe();
}

Запустив наше приложение в течение нескольких секунд, мы поймем, что оно время от времени регистрирует исключения.

Внимательно изучив одну из ошибок, мы найдем что-то похожее на это:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

Основываясь на основной причине и заметив класс FooNameHelper , упомянутый в трассировке стека, мы можем представить, что в некоторых случаях наши объекты Foo обрабатываются со значением formattedName , которое короче, чем ожидалось.

Конечно, это всего лишь упрощенный случай, и решение кажется довольно очевидным.

Но давайте представим, что это был реальный сценарий, когда само исключение не помогает нам решить проблему без некоторой контекстной информации.

Было ли исключение вызвано как часть process Food, или process Foo В другом сценарии методе?

Повлияли ли другие предыдущие шаги на поле отформатированное имя до достижения этого этапа?

Запись в журнале не помогла бы нам разобраться в этих вопросах.

Что еще хуже, иногда исключение даже не выбрасывается из нашей функциональности.

Например, представьте, что мы полагаемся на реактивный репозиторий для сохранения наших объектов Foo . Если в этот момент возникнет ошибка, мы можем даже не иметь понятия о том, с чего начать отладку нашего кода.

Нам нужны инструменты для эффективной отладки реактивных потоков.

3. Использование сеанса отладки

Один из вариантов выяснить, что происходит с нашим приложением, – это начать сеанс отладки с помощью вашей любимой IDE.

Нам нужно будет настроить пару условных точек останова и проанализировать поток данных, когда каждый шаг в потоке будет выполнен.

Действительно, это может быть громоздкой задачей, особенно когда у нас есть много реактивных процессов, запущенных и совместно используемых ресурсов.

Кроме того, существует множество обстоятельств, при которых мы не можем начать сеанс отладки по соображениям безопасности.

4. Протоколирование информации С помощью doOnErrorMethod или с помощью параметра Subscribe

Иногда мы можем добавить полезную контекстную информацию, предоставив Consumer в качестве второго параметра метода subscribe :

public void processFoo(Flux flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

Примечание: Стоит отметить, что если нам не нужно проводить дальнейшую обработку на подписываться метод, мы можем приковать doOnError функция на нашем издателе:

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

Теперь у нас будет некоторое представление о том, откуда может исходить ошибка, хотя у нас все еще нет большой информации о фактическом элементе, создавшем исключение.

5. Активация глобальной конфигурации отладки реактора

Библиотека Reactor предоставляет Класс Hooks , который позволяет нам настраивать поведение операторов Flux и Mono .

Просто добавив следующий оператор, наше приложение обработает вызовы методов издателей, обернет конструкцию оператора и захватит трассировку стека :

Hooks.onOperatorDebug();

После активации режима отладки наши журналы исключений будут содержать некоторую полезную информацию:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.d.b.c.s.FooService.processFoo(FooService.java:24)
    c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.d.b.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

Как мы видим, первый раздел остается относительно прежним, но в следующих разделах содержится информация о:

  1. Трассировка сборки издателя — здесь мы можем подтвердить, что ошибка была впервые сгенерирована в методе processFoo .
  2. Операторы, которые заметили ошибку после ее первого запуска, с пользовательским классом, в котором они были прикованы.

Примечание: В этом примере, главным образом для того, чтобы это было ясно, мы добавляем операции для разных классов.

Мы можем включить или выключить режим отладки в любое время, но это не повлияет на объекты Flux и Mono , которые уже были созданы.

5.1. Выполнение операторов в разных потоках

Еще один аспект, который следует иметь в виду, заключается в том, что трассировка сборки генерируется правильно, даже если в потоке работают разные потоки.

Давайте рассмотрим следующий пример:

public void processFoo(Flux flux) {
    flux = flux.publishOn(Schedulers.newSingle("foo-thread"));
    // ...

    flux = flux.publishOn(Schedulers.newSingle("bar-thread"));
    flux = FooReporter.reportResult(flux);
    flux.subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

Теперь, если мы проверим журналы, мы поймем, что в этом случае первый раздел может немного измениться, но последние два остаются практически неизменными.

Первая часть-это threadstacktrace, поэтому в ней будут показаны только операции, выполняемые конкретным потоком.

Как мы уже видели, это не самый важный раздел при отладке приложения, поэтому это изменение приемлемо.

6. Активация вывода отладки в одном процессе

Инструментирование и создание трассировки стека в каждом отдельном реактивном процессе является дорогостоящим.

Таким образом, мы должны применять первый подход только в критических случаях .

Во всяком случае, Reactor предоставляет способ включить режим отладки для отдельных важных процессов, который менее потребляет память .

Мы имеем в виду контрольно-пропускной пункт оператор:

public void processFoo(Flux flux) {
    
    // ...

    flux = flux.checkpoint("Observed error on processFoo", true);
    flux.subscribe();
}

Обратите внимание, что таким образом трассировка сборки будет регистрироваться на этапе контрольной точки:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
	...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

Мы должны реализовать пропускной пункт способ ближе к концу реактивной цепи.

В противном случае оператор не сможет наблюдать ошибки, возникающие ниже по потоку.

Кроме того, отметим, что библиотека предлагает перегруженный метод. Мы можем избежать:

  • указание описания наблюдаемой ошибки, если мы используем параметр no-args
  • создание заполненной трассировки стека (что является самой дорогостоящей операцией), предоставляя только пользовательское описание

7. Протоколирование последовательности элементов

Наконец, издатели Reactor предлагают еще один метод, который потенциально может пригодиться в некоторых случаях.

Вызывая метод log в нашей реактивной цепочке, приложение будет регистрировать каждый элемент в потоке с состоянием, которое он имеет на этом этапе .

Давайте попробуем это на нашем примере:

public void processFoo(Flux flux) {
    flux = FooNameHelper.concatFooName(flux);
    flux = FooNameHelper.substringFooName(flux);
    flux = flux.log();
    flux = FooReporter.reportResult(flux);
    flux = flux.doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
    });
    flux.subscribe();
}

И проверьте журналы:

INFO  reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.Map.1 - request(unbounded)
INFO  reactor.Flux.Map.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.Map.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

Мы можем легко увидеть состояние каждого объекта Foo на этом этапе и то, как фреймворк отменяет поток, когда происходит исключение.

Конечно, этот подход также является дорогостоящим, и нам придется использовать его с умеренностью.

8. Заключение

Мы можем потратить много времени и усилий на устранение неполадок, если не знаем инструментов и механизмов для правильной отладки нашего приложения.

Это особенно верно, если мы не привыкли обрабатывать реактивные и асинхронные структуры данных, и нам нужна дополнительная помощь, чтобы понять, как все работает.

Как всегда, полный пример доступен в репозитории GitHub .