Автор оригинала: Ekaterina Galkina.
1. Обзор
Весенняя интеграция упрощает использование некоторых шаблонов корпоративной интеграции . Один из этих способов-через его DSL .
В этом уроке мы рассмотрим поддержку DSL для подпотоков для упрощения некоторых наших конфигураций.
2. Наша Задача
Допустим, у нас есть последовательность целых чисел, которые мы хотим разделить на три разных блока.
И если бы мы хотели использовать для этого интеграцию Spring, мы могли бы начать с создания трех выходных каналов:
- Такие числа, как 0, 3, 6 и 9, перейдут в кратное Трем каналам
- Такие числа, как 1, 4, 7 и 10, перейдут в остаток-это один канал
- И такие числа, как 2, 5, 8 и 11, переходят в остаток-это два канала
Чтобы увидеть, насколько полезными могут быть подпотоки, давайте начнем с того, как это будет выглядеть без подпотоков.
А затем мы будем использовать подпотоки, чтобы упростить нашу конфигурацию с помощью:
- опубликовать Канал Подписки
- маршрут К Получателям
- Фильтр s, чтобы настроить нашу логику если-то
- Маршрутизатор s, для настройки нашей коммутатора логики
3. Предварительные условия
Теперь, прежде чем настраивать наши подпотоки, давайте создадим эти выходные каналы.
Мы сделаем эти Каналы очереди s, так как это немного проще для демонстрации:
@EnableIntegration @IntegrationComponentScan public class SubflowsConfiguration { @Bean QueueChannel multipleOfThreeChannel() { return new QueueChannel(); } @Bean QueueChannel remainderIsOneChannel() { return new QueueChannel(); } @Bean QueueChannel remainderIsTwoChannel() { return new QueueChannel(); } boolean isMultipleOfThree(Integer number) { return number % 3 == 0; } boolean isRemainderIOne(Integer number) { return number % 3 == 1; } boolean isRemainderTwo(Integer number) { return number % 3 == 2; } }
В конечном счете, именно здесь окажутся наши сгруппированные номера.
Обратите также внимание, что интеграция Spring может легко начать выглядеть сложной, поэтому мы добавим несколько вспомогательных методов для удобства чтения.
4. Решение Без Подпотоков
Теперь нам нужно определить наши потоки.
Без подпотоков простая идея состоит в том, чтобы определить три отдельных потока интеграции, по одному для каждого типа чисел.
Мы отправим одинаковую последовательность сообщений каждому Поток интеграции компонент, но выходные сообщения для каждого компонента будут разными.
4.1. Определение компонентов интеграционного потока
Во-первых, давайте определим каждый Поток интеграции компонент в нашей Конфигурации подпотока классе:
@Bean public IntegrationFlow multipleOfThreeFlow() { return flow -> flow.split() .filter(this::isMultipleOfThree) .channel("multipleOfThreeChannel"); }
Наш поток содержит две конечные точки-/| Разделитель , за которым следует войлок er .
Фильтр делает то, на что он похож. Но зачем нам еще и разветвитель? Мы увидим это через минуту, но в основном это разбивает ввод Коллекцию на отдельные сообщения.
И, конечно, мы можем таким же образом определить еще два Потока интеграции компонента.
4.2. Шлюзы обмена Сообщениями
Для каждого потока нам также нужен Шлюз сообщений .
Проще говоря, они абстрагируют API сообщений интеграции Spring от вызывающего абонента, аналогично тому, как служба REST может абстрагироваться от HTTP:
@MessagingGateway public interface NumbersClassifier { @Gateway(requestChannel = "multipleOfThreeFlow.input") void multipleOfThree(Collectionnumbers); @Gateway(requestChannel = "remainderIsOneFlow.input") void remainderIsOne(Collection numbers); @Gateway(requestChannel = "remainderIsTwoFlow.input") void remainderIsTwo(Collection numbers); }
Для каждого из них нам нужно использовать аннотацию @Gateway и указать неявное имя входного канала, которое является просто именем компонента, за которым следует “.input” . Обратите внимание, что мы можем использовать это соглашение, потому что мы используем потоки на основе лямбда.
Эти методы являются точками входа в наши потоки.
4.3. Отправка сообщений и проверка вывода
А теперь давайте проверим:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = { SeparateFlowsConfiguration.class }) public class SeparateFlowsUnitTest { @Autowired private QueueChannel multipleOfThreeChannel; @Autowired private NumbersClassifier numbersClassifier;
@Test public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() { numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6)); Message> outMessage = multipleOfThreeChannel.receive(0); assertEquals(outMessage.getPayload(), 3); outMessage = multipleOfThreeChannel.receive(0); assertEquals(outMessage.getPayload(), 6); outMessage = multipleOfThreeChannel.receive(0); assertNull(outMessage); } }
Обратите внимание , что мы отправили сообщения в виде Списка , поэтому нам понадобился разделитель, чтобы взять одно “сообщение списка” и преобразовать его в несколько “числовых сообщений”.
Мы вызываем получаем с o , чтобы получить следующее доступное сообщение без ожидания. Поскольку в нашем списке есть два кратных трем, мы ожидаем, что сможем вызвать его дважды. Третий вызов прием возвращает null .
прием, конечно, возвращает Сообщение , поэтому мы вызываем getPayload , чтобы извлечь номер.
Точно так же мы могли бы сделать то же самое для двух других.
Итак, это было решение без подпотоков. У нас есть три отдельных потока для обслуживания и три отдельных метода шлюза.
Что мы сделаем сейчас, так это заменим три компонента Потока интеграции одним компонентом, а три метода шлюза-одним.
5. Использование канала PublishSubscribeChannel
Метод опубликовать канал подписки() транслирует сообщения во все подпотоки подписки. Таким образом, мы можем создать один поток вместо трех.
@Bean public IntegrationFlow classify() { return flow -> flow.split() .publishSubscribeChannel(subscription -> subscription .subscribe(subflow -> subflow .filter(this::isMultipleOfThree) .channel("multipleOfThreeChannel")) .subscribe(subflow -> subflow . filter(this::isRemainderOne) .channel("remainderIsOneChannel")) .subscribe(subflow -> subflow . filter(this::isRemainderTwo) .channel("remainderIsTwoChannel"))); }
Таким образом, подпотоки являются анонимными, что означает, что они не могут быть решены независимо.
Теперь у нас есть только один поток, поэтому давайте также отредактируем наш Классификатор чисел :
@Gateway(requestChannel = "classify.input") void classify(Collectionnumbers);
Теперь, поскольку у нас есть только один Поток интеграции компонент и один метод шлюза, нам нужно отправить ваш список только один раз:
@Test public void whenSendMessagesToFlow_thenNumbersAreClassified() { numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6)); // same assertions as before }
Обратите внимание, что с этого момента изменится только определение integrationflow, так что мы больше не будем показывать тест.
6. Использование маршрутных средств
Другой способ добиться того же-это маршрут К Получателям , что приятно, потому что в него встроена фильтрация.
Используя этот метод, мы можем указать как каналы, так и подпотоки для вещания.
6.1. получатель
В приведенном ниже коде мы укажем кратно 3 каналам , остаток равен 1 каналу, и остаток равен Двум каналам в качестве получателей на основе наших условий:
@Bean public IntegrationFlow classify() { return flow -> flow.split() .routeToRecipients(route -> route .recipient("multipleOfThreeChannel", this::isMultipleOfThree) . recipient("remainderIsOneChannel", this::isRemainderOne) . recipient("remainderIsTwoChannel", this::isRemainderTwo)); }
Мы также можем позвонить получателю без каких-либо условий, и маршрут к получателям будет опубликован в этом пункте назначения безоговорочно.
6.2. Поток получателей
И обратите внимание, что маршрут к получателям позволяет нам определить полный поток, точно так же, как опубликовать канал подписки.
Давайте изменим приведенный выше код и укажем анонимный подпоток в качестве первого получателя :
.routeToRecipients(route -> route .recipientFlow(subflow -> subflow .filter(this::isMultipleOfThree) .channel("mutipleOfThreeChannel")) ...);
Этот подпоток будет получать всю последовательность сообщений, поэтому нам нужно отфильтровать, как и раньше, чтобы получить такое же поведение.
Опять же, один Поток интеграции боба нам было достаточно.
Теперь давайте перейдем к компонентам if-else . Одним из них является Фильтр .
7. Использование потоков “если-то”
Мы уже использовали Фильтр во всех предыдущих примерах. Хорошей новостью является то, что мы можем указать не только условие для дальнейшей обработки, но также канал или | поток для отброшенных сообщений .
Мы можем думать о потоках и каналах сброса, таких как ещё блок:
@Bean public IntegrationFlow classify() { return flow -> flow.split() .filter(this::isMultipleOfThree, notMultiple -> notMultiple .discardFlow(oneflow -> oneflow . filter(this::isRemainderOne, twoflow -> twoflow .discardChannel("remainderIsTwoChannel")) .channel("remainderIsOneChannel")) .channel("multipleofThreeChannel"); }
В этом случае мы реализовали нашу логику маршрутизации if-else :
- Если число не кратно трем, затем отбросьте эти сообщения в поток отбрасывания; мы используем поток здесь, поскольку для определения канала назначения требуется больше логики.
- В потоке сброса если число не равно единице, затем отбросьте эти сообщения в канал сброса.
8. включение вычисленного значения
И, наконец, давайте попробуем метод маршрут , который дает нам немного больше контроля, чем маршрут для получателей. Это хорошо, потому что Маршрутизатор может разделить поток на любое количество частей, в то время как Фильтр может выполнять только две.
8.1. Сопоставление каналов
Давайте определим наш Поток интеграции компонент:
@Bean public IntegrationFlow classify() { return classify -> classify.split() .route(number -> number % 3, mapping -> mapping .channelMapping(0, "multipleOfThreeChannel") .channelMapping(1, "remainderIsOneChannel") .channelMapping(2, "remainderIsTwoChannel")); }
В приведенном выше коде мы вычисляем ключ маршрутизации, выполняя разделение:
route(p -> p % 3,...
Основываясь на этом ключе, мы направляем сообщения:
channelMapping(0, "multipleof3Channel")
8.2. Сопоставление подпотоков
Теперь, как и в случае с другими, мы можем получить больший контроль, указав подпоток, заменив отображение канала на Отображение подпотока :
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
Или еще больше контроля, вызвав метод handle вместо метода channel :
.subFlowMapping(2, subflow -> subflow .handle((payload, headers) -> { // do extra work on the payload return payload; }))).channel("remainderIsTwoChannel");
В этом случае подпоток вернется в основной поток после метода route () , поэтому нам нужно будет указать канал remainderIsTwoChannel.
9. Заключение
В этом руководстве мы рассмотрели, как фильтровать и маршрутизировать сообщения некоторыми способами, используя подпотоки.
Как обычно, полный исходный код доступен на GitHub .