Рубрики
Без рубрики

Публикация и получение сообщений с клиентом Nats Java

Узнайте, как подключиться к серверу NATS и отправлять как сообщения паб/суб, так и сообщения очереди, сбалансированные с нагрузкой.

Автор оригинала: Eric Goebelbecker.

Публикация и получение сообщений с клиентом Nats Java

1. Обзор

В этом учебнике мы будем использовать Java Клиент для NATs для подключения к NatS Сервер и публиковать и получать сообщения.

NATS предлагает три основных способа обмена сообщениями. Публикация/Подписка семантика доставляет сообщения всем подписчикам темы. Обмен сообщениями запросов/ответов отправляет запросы по темам и маршрутам обратно запрашиваемому.

Абоненты также могут присоединиться к группам очередей сообщений, когда они подписываются на тему. Сообщения, отправленные на связанную тему, доставляются только одному абоненту в группе очередей.

2. Настройка

2.1. Зависимость от Maven

Во-первых, мы должны добавить библиотеку NATS к нашему пом.xml:


    io.nats
    jnats
    1.0

Последняя версия библиотечного можно найти здесь , и проект Github здесь .

2.2. Сервер NATS

Во-вторых, нам понадобится сервер NATS для обмена сообщениями. Там инструкции для всех основных платформ здесь.

Мы предполагаем, что есть сервер, работающий на localhost:4222.

3. Подключение и обмен сообщениями

3.1. Подключение к NATS

тем подключиться () метод в статичном классе NATS создает блат .

Если мы хотим использовать соединение с опциями по умолчанию и слушать на localhost в порту 4222, мы можем использовать метод по умолчанию:

Connection natsConnection = Nats.connect();

Но Соединения есть много настраиваемых вариантов, некоторые из которых мы хотим переопределить.

Мы создадим Варианты объекта и передать его Натс :

private Connection initConnection() {
    Options options = new Options.Builder()
      .errorCb(ex -> log.error("Connection Exception: ", ex))
      .disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
      .reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
      .build();

    return Nats.connect(uri, options);
}

NATS Соединения долговечны. API попытается восстановить потерянное соединение.

Мы установили обратные вызовы, чтобы уведомить нас о том, когда происходит отключение и когда соединение восстановлено. В этом примере мы используем лямбды, но для приложений, которые должны сделать больше, чем просто войти в событие, мы можем установить объекты, которые реализуют необходимые интерфейсы.

Мы можем пробежать быстрый тест. Создайте соединение и добавьте сон в течение 60 секунд, чтобы процесс был запущен:

Connection natsConnection = initConnection();
Thread.sleep(60000);

Запустите это. Затем остановитесь и запустите сервер NATS:

[jnats-callbacks] ERROR com.baeldung.nats.NatsClient 
  - Channel disconnected: [email protected]
[reconnect] WARN io.nats.client.ConnectionImpl 
  - couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient 
  - Reconnected to server: [email protected]

Мы видим, как обратные вызовы регистрируют отключение и повторное подключение.

3.2. Подпишитесь на Сообщения

Теперь, когда у нас есть связь, мы можем работать над обработкой сообщений.

NatS Сообщение является контейнером для массива байты . В дополнение к ожидаемой setData (byte) и byte- getData () методы настройки и получения назначения сообщения и ответа на темы.

Мы подписываемся на темы, которые Строки.

NATS поддерживает как синхронные, так и асинхронные подписки.

Давайте посмотрим на асинхронную подписку:

AsyncSubscription subscription = natsConnection
  .subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));

API обеспечивает Сообщения к нашему MessageHandler (), в своей нити.

Некоторые приложения могут захотеть контролировать поток, который обрабатывает сообщения вместо этого:

SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);

Синхронное имеет блокирующий nextMessage () метод, который будет блокировать для указанного количества миллисекунд. Мы будем использовать синхронные подписки для наших тестов, чтобы сделать тестовые случаи простыми.

AsyncSubscription и Синхронное оба имеют Отписаться () метод, который мы можем использовать, чтобы закрыть подписку.

subscription.unsubscribe();

3.3. Публикация сообщений

Издательский Сообщения может быть сделано несколькими способами.

Самый простой метод требует только темы Струнные и сообщение байты :

natsConnection.publish("foo.bar", "Hi there!".getBytes());

Если издатель желает получить ответ или предоставить конкретную информацию об источнике сообщения, он также может отправить сообщение с ответом на тему:

natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());

Есть также перегрузки для нескольких других комбинаций, таких как прохождение в Сообщение вместо байты .

3.4. Простой обмен сообщениями

Учитывая действительный Связь , мы можем написать тест, который проверяет обмен сообщениями:

SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());

Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

natsConnection
  .publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());

message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));

Мы начинаем с подписки на две темы с синхронной подписки, поскольку они работают гораздо лучше внутри теста JUnit. Затем мы отправляем сообщение одному из них, указывая другое в качестве ответТо адрес.

После прочтения сообщения из первого пункта назначения мы “перевернуть” темы, чтобы отправить ответ.

3.5. Подписка на Wildcard

Сервер NATS поддерживает тематические подстановочные знаки.

Wildcards работают на тематических токенах, которые разделены с символом ‘.’. Персонаж звездочки ”’ соответствует индивидуальному маркеру. Больше, чем символ “>” является подстановочный знак матч для остальной части темы, которая может быть более одного маркера.

Например:

  • foo.» соответствует foo.bar, foo.requests, но не foo.bar.requests
  • foo.> матчи foo.bar, foo.requests, foo.bar.requests, foo.bar.baeldung и т.д.

Давайте попробуем несколько тестов:

SyncSubscription fooSubscription = client.subscribeSync("foo.*");

client.publishMessage("foo.bar", "bar.foo", "hello there");

Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);

SyncSubscription barSubscription = client.subscribeSync("foo.>");

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");

message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

4. Запрос/ответ сообщений

Наш тест обмена сообщениями напоминал распространенную идиому в системах обмена сообщениями в пабе/субсемейных сообщениях; запрос/ответ. NATS имеет явную поддержку этого запроса/ответа обмена сообщениями .

Издатели могут установить обработчик для запросов, используя асинхронный метод подписки, который мы использовали выше:

AsyncSubscription subscription = natsConnection
  .subscribe("foo.bar.requests", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        natsConnection.publish(message.getReplyTo(), reply.getBytes());
    }
});

Или они могут отвечать на запросы по мере их прибытия.

API предоставляет запрос () метод:

Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);

Этот метод создает временный почтовый ящик для ответа и пишет ответ на адрес для нас.

Запрос () возвращает ответ, или нулевой если запрос не превышает времени. Последним аргументом является количество миллисекунд для ожидания.

Мы можем изменить наш тест для запроса/ответа:

natsConnection.subscribe(salary.requests", message -> {
    natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));

5. Очереди сообщений

Абоненты могут указывать группы очередей во время подписки. Когда сообщение будет опубликовано в группе NATS будет доставить его на один и только один абонент .

Группы очередей не сохраняются сообщения. Если слушатели недоступны, сообщение отбрасывается.

5.1. Подписка на очереди

Абоненты указывают имя группы очереди в качестве струна:

SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");

Существует также асинхронная версия, конечно:

SyncSubscription subscription = natsConnection
  .subscribe("topic", "queue name", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        log.info("Received message on {}", msg.getSubject());
    }
});

Подписка создает очередь на сервере NATS.

5.2. Публикация в очередях

Публикация сообщения в группах очередей просто требует публикации на связанную тему:

natsConnection.publish("foo",  "queue message".getBytes());

Сервер NATS будет маршрутить сообщение в очередь и выбрать получатель сообщений.

Мы можем проверить это с помощью теста:

SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");

natsConnection.publish("foo", "foobar".getBytes());

List messages = new ArrayList<>();

Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);

message = queue2.nextMessage(200);
if (message != null) messages.add(message);

assertEquals(1, messages.size());

SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");

В этом кратком введении мы подключились к серверу NATS и отправили как сообщения паб/суб, так и сообщения очереди, сбалансированные по нагрузке. Мы рассмотрели поддержку NATS для подстановочных подписок. Мы также использовали сообщения запроса/ответа.

Образцы кода, как всегда, можно найти более на GitHub .