Рубрики
Без рубрики

Шаблоны Корпоративной Интеграции С Apache Camel

Всем привет! Я хочу рассказать вам об отличном инструменте с открытым исходным кодом, который является ПОТРЯСАЮЩИМ, и он не получает… С тегами java, программирование, camel, kafka.

Всем привет! Я хочу рассказать вам о замечательном инструменте с открытым исходным кодом, который является ПОТРЯСАЮЩИМ, но он не получает той любви, которой заслуживает: Apache Camel .

Apache Camel – это платформа интеграции. Что это значит? Предположим, вы работаете над проектом, который потребляет данные из Kafka и RabbitMQ, считывает и записывает из различных баз данных и в различные базы данных, преобразует данные, записывает все в файлы и выводит обработанные данные в другую тему Kafka. Вы также должны реализовать обработку ошибок службы (повторные попытки, канал с мертвыми буквами и т.д.), Чтобы все работало безупречно. Это кажется трудным.

Apache Camel помогает вам интегрироваться со многими компонентами, такими как базы данных, файлы, брокеры и многое другое, сохраняя при этом простоту и продвигая шаблоны корпоративной интеграции . Давайте рассмотрим несколько примеров, основанных на шаблонах интеграции. Вы можете найти код в этот репозиторий .

Мы начнем с потребления событий из темы Kafka и вывода в другую тему, используя шаблон Управляемый событиями потребитель . События будут представлять собой текстовые сообщения, отправленные пользователем.

Вот и все! Мы также добавили журнал, чтобы мы могли видеть текст сообщения в журналах. Аргумент log передается с использованием Simple language, языка Apache Camel, используемого для вычисления выражений.

Теперь давайте внедрим фильтр сообщений . Этот шаблон отфильтровывает сообщения, которые не соответствуют определенным условиям. В нашем случае мы будем обрабатывать только те, которые имеют тип “чат”.

Легко, верно? Теперь мы разархивируем сообщение из JSON в пользовательское сообщение POJO, чтобы иметь возможность фильтровать по типу. Мы снова маршалируем в JSON, прежде чем отправить его в другую тему Kafka.

Теперь предположим, что мы хотим сохранить все сообщения в файле. Кроме того, для сообщений, отправителем которых является “Неизвестный”, мы хотим сохранить их в другом файле для целей тестирования. Для этого мы можем использовать шаблон content-based router .

Если файл уже существует, мы добавим события и добавим новую строку в конце каждого события. Для других излучателей мы сделаем то же самое, но сохраним их в другом файле. Это действительно похоже на конструкцию “если”, верно?

Мы можем видеть список “устройств” в событии, и мы хотим регистрировать их одно за другим. Как мы можем это сделать? Используя шаблон Splitter , мы можем перебирать любой список. Мы можем делать это последовательно или параллельно. Давайте попробуем сделать это последовательно в этом примере.

Мы можем разделить на любое поле, которое является итеративным. Как вы можете видеть, мы снова используем простой язык для доступа к содержимому мероприятия.

Давай попробуем что-нибудь посильнее. Мы получаем текстовые сообщения от разных отправителей, но мы хотим объединить несколько текстовых сообщений и создать новое сообщение со всеми сообщениями для отправителя. Для этого мы можем использовать шаблон Aggregator . Шаблон агрегатора позволяет буферизировать события и ожидать других событий. Когда получено другое событие, может быть выполнено пользовательское агрегирование, основанное на наших потребностях. Новое событие отправляется при выполнении условия. Это условие может быть основано на количестве полученных событий, тайм-ауте или любом другом пользовательском условии.

В нашем случае мы создадим новый POJO, который будет агрегировать текстовые сообщения от отправителя. Новое событие будет отправлено через 5 секунд после первого события, полученного для отправителя.

Мы используем агрегацию в памяти, но мы могли бы использовать другие хранилища данных, такие как Postgres или Redis. Мы используем простой язык для агрегирования отправителя сообщения, и мы создали пользовательскую стратегию агрегирования, показанную ниже.

В пользовательской стратегии агрегирования для первого события) мы создаем новое Объединенное пользовательское сообщение с текстом сообщения. Для всех остальных событий мы добавляем текст сообщения в объединенное событие.

Все это замечательно, но как мы применяем преобразования к полю? Теперь у нас есть объединенное событие, но было бы здорово, если бы мы могли каким-то образом обработать объединенное событие и превратить его в обычный текст, объединив несколько элементов текстовых сообщений. Мы можем сделать это с помощью шаблона Message Translator .

Мы можем вызывать bean-функции непосредственно из маршрута Camel и выполнять все необходимые преобразования, используя простой Java-код. Аккуратно!

Мы видим, что наши Верблюжьи маршруты становятся все шире. Как нам поступить, если мы хотим, например, разделить их между файлами? Два компонента в памяти, которые позволяют нам это делать: Прямой и СЕДА .

Direct – это синхронная конечная точка, которая работает как вызов с маршрута на другой маршрут. Давайте используем его для разделения маршрута, который хранит событие в файле.

Отлично! Есть еще один компонент в памяти, который будет нам полезен: SEDAN . SEDA работает как Direct, но является асинхронным, что означает, что сообщение помещается в очередь для обработки другим потоком. Давайте используем SEDA, чтобы отделить получение сообщения от Kafka от маршрутов, которые его потребляют.

Теперь наши маршруты намного проще. Предположим, нам нужно выполнить периодическую задачу, такую как очистка. Мы можем воспользоваться преимуществами Таймер конечная точка. Давайте проиллюстрируем это, создав маршрут, который выполняется каждые 5 секунд.

Теперь, когда наше приложение почти готово к производству, мы должны повысить отказоустойчивость. Что произойдет, если по какой-либо причине сообщение получит сообщение об ошибке во время прохождения маршрута? Давайте реализуем шаблон Мертвая буква . Когда в маршруте возникает ошибка, сообщение отправляется в другую тему Kafka, чтобы позже его можно было повторно обработать.

И это все! Конфигурация обработчика ошибок применяется ко всем маршрутам в классе. Мы отправляем исходное сообщение в тему (то, которое было получено первым в маршруте). Мы также могли бы настроить политики повторных попыток с тайм-аутами и другими распространенными конфигурациями отказоустойчивости, но поскольку нам это не нужно, мы оставим все как есть.

Теперь, когда мы подходим к концу этой статьи, я также хотел вам кое-что показать: можно настроить конечные точки REST в качестве маршрутов Camel.

Вот так просто! Мы только что настроили GET для URL/api/hello, на который будет дан ответ “Привет, мир!”.

Как вы можете видеть, Apache Camel – это фреймворк, который упрощает интеграцию с другими компонентами, поддерживая шаблоны корпоративной интеграции и упрощая создание конвейеров данных.

Надеюсь, вам это понравилось! Спасибо!

Оригинал: “https://dev.to/diogodanielsoaresferreira/enterprise-integration-patterns-with-apache-camel-5a8f”