Рубрики
Без рубрики

Обработка ошибок с помощью Spring AMQP

Узнайте о различных способах обработки ошибок с помощью Spring AMQP с помощью RabbitMQ.

Автор оригинала: baeldung.

1. введение

Асинхронный обмен сообщениями-это тип слабо связанной распределенной коммуникации, который становится все более популярным для реализации архитектуры, управляемой событиями . К счастью, Spring Framework предоставляет проект Spring AMQP, позволяющий нам создавать решения для обмена сообщениями на основе AMQP.

С другой стороны, работа с ошибками в таких средах может быть нетривиальной задачей . Поэтому в этом уроке мы рассмотрим различные стратегии обработки ошибок.

2. Настройка среды

Для этого урока мы будем использовать RabbitMQ , который реализует стандарт AMQP. Кроме того, Spring AMQP предоставляет модуль spring-rabbit , который делает интеграцию очень простой.

Давайте запустим RabbitMQ как автономный сервер. Мы запустим его в контейнере Docker, выполнив следующую команду:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Для получения подробной конфигурации и настройки зависимостей проекта, пожалуйста, обратитесь к нашей статье Spring AMQP .

3. Сценарий Отказа

Обычно существует больше типов ошибок, которые могут возникнуть в системах обмена сообщениями по сравнению с монолитными или однопакетными приложениями из-за их распределенной природы.

Мы можем указать на некоторые типы исключений:

  • Сеть- или Связанные с вводом-выводом/| – общие сбои сетевых соединений и операций ввода-вывода Протокол-
  • или связанные с инфраструктурой -ошибки, которые обычно представляют собой неправильную конфигурацию инфраструктуры обмена сообщениями Связанные с брокером
  • -сбои, предупреждающие о неправильной конфигурации между клиентами и брокером AMQP. Например, достижение определенных пределов или порога, аутентификация или недопустимая конфигурация политик Application-
  • и message-related – исключения, которые обычно указывают на нарушение некоторых бизнес-или прикладных правил

Конечно, этот список отказов не является исчерпывающим, но содержит наиболее распространенные типы ошибок.

Следует отметить, что Spring AMQP обрабатывает связанные с подключением и низкоуровневые проблемы из коробки, например, применяя политики retry или requeue . Кроме того, большинство отказов и неисправностей преобразуются в AmqpException или один из его подклассов.

В следующих разделах мы в основном сосредоточимся на специфичных для приложений и высокоуровневых ошибках, а затем рассмотрим глобальные стратегии обработки ошибок.

4. Настройка проекта

Теперь давайте определим простую конфигурацию очереди и обмена для начала:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

Далее, давайте создадим простого производителя:

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

И, наконец, потребитель, который бросает исключение:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

По умолчанию все неудачные сообщения будут немедленно запрашиваться в начале целевой очереди снова и снова.

Давайте запустим наш пример приложения, выполнив следующую команду Maven:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

Теперь мы должны увидеть аналогичный результирующий результат:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

Следовательно, по умолчанию мы будем видеть бесконечное количество таких сообщений в выходных данных.

Чтобы изменить это поведение у нас есть два варианта:

  • Установите параметр defaultrequeuerejected в значение false на стороне слушателя – spring.rabbitmq.listener.simple.default-requeue-rejected=false
  • Throw an AmqpRejectAndDontRequeueException – t his может быть полезен для сообщений, которые не будут иметь смысла в будущем, поэтому их можно отбросить.

Теперь давайте узнаем, как обрабатывать неудачные сообщения более разумным способом.

5. Очередь Мертвых писем

Очередь мертвых писем (DLQ) – это очередь, содержащая недоставленные или неудачные сообщения . DLQ позволяет нам обрабатывать неисправные или плохие сообщения, отслеживать шаблоны сбоев и восстанавливаться после исключений в системе.

Что еще более важно, это помогает предотвратить бесконечные циклы в очередях, которые постоянно обрабатывают плохие сообщения и снижают производительность системы.

Всего существует два основных понятия: Обмен мертвыми письмами (DLX) и сама очередь мертвых писем (DLQ). На самом деле, DLX-это обычный обмен, который мы можем определить как один из распространенных типов : direct , topic или fanout .

Очень важно понимать, что продюсер ничего не знает об очередях. Он знает только об обменах, и все произведенные сообщения маршрутизируются в соответствии с конфигурацией обмена и ключом маршрутизации сообщений .

Теперь давайте посмотрим, как обрабатывать исключения, применяя подход очереди мертвых писем.

5.1. Базовая конфигурация

Чтобы настроить DLQ, нам нужно указать дополнительные аргументы при определении нашей очереди:

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

В приведенном выше примере мы использовали два дополнительных аргумента: x-dead-letter-exchange и x-dead-letter-routing-key . Пустое строковое значение для параметра x-dead-letter-exchange указывает брокеру использовать обмен по умолчанию .

Второй аргумент столь же важен, как и установка ключей маршрутизации для простых сообщений. Эта опция изменяет начальный ключ маршрутизации сообщения для дальнейшей маршрутизации по DLX.

5.2. Неудачная Маршрутизация Сообщений

Поэтому, когда сообщение не доставляется, оно направляется на обмен мертвыми письмами. Но, как мы уже отмечали, DLX-это нормальный обмен. Поэтому, если ключ маршрутизации неудачного сообщения не совпадает с обменом, он не будет доставлен в DLQ.

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

Таким образом, если мы опустим аргумент x-dead-letter-routing-key в нашем примере, неудачное сообщение застрянет в бесконечном цикле повторных попыток.

Кроме того, исходная метаинформация сообщения доступна в x-death heaven:

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

То приведенная выше информация доступна в консоли управления RabbitMQ обычно работает локально на порту 15672.

Помимо этой конфигурации, если мы используем Spring Cloud Stream , мы можем даже упростить процесс настройки, используя свойства конфигурации republishToDlq и autoBindDlq .

5.3. Обмен Мертвыми письмами

В предыдущем разделе мы видели, что ключ маршрутизации изменяется, когда сообщение направляется на обмен мертвыми буквами. Но такое поведение не всегда желательно. Мы можем изменить его, настроив DLX самостоятельно и определив его с помощью типа fanout :

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

На этот раз мы определили пользовательский обмен типа fan out , поэтому сообщения будут отправляться во все ограниченные очереди . Кроме того, мы установили значение аргумента x-dead-letter-exchange для имени нашего DLX. В то же время мы удалили аргумент x-dead-letter-routing-key .

Теперь, если мы запустим наш пример, неудачное сообщение должно быть доставлено в DLQ, но без изменения начального ключа маршрутизации:

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

5.4. Обработка Сообщений Очереди Мертвых Писем

Конечно, причина, по которой мы переместили их в очередь мертвых писем, заключается в том, что они могут быть переработаны в другое время.

Давайте определим слушателя для очереди мертвых писем:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

Если мы запустим наш пример кода сейчас, то увидим вывод журнала:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

Мы получили неудачное сообщение, но что нам делать дальше? Ответ зависит от конкретных системных требований, вида исключения или типа сообщения.

Например, мы можем просто запросить сообщение в исходное место назначения:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Но такая логика исключений не отличается от политики повторных попыток по умолчанию:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:

Общая стратегия может потребовать повторной обработки сообщения в течение n раз, а затем отклонить его. Давайте реализуем эту стратегию, используя заголовки сообщений:

public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Сначала мы получаем значение заголовка x-retries-count , затем сравниваем это значение с максимально допустимым значением. Впоследствии, если счетчик достигнет предельного числа попыток, сообщение будет отброшено:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding message

Мы должны добавить, что мы также можем использовать заголовок x-message-ttl , чтобы установить время, после которого сообщение должно быть отброшено. Это может быть полезно для предотвращения бесконечного роста очередей.

5.5. Очередь на Парковку

С другой стороны, рассмотрим ситуацию, когда мы не можем просто отбросить сообщение, например, это может быть транзакция в банковской сфере. Кроме того, иногда сообщение может потребовать ручной обработки или нам просто нужно записать сообщения, которые потерпели неудачу более n раз.

Для подобных ситуаций существует понятие очереди на парковку . Мы можем переслать все сообщения из DLQ, которые вышли из строя более разрешенного количества раз, в очередь парковки для дальнейшей обработки .

Давайте теперь воплотим эту идею:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

Во – вторых, давайте реорганизуем логику прослушивателя, чтобы отправить сообщение в очередь парковки:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

В конце концов, нам также нужно обрабатывать сообщения, поступающие в очередь парковки:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}

Теперь мы можем сохранить неудачное сообщение в базе данных или, возможно, отправить уведомление по электронной почте.

Давайте проверим эту логику, запустив наше приложение:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queue

Как видно из выходных данных, после нескольких неудачных попыток сообщение было отправлено в очередь парковки.

6. Пользовательская Обработка Ошибок

В предыдущем разделе мы видели, как обрабатывать сбои с помощью выделенных очередей и обменов. Однако иногда нам может потребоваться отловить все ошибки, например, для регистрации или сохранения их в базе данных.

6.1. Глобальный обработчик ошибок

До сих пор мы использовали по умолчанию SimpleRabbitListenerContainerFactory и эта фабрика по умолчанию использует ConditionalRejectingErrorHandler . Этот обработчик ловит различные исключения и преобразует их в одно из исключений в иерархии AmqpException .

Важно отметить, что если нам нужно обрабатывать ошибки подключения, то нам нужно реализовать интерфейс ApplicationListener .

Проще говоря, Условное отклонение ErrorHandler решает, отклонять ли конкретное сообщение или нет. Когда сообщение, вызвавшее исключение, отклоняется, оно не запрашивается.

Давайте определим пользовательский Обработчик ошибок , который будет просто запрашивать только Бизнес-исключение s:

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

Кроме того, поскольку мы выбрасываем исключение внутри нашего метода listener, оно оборачивается в ListenerExecutionFailedException . Итак, нам нужно вызвать метод getCause , чтобы получить исходное исключение.

6.2. Стратегия Фатальных Исключений

Под капотом этот обработчик использует стратегию Fatal Exception , чтобы проверить, следует ли считать исключение фатальным. Если это так, то неудачное сообщение будет отклонено.

По умолчанию эти исключения фатальны:

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • Метод Аргумент TypeMismatchException
  • NoSuchMethodException
  • ClassCastException

Вместо реализации интерфейса Ierrorhandler мы можем просто предоставить нашу стратегию Фатальных исключений :

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}

Наконец, нам нужно передать нашу пользовательскую стратегию конструктору Conditional Rejecting ErrorHandler :

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

7. Заключение

В этом уроке мы обсудили различные способы обработки ошибок при использовании Spring AMQP и, в частности, RabbitMQ.

Каждая система нуждается в определенной стратегии обработки ошибок. Мы рассмотрели наиболее распространенные способы обработки ошибок в архитектуре, управляемой событиями. Кроме того, мы убедились, что можем объединить несколько стратегий для создания более комплексного и надежного решения.

Как всегда, полный исходный код статьи доступен на GitHub .