Рубрики
Без рубрики

Весенняя интеграция Java DSL

Изучите основы весенней интеграции Java DSL, а также несколько трюков

Автор оригинала: Dhawal Kapil.

1. введение

В этом уроке мы узнаем о Java DSL Spring Integration для создания интеграций приложений.

Мы возьмем интеграцию с перемещением файлов, которую мы построили во введении к интеграции Spring, и вместо этого используем DSL.

2. Зависимости

Java DSL Spring Integration является частью Spring Integration Core .

Итак, мы можем добавить эту зависимость:


    org.springframework.integration
    spring-integration-core
    5.0.6.RELEASE

И для работы с вашим приложением для перемещения файлов нам также понадобится Файл интеграции Spring :


    org.springframework.integration
    spring-integration-file
    5.0.6.RELEASE

3. Весенняя интеграция Java DSL

До Java DSL пользователи настраивали компоненты Spring Integration в XML.

DSL представляет некоторые свободно используемые конструкторы, из которых мы можем легко создать полный конвейер интеграции a Spring исключительно на Java.

Итак, допустим, мы хотели создать канал, который будет содержать в верхнем регистре все данные, поступающие по каналу.

В прошлом мы могли бы это сделать:



И теперь мы можем вместо этого сделать:

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlows.from("input")
      .transform(String::toUpperCase)
      .get();
}

4. Приложение для Перемещения Файлов

Чтобы начать интеграцию с перемещением файлов, нам понадобятся несколько простых строительных блоков.

4.1. Поток интеграции

Первый строительный блок, который нам нужен, – это поток интеграции, который мы можем получить из Потоков интеграции builder:

IntegrationFlows.from(...)

из можно взять несколько типов, но в этом уроке мы рассмотрим только три:

  • Источник сообщения s
  • Канал сообщений s, и
  • Строка s

Мы скоро поговорим обо всех трех.

После того , как мы вызвали из , нам теперь доступны некоторые методы настройки:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // add more components
  .get();

В конечном счете, Integration Flows всегда будет создавать экземпляр Integration Flow, который является конечным продуктом любого приложения Spring Integration.

Этот шаблон ввода, выполнения соответствующих преобразований и выдачи результатов является основополагающим для всех приложений Spring Integration .

4.2. Описание источника входных данных

Во-первых, чтобы переместить файлы, нам нужно указать нашему потоку интеграции, где он должен их искать, и для этого нам нужен MessageSource:

@Bean
public MessageSource sourceDirectory() {
  // .. create a message source
}

Проще говоря, источник сообщений – это место, из которого могут поступать сообщения, внешние по отношению к приложению .

Более конкретно, нам нужно что-то, что может адаптировать этот внешний источник в представление сообщений Spring. И поскольку эта адаптация ориентирована на ввод , их часто называют Адаптерами входных каналов.

Зависимость spring-integration-file дает нам адаптер входного канала, который отлично подходит для нашего варианта использования: FileReadingMessageSource:

@Bean
public MessageSource sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File(INPUT_DIR));
    return messageSource;
}

Здесь наш FileReadingMessageSource будет читать каталог, указанный INPUT_DIR , и создаст из него Источник сообщений .

Давайте укажем это в качестве нашего источника в вызове IntegrationFlows.from :

IntegrationFlows.from(sourceDirectory());

4.3. Настройка источника входного сигнала

Теперь, если мы думаем об этом как о долгоживущем приложении, мы , вероятно, захотим иметь возможность замечать файлы по мере их поступления , а не просто перемещать файлы, которые уже есть при запуске.

Чтобы облегчить это, из также можно взять дополнительный настроенный в качестве дальнейшей настройки источника ввода:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

В этом случае мы можем сделать наш источник ввода более устойчивым, сообщив Spring Integration опрашивать этот источник–в данном случае нашу файловую систему–каждые 10 секунд.

И, конечно, это относится не только к нашему файловому источнику, мы могли бы добавить этот опросник в любой Источник сообщений .

4.4. Фильтрация сообщений от источника ввода

Далее предположим, что мы хотим, чтобы наше приложение для перемещения файлов перемещало только определенные файлы, скажем, файлы изображений с расширением jpg .

Для этого мы можем использовать Универсальный селектор :

@Bean
public GenericSelector onlyJpgs() {
    return new GenericSelector() {

        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

Итак, давайте снова обновим наш поток интеграции:

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs());

Или, поскольку этот фильтр настолько прост, мы могли бы вместо этого определить его с помощью лямбды :

IntegrationFlows.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Обработка Сообщений С Помощью Активаторов Служб

Теперь, когда у нас есть отфильтрованный список файлов, нам нужно записать их в новое место.

Активатор службы s – это то, к чему мы обращаемся, когда думаем о результатах весенней интеграции.

Давайте воспользуемся FileWritingMessageHandler активатором службы из spring-integration-file :

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

Здесь наш FileWritingMessageHandler будет записывать каждое Сообщение полезную нагрузку, которую он получает в OUTPUT_DIR .

Еще раз, давайте обновим:

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

И обратите внимание, кстати, на использование set Expect Reply . Поскольку потоки интеграции могут быть |/двунаправленными, этот вызов указывает, что этот конкретный канал является одним из способов.

4.6. Активация Нашего Интеграционного Потока

Когда мы добавим все наши компоненты, нам нужно зарегистрировать ваш Поток интеграции в качестве компонента , чтобы активировать его:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

Метод get извлекает экземпляр IntegrationFlow , который нам нужно зарегистрировать в качестве компонента Spring.

Как только контекст нашего приложения загружается, все наши компоненты, содержащиеся в нашем потоке интеграции , активируются.

И теперь наше приложение начнет перемещать файлы из исходного каталога в целевой каталог.

5. Дополнительные Компоненты

В нашем приложении для перемещения файлов на основе DSL мы создали адаптер входящего канала, Фильтр сообщений и Активатор службы.

Давайте рассмотрим несколько других распространенных компонентов интеграции Spring и посмотрим, как мы можем их использовать.

5.1. Каналы сообщений

Как упоминалось ранее, канал сообщений – это еще один способ инициализации потока:

IntegrationFlows.from("anyChannel")

Мы можем прочитать это как “пожалуйста, найдите или создайте канал под названием любой канал . Затем считайте любые данные, которые поступают в любой канал из других потоков.”

Но на самом деле это более универсальное назначение, чем это.

Проще говоря, канал абстрагирует производителей от потребителей, и мы можем думать о нем как о Java очереди . Канал может быть вставлен в любую точку потока .

Предположим, например, что мы хотим расставить приоритеты для файлов по мере их перемещения из одного каталога в другой:

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

Затем мы можем вставить вызов в канал между нашими потоками:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

Есть десятки каналов для выбора, некоторые из наиболее удобных-для параллелизма, аудита или промежуточной персистентности (подумайте о буферах Кафки или JMS).

Кроме того, каналы могут быть мощными в сочетании с Bridge s.

5.2. Мост

Когда мы хотим объединить два канала , мы используем Мост.

Давайте представим, что вместо записи непосредственно в выходной каталог мы вместо этого записали наше приложение для перемещения файлов в другой канал:

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

Теперь, поскольку мы просто записали его в канал, мы можем перейти оттуда к другим потокам .

Давайте создадим мост, который опрашивает наш резервуар для хранения сообщений и записывает их в пункт назначения:

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
      .handle(targetDirectory())
      .get();
}

Опять же, поскольку мы писали в промежуточный канал, теперь мы можем добавить другой поток , который принимает эти же файлы и записывает их с другой скоростью :

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
      .handle(anotherTargetDirectory())
      .get();
}

Как мы видим, отдельные мосты могут управлять конфигурацией опроса для разных обработчиков.

Как только наш контекст приложения загружен, у нас теперь есть более сложное приложение в действии, которое начнет перемещать файлы из исходного каталога в два целевых каталога.

6. Заключение

В этой статье мы рассмотрели различные способы использования Java DSL Spring Integration для построения различных интеграционных конвейеров.

По сути, мы смогли воссоздать приложение для перемещения файлов из предыдущего учебника, на этот раз используя чистую java.

Кроме того, мы рассмотрели несколько других компонентов, таких как каналы и мосты.

Полный исходный код, используемый в этом учебнике, доступен на Github .