1. введение
В этом уроке мы рассмотрим концепцию fanout и обмен темами с Spring AMQP и RabbitMQ .
На высоком уровне разветвленные обмены будут транслировать одно и то же сообщение во все связанные очереди , в то время как тематические обмены используют ключ маршрутизации для передачи сообщений в определенную связанную очередь или очереди .
В этом руководстве рекомендуется предварительно ознакомиться с сообщениями с помощью Spring AMQP.
2. Настройка обмена разветвлениями
Давайте установим один разветвленный обмен с двумя связанными с ним очередями. Когда мы отправим сообщение на этот обмен, обе очереди получат сообщение. Наш обмен разветвлениями игнорирует любой ключ маршрутизации, включенный в сообщение.
Spring AMQP позволяет нам агрегировать все объявления очередей, обменов и привязок в Декларируемый объект:
@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }
3. Настройка обмена темами
Теперь мы также настроим обмен темами с двумя очередями, каждая из которых имеет свой шаблон привязки:
@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }
Обмен темами позволяет нам привязывать к нему очереди с различными ключевыми шаблонами. Это очень гибко и позволяет нам привязывать несколько очередей с одним и тем же шаблоном или даже несколько шаблонов к одной и той же очереди.
Когда ключ маршрутизации сообщения соответствует шаблону, оно будет помещено в очередь. Если очередь имеет несколько привязок, соответствующих ключу маршрутизации сообщения, в очередь помещается только одна копия сообщения.
Наши шаблоны привязки могут использовать звездочку ( ” * “), чтобы соответствовать слову в определенной позиции, или знак фунта ( ” # ” ), чтобы соответствовать нулю или нескольким словам.
Таким образом, наш topicQueue1 будет получать сообщения с ключами маршрутизации, имеющими шаблон из трех слов со средним словом “важно”-например: “user.important.error” или “blog.important.notification”.
И наша тема очереди 2 будет получать сообщения, которые имеют ключи маршрутизации, заканчивающиеся словом error; соответствующие примеры: “ошибка” , “user.important.error” или “blog.post.save.error”.
4. Настройка производителя
Мы будем использовать convertAndSend метод RabbitTemplate для отправки наших примеров сообщений:
String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };
То Кроличья табличка обеспечивает много перегруженных convertAndSend() методы для различных типов обмена.
Когда мы отправляем сообщение на разветвленный обмен, ключ маршрутизации игнорируется, и сообщение передается во все связанные очереди.
Когда мы отправляем сообщение в раздел exchange, нам нужно передать ключ маршрутизации. На основе этого ключа маршрутизации сообщение будет доставлено в определенные очереди.
5. Настройка потребителей
Наконец, давайте настроим четырех потребителей – по одному для каждой очереди – для получения полученных сообщений:
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }
Мы настраиваем потребителей с помощью аннотации @RabbitListener . Единственный аргумент, передаваемый здесь, – это имя очередей. Потребители не знают здесь об обменах или ключах маршрутизации.
6. Запуск примера
Наш пример проекта представляет собой приложение Spring Boot, поэтому он инициализирует приложение вместе с подключением к RabbitMQ и настроит все очереди, обмены и привязки.
По умолчанию наше приложение ожидает экземпляр RabbitMQ, работающий на локальном хосте на порту 5672. Мы можем изменить это и другие значения по умолчанию в приложении.ямл .
Наш проект предоставляет конечную точку HTTP в URI – /broadcast –, которая принимает сообщения с сообщением в теле запроса.
Когда мы отправляем запрос на этот URL-адрес с телом “Test”, мы должны увидеть что-то похожее на это в выходных данных:
Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast
Порядок, в котором мы увидим эти сообщения, конечно, не гарантирован.
7. Заключение
В этом кратком уроке мы рассмотрели веерные и тематические обмены с Spring AMQP и RabbitMQ.
Полный исходный код и все фрагменты кода для этого руководства доступны в репозитории GitHub .