1. введение
В этом уроке мы создадим производителя и потребителя сообщений с помощью Spring Boot и Apache RocketMQ, платформы распределенных сообщений и потоковых данных с открытым исходным кодом.
2. Зависимости
Для проектов Maven нам нужно добавить RocketMQ Spring Boot Starter зависимость:
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4
3. Создание Сообщений
В нашем примере мы создадим базовый производитель сообщений, который будет отправлять события всякий раз, когда пользователь добавляет или удаляет товар из корзины покупок.
Во-первых, давайте настроим местоположение нашего сервера и имя группы в нашем application.properties :
rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=cart-producer-group
Обратите внимание, что если бы у нас было несколько серверов имен, мы могли бы перечислить их как host:port;host:port .
Теперь, чтобы все было просто, мы создадим приложение CommandLineRunner и сгенерируем несколько событий во время запуска приложения:
@SpringBootApplication public class CartEventProducer implements CommandLineRunner { @Autowired private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(CartEventProducer.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); } }
CartItemEvent состоит всего из двух свойств – идентификатора товара и количества:
class CartItemEvent { private String itemId; private int quantity; // constructor, getters and setters }
В приведенном выше примере мы используем метод convertAndSend () , общий метод, определенный классом AbstractMessageSendingTemplate abstract, для отправки наших событий корзины. Он принимает два параметра: назначение, которое в нашем случае является именем темы, и полезную нагрузку сообщения.
4. Потребитель сообщений
Использование сообщений RocketMQ так же просто, как создание компонента Spring с аннотацией @RocketMQMessageListener и реализация интерфейса RocketMQListener :
@SpringBootApplication public class CartEventConsumer { public static void main(String[] args) { SpringApplication.run(CartEventConsumer.class, args); } @Service @RocketMQMessageListener( topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic" ) public class CardItemAddConsumer implements RocketMQListener{ public void onMessage(CartItemEvent addItemEvent) { log.info("Adding item: {}", addItemEvent); // additional logic } } @Service @RocketMQMessageListener( topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic" ) public class CardItemRemoveConsumer implements RocketMQListener { public void onMessage(CartItemEvent removeItemEvent) { log.info("Removing item: {}", removeItemEvent); // additional logic } } }
Нам нужно создать отдельный компонент для каждой темы сообщения, которую мы слушаем. В каждом из этих слушателей мы определяем имя темы и имя группы потребителей через @ RocketMQ MessageListener аннотацию.
5. Синхронная и асинхронная передача
В предыдущих примерах мы использовали метод convertAndSend для отправки наших сообщений. Однако у нас есть и другие варианты.
Мы могли бы, например, вызвать sync Send , который отличается от convertAndSend , потому что он возвращает SendResult объект.
Его можно использовать, например, для проверки того, было ли наше сообщение успешно отправлено, или для получения его идентификатора:
public void run(String... args) throws Exception { SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); }
Как и convertAndSend, этот метод возвращается только после завершения процедуры отправки.
Мы должны использовать синхронную передачу в случаях, требующих высокой надежности, таких как важные уведомления или SMS-уведомления.
С другой стороны, вместо этого мы можем захотеть отправить сообщение асинхронно и получить уведомление, когда отправка завершится.
Мы можем сделать это с помощью async Send , который принимает SendCallback в качестве параметра и немедленно возвращает:
rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.error("Successfully sent cart item"); } @Override public void onException(Throwable throwable) { log.error("Exception during cart item sending", throwable); } });
Мы используем асинхронную передачу в случаях, требующих высокой пропускной способности.
Наконец, для сценариев, где у нас очень высокие требования к пропускной способности, мы можем использовать sendOneWay вместо async Send . sendOneWay отличается от asyncSend тем, что не гарантирует отправку сообщения.
Односторонняя передача также может использоваться для обычных случаев надежности, таких как сбор журналов.
6. Отправка сообщений в транзакции
RocketMQ предоставляет нам возможность отправлять сообщения в рамках транзакции. Мы можем сделать это с помощью метода send In Transaction() :
MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build(); rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);
Кроме того, мы должны реализовать локальный интерфейс RocketMQ TransactionListener :
@RocketMQTransactionListener(txProducerGroup="test-transaction") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.COMMIT; } }
В отправить сообщение В Transaction () первым параметром является имя транзакции. Он должен совпадать с полем @RocketMQTransactionListener участника группы производителей tx.
7. Конфигурация производителя сообщений
Мы также можем настроить аспекты самого производителя сообщений:
- rocketmq.producer.send-message-timeout : Тайм – аут отправки сообщения в миллисекундах-значение по умолчанию равно 3000
- rocketmq.producer.compress-message-body-threshold : Порог, выше которого RocketMQ будет сжимать сообщения – значение по умолчанию равно 1024.
- rocketmq.producer.max-размер сообщения : Максимальный размер сообщения в байтах – значение по умолчанию 4096.
- rocketmq.producer.retry-times-when-sendasync-failed : Максимальное количество повторных попыток, выполняемых внутренне в асинхронном режиме до сбоя отправки – значение по умолчанию равно 2.
- rocketmq.producer.retry-next-server : Указывает, следует ли повторить попытку другого брокера при внутренней ошибке отправки – значение по умолчанию false .
- rocketmq.producer.retry-times-when-send-failed : Максимальное количество повторных попыток, выполняемых внутренне в асинхронном режиме до сбоя отправки – значение по умолчанию равно 2.
8. Заключение
В этой статье мы узнали, как отправлять и использовать сообщения с помощью Apache RocketMQ и Spring Boot. Как всегда, весь исходный код доступен на GitHub .