Рубрики
Без рубрики

Пружинный AMQP в реактивных приложениях

Узнайте, как создать реактивное приложение Spring Boot, которое интегрируется с сервером обмена сообщениями RabbitMQ, реализацией стандарта обмена сообщениями AMQP.

Автор оригинала: baeldung.

1. Обзор

В этом руководстве показано, как создать простое реактивное приложение Spring Boot, которое интегрируется с сервером обмена сообщениями RabbitMQ, популярной реализацией стандарта обмена сообщениями AMQP.

Мы рассматриваем сценарии “точка-точка” и “публикация-подписка”, используя распределенную настройку, которая подчеркивает различия между обоими шаблонами.

Обратите внимание, что мы предполагаем базовое знание AMQP, RabbitMQ и Spring Boot, в частности, ключевых понятий, таких как Обмены, Очереди, Темы и так далее. Более подробную информацию об этих концепциях можно найти по ссылкам ниже:

  • Обмен сообщениями С помощью Spring AMQP
  • Введение в RabbitMQ

2. Настройка сервера RabbitMQ

Хотя мы могли бы настроить локальную RabbitMQ локально, на практике мы, скорее всего, будем использовать специальную установку с дополнительными функциями, такими как высокая доступность, мониторинг, безопасность и т. Д.

Чтобы смоделировать такую среду в нашей машине разработки, мы будем использовать Docker для создания сервера, который будет использовать наше приложение.

Следующая команда запустит автономный сервер RabbitMQ:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

Мы не объявляем никаких постоянных томов, поэтому непрочитанные сообщения будут потеряны между перезапусками. Услуга будет доступна в порту 5672 на хосте.

Мы можем проверить журналы сервера с помощью команды docker logs , которая должна выдать такой вывод:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node [email protected]
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: 

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : [email protected]
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : 
 database dir   : /var/lib/rabbitmq/mnesia/[email protected]

// ... more log lines

Поскольку образ включает в себя утилиту rabbitmqctl , мы можем использовать ее для выполнения административных задач в контексте нашего запущенного образа.

Например, мы можем получить информацию о состоянии сервера с помощью следующей команды:

$ docker exec rabbitmq rabbitmqctl status
Status of node [email protected] ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

Другие полезные команды включают в себя:

  • list_exchanges : Список всех объявленных обменов
  • list_queues : Перечислите все объявленные очереди, включая количество непрочитанных сообщений
  • list_bindings : Список всех определяет привязки между обменами и очередями, включая ключи маршрутизации

3. Весенняя настройка проекта AMQP

Как только наш сервер RabbitMQ будет запущен и запущен, мы сможем перейти к созданию нашего весеннего проекта. Этот пример проекта позволит любому клиенту REST отправлять и/или получать сообщения на сервер обмена сообщениями, используя модуль Spring AMQP и соответствующий стартер загрузки Spring для связи с ним.

Основные зависимости, которые нам нужно добавить в ваш pom.xml файл проекта:


    org.springframework.boot
    spring-boot-starter-amqp
    2.0.3.RELEASE


    org.springframework.boot
    spring-boot-starter-webflux
    2.0.2.RELEASE 

spring-boot-starter-amqp приносит все вещи, связанные с AMQP, в то время как spring-boot-starter-web flux является основной зависимостью, используемой для реализации нашего реактивного сервера REST.

Примечание: вы можете проверить последнюю версию модулей Spring Boot Starter AMQP и Webflux на Maven Central.

4. Сценарий 1: Обмен сообщениями “Точка-точка”

В этом первом сценарии мы будем использовать прямой обмен, который является логическим объектом в брокере, который получает сообщения от клиентов.

Прямой обмен направит все входящие сообщения в одну – и только одну – очередь , из которой они будут доступны для использования клиентами. Несколько клиентов могут подписаться на одну и ту же очередь, но только один получит данное сообщение.

4.1. Настройка обмена и очередей

В нашем сценарии мы используем объект Destination Info , который инкапсулирует имя exchange и ключ маршрутизации. Карта с ключом по имени пункта назначения будет использоваться для хранения всех доступных пунктов назначения.

За эту начальную настройку будет отвечать следующий метод @PostConstruct :

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

Этот метод использует компонент admin Amqp , созданный Spring, для объявления обменов, Очередей и связывания их вместе с помощью заданного ключа маршрутизации.

Все назначения поступают из Destinations Config bean, который является классом @ConfigurationProperties , используемым в нашем примере.

Этот класс имеет свойство, которое заполняется объектами DestinationInfo , построенными на основе сопоставлений, считанных из файла конфигурации application.yml .

4.2. Конечная точка производителя

Производители будут отправлять сообщения, отправляя HTTP POST в /очередь/{имя} местоположение.

Это реактивная конечная точка, поэтому мы используем Mono для возврата простого подтверждения:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }

Сначала мы проверяем, соответствует ли параметр name допустимому назначению, и если да, то мы используем экземпляр autowired AmqpTemplate для фактической отправки полезной нагрузки – простого String сообщения – в RabbitMQ.

4.3. Фабрика MessageListenerContainer

Для асинхронного приема сообщений Spring AMQP использует абстрактный класс Message ContainerListener , который опосредует поток информации из очередей AMQP и прослушивателей, предоставляемых приложением.

Поскольку нам нужна конкретная реализация этого класса для подключения наших прослушивателей сообщений, мы определяем фабрику, которая изолирует код контроллера от его фактической реализации.

В нашем случае фабричный метод возвращает новый Простой контейнер сообщений каждый раз, когда мы вызываем его createMessageListenerContainer метод:

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. Конечная точка потребителя

Потребители получат доступ к тому же адресу конечной точки, который используется производителями ( /очередь/{имя} ) для получения сообщений.

Эта конечная точка возвращает Поток событий, где каждое событие соответствует полученному сообщению:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux f = Flux. create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

После первоначальной проверки имени назначения конечная точка потребителя создает MessageListenerContainer , используя MessageListenerContainerFactory и имя очереди, восстановленное из нашего реестра.

Как только у нас есть наш MessageListenerContainer , мы создаем сообщение Flux , используя один из его методов create() builder.

В нашем конкретном случае мы используем тот, который принимает лямбду, принимающую Аргумент FluxSink , который мы затем используем для подключения асинхронного API на основе прослушивателя Spring AMQPs к нашему реактивному приложению.

Мы также прикрепляем две дополнительные лямбды к излучателям onRequest() и onDispose() обратных вызовов, чтобы наш MessageListenerContainer мог выделять/освобождать свои внутренние ресурсы после жизненного цикла Flux s.

Наконец, мы объединяем полученный Поток с другим, созданным с помощью interval (), который создает новое событие каждые пять секунд. Эти фиктивные сообщения играют важную функцию в нашем случае : без них мы бы обнаружили отключение клиента только при получении сообщения и невозможности его отправки, что может занять много времени в зависимости от вашего конкретного случая использования.

4.5. Тестирование

Установив конечные точки как для потребителей, так и для издателей, мы теперь можем выполнить некоторые тесты с помощью нашего примера приложения.

Нам нужно определить детали подключения к серверу RabbitMQs и хотя бы одно место назначения в нашем application.yml , которое должно выглядеть следующим образом:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

Свойства spring.rabbitmq.* определяют основные свойства, необходимые для подключения к нашему серверу RabbitMQ, работающему в локальном контейнере Docker. Пожалуйста, обратите внимание, что IP-адрес, показанный выше, является лишь примером и может отличаться в конкретной настройке.

Очереди определяются с помощью назначения.очереди.<имя>.* , где <имя> используется в качестве имени назначения. Здесь мы объявили одно место назначения с именем “NYSE”, которое будет отправлять сообщения на биржу “nyse” на RabbitMQ с ключом маршрутизации “NYSE”.

Как только мы запустим сервер через командную строку или из нашей IDE, мы сможем начать отправлять и получать сообщения. Мы будем использовать утилиту curl , общую утилиту, доступную для ОС Windows, Mac и Linux.

В следующем списке показано, как отправить сообщение в пункт назначения и ожидаемый ответ от сервера:

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

После выполнения этой команды мы можем убедиться, что сообщение было получено RabbitMQ и готово к использованию, выполнив следующую команду:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

Теперь мы можем читать сообщения с помощью curl с помощью следующей команды:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

Как мы видим, сначала мы получаем ранее сохраненное сообщение, а затем начинаем получать наше фиктивное сообщение каждые 5 секунд.

Если мы снова запустим команду для перечисления очередей, теперь мы увидим, что сообщения не хранятся:

$ docker exec rabbitmq rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. Сценарий 2: Публикация-Подписка

Другим распространенным сценарием для приложений обмена сообщениями является шаблон публикации-подписки, в котором одно сообщение должно быть отправлено нескольким потребителям.

RabbitMQ предлагает два типа обменов, которые поддерживают такие приложения: Разветвление и Тема.

Основное различие между этими двумя типами заключается в том, что последний позволяет нам фильтровать, какие сообщения получать, на основе шаблона ключа маршрутизации (например, “alarm.mail server.*”), предоставленного во время регистрации, в то время как первый просто реплицирует входящие сообщения во все связанные очереди.

RabbitMQ также поддерживает обмен заголовками, что позволяет более сложную фильтрацию сообщений, но его использование выходит за рамки этой статьи.

5.1. Настройка пунктов назначения

Мы определяем пункты назначения Pub/Sub во время запуска с помощью другого метода @PostConstruct , как это было в сценарии “точка-точка”.

Единственное отличие заключается в том , что мы создаем только Exchange , но не Очереди – они будут созданы по требованию и привязаны к Exchange позже, так как мы хотим эксклюзивную Очередь для каждого клиента:

@PostConstruct
public void setupTopicDestinations(
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
            amqpAdmin.declareExchange(ex);
      });
}

5.2. Конечная точка издателя

Клиенты будут использовать конечную точку издателя, доступную в расположении /topic/{name} , для публикации сообщений, которые будут отправлены всем подключенным клиентам.

Как и в предыдущем сценарии, мы используем @PostMapping , который возвращает Mono со статусом после отправки сообщения:

@PostMapping(value = "/topic/{name}")
public Mono> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);
    
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }      
    
   return Mono.fromCallable(() -> {
       amqpTemplate.convertAndSend(
         d.getExchange(), d.getRoutingKey(),payload);   
            return ResponseEntity.accepted().build();
        });
    }

5.3. Конечная точка Подписчика

Наша конечная точка подписчика будет расположена по адресу /topic/{name} , производя Поток сообщений для подключенных клиентов.

Эти сообщения включают как полученные сообщения, так и фиктивные сообщения, генерируемые каждые 5 секунд:

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
    Flux f = Flux. create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

Этот код в основном такой же, как и в предыдущем случае, только со следующими отличиями: во-первых, мы создаем новую Очередь для каждого нового подписчика.

Мы делаем это с помощью вызова метода createTopicQueue () , который использует информацию из экземпляра DestinationInfo для создания исключительной, недолговечной очереди, которую мы затем привязываем к Exchange с помощью настроенного ключа маршрутизации:

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

Обратите внимание, что, несмотря на то, что мы снова объявляем Exchange , RabbitMQ не создаст новый, так как мы уже объявили его во время запуска.

Второе отличие заключается в лямбде, которую мы передаем методу onDispose () , который на этот раз также удалит Очередь при отключении подписчика.

5.3. Тестирование

Для того, чтобы протестировать сценарий Pub-Sub, мы должны сначала определить назначение темы в out application.yml следующим образом:

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

Здесь мы определили конечную точку темы, которая будет доступна в местоположении /тема/погода . Эта конечная точка будет использоваться для отправки сообщений на биржу “оповещения” на RabbitMQ с ключом маршрутизации “ПОГОДА”.

После запуска сервера мы можем проверить, что обмен был создан с помощью команды rabbitmqctl :

$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

Теперь, если мы выполним команду list_bindings , мы увидим, что нет очередей, связанных с обменом “предупреждениями”:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

Давайте запустим пару подписчиков, которые подпишутся на наш пункт назначения, открыв две командные оболочки и выполнив следующую команду на каждой из них:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

Наконец, мы снова используем curl для отправки некоторых предупреждений нашим подписчикам:

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

Как только мы отправим сообщение, мы почти мгновенно увидим сообщение “Приближается ураган !” на каждой оболочке подписчиков.

Если мы сейчас проверим доступные привязки, то увидим, что у нас есть одна очередь для каждого подписчика:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

Как только мы нажмем Ctrl-C на оболочке подписчиков, наш шлюз в конечном итоге обнаружит, что клиент отключился, и удалит эти привязки.

6. Заключение

В этой статье мы продемонстрировали, как создать простое реактивное приложение, которое взаимодействует с сервером RabbitMQ с помощью модуля spring-amqp .

С помощью всего нескольких строк кода мы смогли создать функциональный шлюз HTTP-to-AMQP, который поддерживает как шаблоны интеграции “Точка-точка”, так и шаблоны интеграции “Публикация-подписка”, которые мы можем легко расширить, добавив дополнительные функции, такие как безопасность, путем добавления стандартных функций Spring.

Код, показанный в этой статье, доступен на Github.