Рубрики
Без рубрики

Введение в RabbitMQ для разработчиков Java

Предварительные требования Для этого урока вам понадобится некоторый опыт работы с Java, maven и иметь docker i… С тегами java, showdev, учебник.

Предпосылки

Для этого урока вам понадобится некоторый опыт работы с Java, maven и установка docker на вашем компьютере

Очередь

В информатике существует понятие очередей. Очередь – это набор сообщений, предназначенных для доставки от одного отправителя одному или нескольким получателям. Сообщения могут быть доставлены в порядке или не в порядке по дизайну. Компьютерная программа, которая обрабатывает эти транзакции, называется брокером сообщений. RabbitMQ – один из самых популярных брокеров сообщений, который работает поверх расширенного протокола очереди сообщений (AMQP). Существует четыре основных компонента, формирующих протокол AMQP: Издатель, Обмен, Очередь, Потребитель.

Издатель

Сообщения публикуются на бирже издателем, издатель также несет ответственность за настройку атрибутов сообщения, которые мы рассмотрим позже.

Обмены

Биржи отвечают за маршрутизацию сообщений в одну или несколько очередей, мы рассмотрим очереди позже. В rabbitmq существует 4 различных типа обменов.

1.Прямой 2.Разветвление 3. Тема 4. Заголовок

В этом уроке мы рассмотрим только два: Директ, я собираюсь сделать еще один урок по обмену разветвлениями позже.

Прямые обмены отвечают за маршрутизацию сообщений в очередь на основе ключа маршрутизации. Когда вы объявляете очередь, вы можете “привязать” ее к обмену с помощью ключа маршрутизации, мы рассмотрим эту тему позже. Прямые очереди подходят для распределения задач между работниками. Разветвленный обмен отправляет сообщение всем очередям, которые связаны с обменом ключом маршрутизации. Когда приходит сообщение, биржа отправит копию этого сообщения во все очереди. Обмены разветвлениями полезны для передачи сообщения нескольким узлам в распределенной системе.

Очереди

Очереди отвечают за хранение сообщений и доставку их потребителям. Очереди должны быть объявлены, прежде чем вы сможете начать их использовать. Очередь должна быть привязана к exchange, чтобы она могла начать получать сообщения. Привязка – это набор правил, которые биржи используют для маршрутизации сообщений в одну или несколько очередей.

Потребители

Потребители – это последняя часть головоломки, им нужно подписаться на очередь, чтобы они могли начать получать сообщения, когда потребитель получает и обрабатывает сообщение, ему необходимо “Подтвердить” сообщение, чтобы получить другое.

Установка RabbitMQ

Мы будем использовать docker для установки rabbitmq и его пользовательского интерфейса управления.

docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Приведенная выше команда установит rabbitmq и свяжет два порта с вашим локальным портом: 5672 и 15672. Вы можете использовать 15672, чтобы попасть на портал управления RabbitMQ: http://localhost:15672 пароль пользователя по умолчанию – гость/гость.

Приведенная выше команда установит rabbitmq и свяжет два порта с вашим локальным портом: 5672 и 15672. Вы можете использовать 15672, чтобы попасть на портал управления RabbitMQ: http://localhost:15672 пароль пользователя по умолчанию – гость/гость.

Нам нужно использовать клиентскую библиотеку на порту 5672 для связи с сервером RabbitMQ. Теперь давайте создадим прямой обмен и очередь. Но сначала мы должны загрузить клиентскую библиотеку. Я использую maven, вставьте эту зависимость в свой pom.xml затем выполните установку maven для загрузки клиента.


     com.rabbitmq
     amqp-client
     5.12.0
 

Теперь, когда библиотека загружена, давайте создадим фабрику соединений. Я создаю пакет под названием com.queue

package com.queue;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Queue {
   private static final String HOST = "localhost";
   private Channel channel;

   public Queue() {
       ConnectionFactory cf = new ConnectionFactory();
       cf.setHost(HOST);
       try {
           Connection connection = cf.newConnection();
           channel = connection.createChannel();
       } catch (Exception e) {
           System.err.println(e);
       }
   }

   public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
       try {
           channel.queueDeclare(queueName, false, false, false, null);
           channel.exchangeDeclare(exchangeName, exchangeType);
           channel.queueBind(queueName, exchangeName, key);
       } catch (Exception e) {
           System.err.println(e);
       }
   }
}

Там много чего происходит, давайте разберемся

Сначала я создал свой общедоступный конструктор под названием Queue. Затем в этом конструкторе было открыто соединение с RabbitMQ, создан канал и присвоен объект канала переменной с именем канал. После этого я создал общедоступный метод void под названием create Exchange Queue, чтобы создать очередь и обмен и связать их вместе с помощью ключа.

Хорошо, теперь у нас есть наш класс очереди. Давайте воспользуемся этим. Просто для развлечения я напишу программу для вычисления квадрата числа.

package com.rabbit;
import com.queue.Queue;
public final class App {
   private static String QUEUE_NAME = "square";
   private static String EXCHANGE_NAME = "myExchange";
   private static String KEY_NAME = "key";
   public static void main(String[] args) {
       Queue queue = new Queue();
       queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
   }
}

После выполнения приведенного выше кода вы можете перейти в интерфейс управления и перейти на вкладку очереди и вы увидите квадрат под списком очередей. Теперь давайте изменим наш класс очереди, чтобы отправлять сообщения в нашу квадратную очередь и получать сообщения из очереди.

package com.queue;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.DeliverCallback;

public class Queue {
   private static final String HOST = "localhost";
   private Channel channel;

   public Queue() {
       ConnectionFactory cf = new ConnectionFactory();
       cf.setHost(HOST);
       try {
           Connection connection = cf.newConnection();
           channel = connection.createChannel();
       } catch (Exception e) {
           System.err.println(e);
       }
   }

   public void sendMessage(String exchange, String key, String message){
       try {
           channel.basicPublish(exchange, key, null, message.getBytes(StandardCharsets.UTF_8));
       } catch (Exception e) {
           System.err.println(e);
       }
   }

   public void listenToQueue(String queueName, DeliverCallback dlr) {
       try {
           channel.basicConsume(queueName, true, dlr, consumerTag -> { });
       } catch (Exception e) {
           System.err.println(e);
       }
   }

   public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
       try {
           channel.queueDeclare(queueName, false, false, false, null);
           channel.exchangeDeclare(exchangeName, exchangeType);
           channel.queueBind(queueName, exchangeName, key);
       } catch (Exception e) {
           System.err.println(e);
       }
   }
}

Я создал два новых метода. Первый – это открытый метод void, называемый SendMessage, он принимает обмен, ключ и сообщение, а затем использует метод basicPublish нашего объекта канала для доставки сообщения в нашу квадратную очередь. Второй метод – это еще один открытый метод void, который принимает имя очереди и функцию обратного вызова. Метод basicConsume вызывает вашу функцию обратного вызова каждый раз, когда в нашей квадратной очереди появляется новое сообщение. Теперь у нас полностью написан наш класс очереди, давайте начнем работать над нашей квадратной логикой, которая является нашим потребителем.

package com.square;
import com.rabbitmq.client.DeliverCallback;
import com.queue.Queue;

public class Square{
   private static String QUEUE_NAME = "square";
   private static String EXCHANGE_NAME = "myExchange";
   private static String KEY_NAME = "key";
   public void listenToMessage(){
       Queue queue = new Queue();
       queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
       queue.listenToQueue(QUEUE_NAME, findSquare);
   }

   DeliverCallback findSquare = (consumerTag, delivery) -> {
       String message = new String(delivery.getBody(), "UTF-8");
       int number = Integer.parseInt(message);
       int squareNumber = number * number;
       System.out.println("Square of " + message + " is: " + squareNumber );
   };
}

Это наш потребитель, у него есть один общедоступный метод, первый открывает новое соединение, создает новую очередь (квадратная очередь уже существует, поэтому она просто проверяет, есть ли она, прежде чем начать ее прослушивать), а затем начинает прослушивать очередь. Затем у нас есть функция обратного вызова под названием find Square, она получает сообщение из очереди, анализирует сообщение (сообщение приходит в виде строки), находит квадрат числа и выводит его на экран.

А вот и наш основной класс

package com.rabbit;
import com.queue.Queue;
import com.square.Square;
public final class App {
   private static String QUEUE_NAME = "square";
   private static String EXCHANGE_NAME = "myExchange";
   private static String KEY_NAME = "key";
   public static void main(String[] args) {
       Queue queue = new Queue();
       queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
       queue.sendMessage(EXCHANGE_NAME, KEY_NAME, "5");
       Square sq = new Square();
       sq.listenToMessage();
   }
}

Теперь, после выполнения приведенного выше кода, вы можете перейти к своей очереди из пользовательского интерфейса управления, и вы увидите, что у вас есть сообщение внутри квадратной очереди

Давайте воспользуемся этим сообщением и найдем квадрат

package com.rabbit;
import java.util.ArrayList;
import java.util.List;

import com.queue.Queue;
import com.square.Square;
public final class App {
   private static String QUEUE_NAME = "square";
   private static String EXCHANGE_NAME = "myExchange";
   private static String KEY_NAME = "key";
   public static void main(String[] args) {
       Queue queue = new Queue();
       queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
       List numbers = new ArrayList();
       numbers.add("1");
       numbers.add("2");
       numbers.add("3");
       numbers.add("4");
       numbers.add("5");
       numbers.forEach((n)-> queue.sendMessage(EXCHANGE_NAME, KEY_NAME, n));
       Square sq = new Square();
       sq.listenToMessage();
   }
}

Теперь у нас есть список чисел вместо одного, и мы используем метод forEach для перебора списка и отправки чисел в очередь по одному. Теперь запустите своего потребителя, и вы увидите, что потребитель автоматически принимает все сообщения и показывает квадрат всех чисел.

Вывод

RabbitMQ – популярный брокер сообщений, работающий поверх протокола AMPQ. Протокол AMPQ состоит из 4 компонентов: 1 – Издатель, 2 – Обмен, 3 -Очередь, 4 – Потребитель.

Оригинал: “https://dev.to/pharzad/introduction-to-rabbitmq-for-java-developers-7dk”