1. введение
По умолчанию в Spring AMQP неудачное сообщение повторно ставится в очередь для следующего раунда потребления. Следовательно, может возникнуть бесконечный цикл потребления, что приведет к нестабильной ситуации и пустой трате ресурсов.
Хотя использование очереди Мертвых писем является стандартным способом обработки неудачных сообщений , мы можем захотеть повторить потребление сообщений и вернуть систему в нормальное состояние.
В этом уроке мы представим два различных способа реализации стратегии повторных попыток с именем Экспоненциальный откат .
2. Предварительные условия
На протяжении всего этого урока/| мы будем использовать RabbitMQ , популярную реализацию AMQP . Следовательно, мы можем обратиться к этой статье Spring AMQP для получения дальнейших инструкций по настройке и использованию RabbitMQ с Spring.
Для простоты мы также будем использовать образ docker для нашего экземпляра RabbitMQ, хотя подойдет любой экземпляр RabbitMQ, прослушивающий порт 5672.
Давайте запустим контейнер RabbitMQ docker:
docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management
Чтобы реализовать наши примеры, нам нужно добавить зависимость от spring-boot-starter-amqp . Последняя версия доступна на Maven Central :
org.springframework.boot spring-boot-starter-amqp 2.2.4.RELEASE
3. Блокирующий Путь
Наш первый способ будет использовать пружинные повторные приспособления. Мы создадим простую очередь и потребителя, настроенного на ожидание в течение некоторого времени между повторными попытками неудачного сообщения.
Во-первых, давайте создадим нашу очередь:
@Bean public Queue blockingQueue() { return QueueBuilder.nonDurable("blocking-queue").build(); }
Во-вторых, давайте настроим стратегию возврата в RetryOperationsInterceptor и свяжем ее с пользовательским RabbitListenerContainerFactory :
@Bean public RetryOperationsInterceptor retryInterceptor() { return RetryInterceptorBuilder.stateless() .backOffOptions(1000, 3.0, 10000) .maxAttempts(5) .recoverer(observableRecoverer()) .build(); } @Bean public SimpleRabbitListenerContainerFactory retryContainerFactory( ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); Advice[] adviceChain = { retryInterceptor }; factory.setAdviceChain(adviceChain); return factory; }
Как показано выше, мы настраиваем начальный интервал 1000 мс и множитель 3,0, до максимального времени ожидания 10000мс. Кроме того, после пяти попыток сообщение будет удалено.
Давайте добавим нашего потребителя и принудительно отправим сообщение об ошибке, создав исключение:
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory") public void consumeBlocking(String payload) throws Exception { logger.info("Processing message from blocking-queue: {}", payload); throw new Exception("exception occured!"); }
Наконец, давайте создадим тест и отправим два сообщения в нашу очередь:
@Test public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception { int nb = 2; CountDownLatch latch = new CountDownLatch(nb); observableRecoverer.setObserver(() -> latch.countDown()); for (int i = 1; i <= nb; i++) { rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i); } latch.await(); }
Имейте в виду, что CountDownLatch используется только в качестве тестового приспособления.
Давайте запустим тест и проверим вывод журнала:
2020-02-18 21:17:55.638 INFO : Processing message from blocking-queue: blocking message 1 2020-02-18 21:17:56.641 INFO : Processing message from blocking-queue: blocking message 1 2020-02-18 21:17:59.644 INFO : Processing message from blocking-queue: blocking message 1 2020-02-18 21:18:08.654 INFO : Processing message from blocking-queue: blocking message 1 2020-02-18 21:18:18.657 INFO : Processing message from blocking-queue: blocking message 1 2020-02-18 21:18:18.875 ERROR : java.lang.Exception: exception occured! 2020-02-18 21:18:18.858 INFO : Processing message from blocking-queue: blocking message 2 2020-02-18 21:18:19.860 INFO : Processing message from blocking-queue: blocking message 2 2020-02-18 21:18:22.863 INFO : Processing message from blocking-queue: blocking message 2 2020-02-18 21:18:31.867 INFO : Processing message from blocking-queue: blocking message 2 2020-02-18 21:18:41.871 INFO : Processing message from blocking-queue: blocking message 2 2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!
Как видно, этот журнал правильно показывает экспоненциальное время ожидания между каждой повторной попыткой. В то время как наша стратегия отступления работает, наш потребитель блокируется до тех пор, пока повторные попытки не будут исчерпаны. Тривиальное улучшение заключается в том, чтобы заставить нашего потребителя выполнять одновременно, установив атрибут concurrency для @RabbitListener :
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")
Однако удаленное сообщение по-прежнему блокирует экземпляр потребителя. Таким образом, приложение может страдать от проблем с задержкой.
В следующем разделе мы представим неблокирующий способ реализации аналогичной стратегии.
4. Неблокирующий Способ
Альтернативный способ включает в себя несколько очередей повторных попыток в сочетании с истечением срока действия сообщения. На самом деле, когда срок действия сообщения истекает, оно попадает в очередь мертвых писем. Другими словами, если DLQconsumer отправляет сообщение обратно в исходную очередь, мы, по сути, выполняем цикл повторных попыток .
В результате количество используемых очередей повторных попыток-это количество попыток, которые будут выполняться .
Во-первых, давайте создадим очередь мертвых писем для наших очередей повторных попыток:
@Bean public Queue retryWaitEndedQueue() { return QueueBuilder.nonDurable("retry-wait-ended-queue").build(); }
Давайте добавим потребителя в очередь повторов мертвых писем. Исключительная ответственность этого потребителя заключается в отправке сообщения обратно в исходную очередь :
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory") public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{ MessageProperties props = message.getMessageProperties(); rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), props.getHeader("x-original-routing-key"), message); }
Во-вторых, давайте создадим объект-оболочку для наших очередей повторных попыток. Этот объект будет содержать конфигурацию экспоненциального отката:
public class RetryQueues { private Queue[] queues; private long initialInterval; private double factor; private long maxWait; // constructor, getters and setters
В-третьих, давайте определим три очереди повторных попыток:
@Bean public Queue retryQueue1() { return QueueBuilder.nonDurable("retry-queue-1") .deadLetterExchange("") .deadLetterRoutingKey("retry-wait-ended-queue") .build(); } @Bean public Queue retryQueue2() { return QueueBuilder.nonDurable("retry-queue-2") .deadLetterExchange("") .deadLetterRoutingKey("retry-wait-ended-queue") .build(); } @Bean public Queue retryQueue3() { return QueueBuilder.nonDurable("retry-queue-3") .deadLetterExchange("") .deadLetterRoutingKey("retry-wait-ended-queue") .build(); } @Bean public RetryQueues retryQueues() { return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3()); }
Затем нам нужен перехватчик для обработки потребления сообщений:
public class RetryQueuesInterceptor implements MethodInterceptor { // fields and constructor @Override public Object invoke(MethodInvocation invocation) throws Throwable { return tryConsume(invocation, this::ack, (messageAndChannel, e) -> { try { int retryCount = tryGetRetryCountOrFail(messageAndChannel, e); sendToNextRetryQueue(messageAndChannel, retryCount); } catch (Throwable t) { // ... throw new RuntimeException(t); } }); }
В случае успешного возвращения потребителя мы просто подтверждаем сообщение.
Однако, если потребитель создает исключение и остались попытки, мы отправляем сообщение в следующую очередь повторных попыток:
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception { String retryQueueName = retryQueues.getQueueName(retryCount); rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> { MessageProperties props = m.getMessageProperties(); props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount))); props.setHeader("x-retried-count", String.valueOf(retryCount + 1)); props.setHeader("x-original-exchange", props.getReceivedExchange()); props.setHeader("x-original-routing-key", props.getReceivedRoutingKey()); return m; }); mac.channel.basicReject(mac.message.getMessageProperties() .getDeliveryTag(), false); }
Опять же, давайте подключим наш перехватчик к пользовательскому RabbitListenerContainerFactory :
@Bean public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory( ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); Advice[] adviceChain = { retryInterceptor }; factory.setAdviceChain(adviceChain); return factory; }
Наконец, мы определяем нашу основную очередь и потребителя, который имитирует неудачное сообщение:
@Bean public Queue nonBlockingQueue() { return QueueBuilder.nonDurable("non-blocking-queue") .build(); } @RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", ackMode = "MANUAL") public void consumeNonBlocking(String payload) throws Exception { logger.info("Processing message from non-blocking-queue: {}", payload); throw new Exception("Error occured!"); }
Давайте создадим еще один тест и отправим два сообщения:
@Test public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception { int nb = 2; CountDownLatch latch = new CountDownLatch(nb); retryQueues.setObserver(() -> latch.countDown()); for (int i = 1; i <= nb; i++) { rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i); } latch.await(); }
Затем давайте запустим наш тест и проверим журнал:
2020-02-19 10:31:40.640 INFO : Processing message from non-blocking-queue: non blocking message 1 2020-02-19 10:31:40.656 INFO : Processing message from non-blocking-queue: non blocking message 2 2020-02-19 10:31:41.620 INFO : Processing message from non-blocking-queue: non blocking message 1 2020-02-19 10:31:41.623 INFO : Processing message from non-blocking-queue: non blocking message 2 2020-02-19 10:31:44.415 INFO : Processing message from non-blocking-queue: non blocking message 1 2020-02-19 10:31:44.420 INFO : Processing message from non-blocking-queue: non blocking message 2 2020-02-19 10:31:52.751 INFO : Processing message from non-blocking-queue: non blocking message 1 2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured! 2020-02-19 10:31:52.829 INFO : Processing message from non-blocking-queue: non blocking message 2 2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!
Опять же, мы видим экспоненциальное время ожидания между каждой повторной попыткой. Однако вместо блокировки до тех пор, пока не будет предпринята каждая попытка, сообщения обрабатываются одновременно .
Хотя эта настройка довольно гибкая и помогает устранить проблемы с задержкой, существует общая ловушка. Действительно, RabbitMQ удаляет просроченное сообщение только тогда, когда оно достигает головы очереди . Поэтому, если срок действия сообщения больше, оно заблокирует все остальные сообщения в очереди. По этой причине очередь ответов должна содержать только сообщения с одинаковым сроком действия .
4. Заключение
Как показано выше, системы, основанные на событиях, могут реализовать стратегию экспоненциального отката для повышения отказоустойчивости. Хотя реализация таких решений может быть тривиальной, важно понимать, что определенное решение может быть хорошо адаптировано к небольшой системе, но вызывает проблемы с задержкой в экосистемах с высокой пропускной способностью.
Исходный код доступен на GitHub .