Рубрики
Без рубрики

Экспоненциальный Откат С Пружинным AMQP

Изучите два различных способа реализации стратегии повторных попыток отправки сообщений с именем Экспоненциальный откат.

Автор оригинала: baeldung.

1. введение

По умолчанию в Spring AMQP неудачное сообщение повторно ставится в очередь для следующего раунда потребления. Следовательно, может возникнуть бесконечный цикл потребления, что приведет к нестабильной ситуации и пустой трате ресурсов.

Хотя использование очереди Мертвых писем является стандартным способом обработки неудачных сообщений , мы можем захотеть повторить потребление сообщений и вернуть систему в нормальное состояние.

В этом уроке мы представим два различных способа реализации стратегии повторных попыток с именем Экспоненциальный откат .

2. Предварительные условия

На протяжении всего этого урока/| мы будем использовать RabbitMQ , популярную реализацию AMQP . Следовательно, мы можем обратиться к этой статье Spring AMQP для получения дальнейших инструкций по настройке и использованию RabbitMQ с Spring.

Для простоты мы также будем использовать образ docker для нашего экземпляра RabbitMQ, хотя подойдет любой экземпляр RabbitMQ, прослушивающий порт 5672.

Давайте запустим контейнер RabbitMQ docker:

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

Чтобы реализовать наши примеры, нам нужно добавить зависимость от spring-boot-starter-amqp . Последняя версия доступна на Maven Central :


    
        org.springframework.boot
        spring-boot-starter-amqp
        2.2.4.RELEASE
    

3. Блокирующий Путь

Наш первый способ будет использовать пружинные повторные приспособления. Мы создадим простую очередь и потребителя, настроенного на ожидание в течение некоторого времени между повторными попытками неудачного сообщения.

Во-первых, давайте создадим нашу очередь:

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

Во-вторых, давайте настроим стратегию возврата в RetryOperationsInterceptor и свяжем ее с пользовательским RabbitListenerContainerFactory :

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000)
      .maxAttempts(5)
      .recoverer(observableRecoverer())
      .build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

Как показано выше, мы настраиваем начальный интервал 1000 мс и множитель 3,0, до максимального времени ожидания 10000мс. Кроме того, после пяти попыток сообщение будет удалено.

Давайте добавим нашего потребителя и принудительно отправим сообщение об ошибке, создав исключение:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);

    throw new Exception("exception occured!");
}

Наконец, давайте создадим тест и отправим два сообщения в нашу очередь:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

Имейте в виду, что CountDownLatch используется только в качестве тестового приспособления.

Давайте запустим тест и проверим вывод журнала:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

Как видно, этот журнал правильно показывает экспоненциальное время ожидания между каждой повторной попыткой. В то время как наша стратегия отступления работает, наш потребитель блокируется до тех пор, пока повторные попытки не будут исчерпаны. Тривиальное улучшение заключается в том, чтобы заставить нашего потребителя выполнять одновременно, установив атрибут concurrency для @RabbitListener :

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

Однако удаленное сообщение по-прежнему блокирует экземпляр потребителя. Таким образом, приложение может страдать от проблем с задержкой.

В следующем разделе мы представим неблокирующий способ реализации аналогичной стратегии.

4. Неблокирующий Способ

Альтернативный способ включает в себя несколько очередей повторных попыток в сочетании с истечением срока действия сообщения. На самом деле, когда срок действия сообщения истекает, оно попадает в очередь мертвых писем. Другими словами, если DLQconsumer отправляет сообщение обратно в исходную очередь, мы, по сути, выполняем цикл повторных попыток .

В результате количество используемых очередей повторных попыток-это количество попыток, которые будут выполняться .

Во-первых, давайте создадим очередь мертвых писем для наших очередей повторных попыток:

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

Давайте добавим потребителя в очередь повторов мертвых писем. Исключительная ответственность этого потребителя заключается в отправке сообщения обратно в исходную очередь :

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
    MessageProperties props = message.getMessageProperties();

    rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), 
      props.getHeader("x-original-routing-key"), message);
}

Во-вторых, давайте создадим объект-оболочку для наших очередей повторных попыток. Этот объект будет содержать конфигурацию экспоненциального отката:

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // constructor, getters and setters

В-третьих, давайте определим три очереди повторных попыток:

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

Затем нам нужен перехватчик для обработки потребления сообщений:

public class RetryQueuesInterceptor implements MethodInterceptor {

    // fields and constructor

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                // ...
                throw new RuntimeException(t);
            }
        });
    }

В случае успешного возвращения потребителя мы просто подтверждаем сообщение.

Однако, если потребитель создает исключение и остались попытки, мы отправляем сообщение в следующую очередь повторных попыток:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties()
      .getDeliveryTag(), false);
}

Опять же, давайте подключим наш перехватчик к пользовательскому RabbitListenerContainerFactory :

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
  ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

Наконец, мы определяем нашу основную очередь и потребителя, который имитирует неудачное сообщение:

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue")
      .build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", 
  ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);

    throw new Exception("Error occured!");
}

Давайте создадим еще один тест и отправим два сообщения:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

Затем давайте запустим наш тест и проверим журнал:

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

Опять же, мы видим экспоненциальное время ожидания между каждой повторной попыткой. Однако вместо блокировки до тех пор, пока не будет предпринята каждая попытка, сообщения обрабатываются одновременно .

Хотя эта настройка довольно гибкая и помогает устранить проблемы с задержкой, существует общая ловушка. Действительно, RabbitMQ удаляет просроченное сообщение только тогда, когда оно достигает головы очереди . Поэтому, если срок действия сообщения больше, оно заблокирует все остальные сообщения в очереди. По этой причине очередь ответов должна содержать только сообщения с одинаковым сроком действия .

4. Заключение

Как показано выше, системы, основанные на событиях, могут реализовать стратегию экспоненциального отката для повышения отказоустойчивости. Хотя реализация таких решений может быть тривиальной, важно понимать, что определенное решение может быть хорошо адаптировано к небольшой системе, но вызывает проблемы с задержкой в экосистемах с высокой пропускной способностью.

Исходный код доступен на GitHub .