Рубрики
Без рубрики

Apache RocketMQ с пружинным загрузчиком

Apache RocketMQ с пружинным загрузчиком

Автор оригинала: baeldung.

1. введение

В этом уроке мы создадим производителя и потребителя сообщений с помощью Spring Boot и Apache RocketMQ, платформы распределенных сообщений и потоковых данных с открытым исходным кодом.

2. Зависимости

Для проектов Maven нам нужно добавить RocketMQ Spring Boot Starter зависимость:


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.0.4

3. Создание Сообщений

В нашем примере мы создадим базовый производитель сообщений, который будет отправлять события всякий раз, когда пользователь добавляет или удаляет товар из корзины покупок.

Во-первых, давайте настроим местоположение нашего сервера и имя группы в нашем application.properties :

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

Обратите внимание, что если бы у нас было несколько серверов имен, мы могли бы перечислить их как host:port;host:port .

Теперь, чтобы все было просто, мы создадим приложение CommandLineRunner и сгенерируем несколько событий во время запуска приложения:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent состоит всего из двух свойств – идентификатора товара и количества:

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

В приведенном выше примере мы используем метод convertAndSend () , общий метод, определенный классом AbstractMessageSendingTemplate abstract, для отправки наших событий корзины. Он принимает два параметра: назначение, которое в нашем случае является именем темы, и полезную нагрузку сообщения.

4. Потребитель сообщений

Использование сообщений RocketMQ так же просто, как создание компонента Spring с аннотацией @RocketMQMessageListener и реализация интерфейса RocketMQListener :

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

Нам нужно создать отдельный компонент для каждой темы сообщения, которую мы слушаем. В каждом из этих слушателей мы определяем имя темы и имя группы потребителей через @ RocketMQ MessageListener аннотацию.

5. Синхронная и асинхронная передача

В предыдущих примерах мы использовали метод convertAndSend для отправки наших сообщений. Однако у нас есть и другие варианты.

Мы могли бы, например, вызвать sync Send , который отличается от convertAndSend , потому что он возвращает SendResult объект.

Его можно использовать, например, для проверки того, было ли наше сообщение успешно отправлено, или для получения его идентификатора:

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

Как и convertAndSend, этот метод возвращается только после завершения процедуры отправки.

Мы должны использовать синхронную передачу в случаях, требующих высокой надежности, таких как важные уведомления или SMS-уведомления.

С другой стороны, вместо этого мы можем захотеть отправить сообщение асинхронно и получить уведомление, когда отправка завершится.

Мы можем сделать это с помощью async Send , который принимает SendCallback в качестве параметра и немедленно возвращает:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

Мы используем асинхронную передачу в случаях, требующих высокой пропускной способности.

Наконец, для сценариев, где у нас очень высокие требования к пропускной способности, мы можем использовать sendOneWay вместо async Send . sendOneWay отличается от asyncSend тем, что не гарантирует отправку сообщения.

Односторонняя передача также может использоваться для обычных случаев надежности, таких как сбор журналов.

6. Отправка сообщений в транзакции

RocketMQ предоставляет нам возможность отправлять сообщения в рамках транзакции. Мы можем сделать это с помощью метода send In Transaction() :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Кроме того, мы должны реализовать локальный интерфейс RocketMQ TransactionListener :

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

В отправить сообщение В Transaction () первым параметром является имя транзакции. Он должен совпадать с полем @RocketMQTransactionListener участника группы производителей tx.

7. Конфигурация производителя сообщений

Мы также можем настроить аспекты самого производителя сообщений:

  • rocketmq.producer.send-message-timeout : Тайм – аут отправки сообщения в миллисекундах-значение по умолчанию равно 3000
  • rocketmq.producer.compress-message-body-threshold : Порог, выше которого RocketMQ будет сжимать сообщения – значение по умолчанию равно 1024.
  • rocketmq.producer.max-размер сообщения : Максимальный размер сообщения в байтах – значение по умолчанию 4096.
  • rocketmq.producer.retry-times-when-sendasync-failed : Максимальное количество повторных попыток, выполняемых внутренне в асинхронном режиме до сбоя отправки – значение по умолчанию равно 2.
  • rocketmq.producer.retry-next-server : Указывает, следует ли повторить попытку другого брокера при внутренней ошибке отправки – значение по умолчанию false .
  • rocketmq.producer.retry-times-when-send-failed : Максимальное количество повторных попыток, выполняемых внутренне в асинхронном режиме до сбоя отправки – значение по умолчанию равно 2.

8. Заключение

В этой статье мы узнали, как отправлять и использовать сообщения с помощью Apache RocketMQ и Spring Boot. Как всегда, весь исходный код доступен на GitHub .