Рубрики
Без рубрики

Введение в весенний облачный поток

Узнайте, как создавать микрослужбы, управляемые данными и событиями, с помощью Spring Cloud Stream и RabbitM.

Автор оригинала: baeldung.

Введение в весенний облачный поток

1. Обзор

Spring Cloud Stream – это основа, построенная поверх весенней загрузки и весенней интеграции, которая помогает в создании микрослужб, управляемых событиями или с помощью сообщений, .

В этой статье мы представим концепции и конструкции Весеннего Облачного Потока с некоторыми простыми примерами.

2. Мейвен зависимостей

Чтобы начать работу, нам нужно добавить Весенний облачный стартовый поток с брокером RabbitM’ Maven зависимость как средства обмена сообщениями для нашей пом.xml :


    org.springframework.cloud
    spring-cloud-starter-stream-rabbit
    1.3.0.RELEASE

И мы добавим зависимость модуля от Maven Central для поддержки JUnit:


    org.springframework.cloud
    spring-cloud-stream-test-support
    1.3.0.RELEASE
    test

3. Основные концепции

Архитектура микрослужб следует « умные конечные точки и тупые трубы ” принцип. Связь между конечными точками определяется сообщениями-средними сторонами, такими как RabbitM или Apache Kafka. Службы общаются, публикуя события домена через эти конечные точки или каналы .

Давайте рассмотрим концепции, которые составляют платформу Spring Cloud Stream, а также основные парадигмы, которые мы должны знать, чтобы построить службы, управляемые сообщением.

3.1. Конструкции

Давайте посмотрим на простой сервис в Весеннем Облачном Потоке, который слушает входные связывания и отправляет ответ на выходной связывание:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

Аннотация @EnableBinding настраивает приложение для связывания каналов ВХОДНЫЕ и ВЫХОД определяется в рамках интерфейса Процессор . Оба канала являются привязками, которые могут быть настроены для использования конкретного средства обмена сообщениями или связующего.

Давайте рассмотрим определение всех этих понятий:

  • Привязки — набор интерфейсов, которые определяют входные и выходные каналы декларативно
  • Биндер — реализация программ для обмена сообщениями, таких как Kafka или RabbitM
  • Канал — представляет собой трубу связи между мессенджером и приложением
  • StreamListeners — методы обработки сообщений в бобах, которые будут автоматически вызываться на сообщение с канала после СообщениеКонвертер делает сериализацию/дезериализацию между событиями, специфичными для среднего программного обеспечения, и типами доменных объектов/POJOs
  • Мес мудрец Шемас – эти схемы, используемые для сериализации и дезериализации сообщений, могут статично считыться с места или загружаться динамически, поддерживая эволюцию типов объектов домена

3.2. Шаблоны связи

Сообщения, назначенные в пункты назначения, доставляются Публикация-Подписка шаблон обмена сообщениями. Издатели классифицируют сообщения на темы, каждый из которых идентифицируется по имени. Абоненты проявляют интерес к одной или несколько темам. Средства среднего программного обеспечения фильтруют сообщения, доставляя эти интересные темы подписчикам.

Теперь подписчиков можно сгруппить. группы потребителей является набор абонентов или потребителей, идентифицированных группа ID , в рамках которого сообщения из раздела темы или темы доставляются сбалансированно.

4. Модель программирования

В этом разделе описаны основы создания приложений Spring Cloud Stream.

4.1. Функциональное тестирование

Поддержка тестирования является связующим звеном, позволяющим взаимодействовать с каналами и проверять сообщения.

Давайте отправим сообщение вышеупомянутой обогатитьLogMessage службы и проверить, содержит ли ответ текст “1”: “ в начале сообщения:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

4.2. Пользовательские каналы

В приведеном выше примере мы использовали Процессор интерфейс, предоставляемый Spring Cloud, который имеет только один вход и один выходной канал.

Если нам нужно что-то другое, например, один вход и два выходных канала, мы можем создать пользовательский процессор:

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Весна обеспечит надлежащую реализацию этого интерфейса для нас. Имена каналов можно установить с помощью аннотаций, как в @Output (“myOutput”) .

В противном случае Spring будет использовать имена методов в качестве имен каналов. Таким образом, у нас есть три канала, называемые myInput , myOutput , и другойOutput .

Теперь давайте представим, что мы хотим, чтобы маршрут сообщения на один выход, если значение меньше, чем 10, а в другой выход значение больше, чем или равна 10:

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final  Message message(T val) {
    return MessageBuilder.withPayload(val).build();
}

4.3. Условная отправка

Использование @StreamListener аннотация, мы также можем фильтровать сообщения, которые мы ожидаем в потребительском используя любое условие, которое мы определяем с Выражения SpEL .

В качестве примера можно использовать условную отправку в качестве еще одного подхода к маршруту сообщений в различные выходы:

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

Единственная ограничение этого подхода заключается в том, что эти методы не должны возвращать значение.

5. Настройка

Давайте наймем приложение, которое будет обрабатывать сообщение от брокера RabbitM.

5.1. Конфигурация связующего

Мы можем настроить наше приложение для использования реализации связующего по умолчанию через META-INF/spring.binders :

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Или мы можем добавить библиотеку связующих для RabbitM’ к classpath, включив эта зависимость :


    org.springframework.cloud
    spring-cloud-stream-binder-rabbit
    1.3.0.RELEASE

Если реализация связующего не предусмотрена, Spring будет использовать прямую коммуникацию сообщений между каналами.

5.2. Конфигурация Кролика

Чтобы настроить пример в разделе 3.1, чтобы использовать связующее звено RabbitM, нам нужно обновить application.yml расположен в src/main/resources :

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 
                port: 5672
                username: 
                password: 
                virtual-host: /

входные привязка будет использовать обмен называется очередь.log.сообщения , и выходной привязка будет использовать обменный queue.pretty.log.messages . Оба привязки будут использовать связующее звено под названием local_rabbit .

Обратите внимание, что нам не нужно создавать биржи или очереди RabbitM’ заранее. При запуске приложения оба обмена автоматически создаются .

Чтобы протестировать приложение, мы можем использовать сайт управления RabbitM, чтобы опубликовать сообщение. В Публикация сообщений панель биржевого очередь.log.сообщения , мы должны ввести запрос в формате JSON.

5.3. Настройка преобразования сообщений

Spring Cloud Stream позволяет нам применять конверсию сообщений для определенных типов содержимого. В приведенном выше примере вместо формата JSON мы хотим предоставить простой текст.

Чтобы сделать это, мы применить пользовательские преобразования к LogMessage с помощью СообщениеКонвертер :

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    //...

    @Bean
    public MessageConverter providesTextPlainMessageConverter() {
        return new TextPlainMessageConverter();
    }

    //...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message message, 
        Class targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String 
          ? (String) payload 
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

После применения этих изменений, вернуться к Публикация сообщений панели, если мы будем устанавливать заголовок ” contentTypes ” к ” текст/обычная ” и полезная нагрузка на ” Здравствуйте, “, он должен работать, как и раньше.

5.4. Группы потребителей

При запуске нескольких экземпляров нашего приложения каждый раз, когда есть новое сообщение в входном канале, все абоненты будут уведомлены .

Большую часть времени, нам нужно сообщение, которое будет обработано только один раз. Spring Cloud Stream реализует такое поведение через группы потребителей.

Для такого поведения каждый потребитель может использовать spring.cloud.stream.bindings. .group , чтобы указать название группы:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
          ...

6. Микрослужбы, управляемые сообщением

В этом разделе мы вводим все необходимые функции для запуска наших приложений Spring Cloud Stream в контексте микрослужб.

6.1. Масштабирование

При запуске нескольких приложений важно обеспечить правильное разделение данных между потребителями. Для этого Spring Cloud Stream предоставляет два свойства:

  • spring.cloud.stream.instanceCount – количество запущенных приложений
  • spring.cloud.stream.instanceИндекс – индекс текущего приложения

Например, если мы развернули два экземпляра из вышеуказанных MyLoggerServiceПрименение приложения, права собственности spring.cloud.stream.instanceCount должно быть 2 для обоих приложений, и spring.cloud.stream.instanceИндекс должно быть 0 и 1 соответственно.

Эти свойства автоматически устанавливаются при развертывании приложений Spring Cloud Stream с использованием Spring Data Flow, как описано в этой статье.

6.2. Разделение

События домена могут быть Разделенные Сообщения. Это помогает, когда мы масштабирование хранилища и повышение производительности приложений .

Событие домена обычно имеет ключ раздела, так что он попадает в один и тот же раздел с связанными сообщениями.

Допустим, мы хотим, чтобы сообщения журнала были разделены первой буквой в сообщении, которая была бы ключом раздела, и сгруппированы в два раздела.

Там будет один раздел для журнальных сообщений, которые начинаются с А-М и еще один раздел для Н-З. Это может быть настроено с использованием двух свойств:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression – выражение раздела полезной нагрузки
  • spring.cloud.stream.bindings.output.producer.partitionCount – количество групп

Иногда выражение раздела слишком сложно, чтобы написать его только в одной строке. В этих случаях мы можем написать нашу пользовательскую стратегию раздела, используя spring.cloud.stream.bindings.output.producer.partitionKeyExtractorКласс .

6.3. Индикатор здоровья

В контексте микрослужб мы также должны обнаружить, когда служба не работает или начинает . Spring Cloud Stream предоставляет доступ к management.health.binders.enabled для включения индикаторов здоровья связующих.

При запуске приложения мы можем запросить состояние здоровья в http://:/health .

7. Заключение

В этом учебнике мы представили основные концепции Весеннего Облачного Потока и показали, как использовать его на простых примерах по RabbitM. Более подробную информацию о весеннем облачном потоке можно найти здесь .

Исходный код этой статьи можно найти более на GitHub .