1. Введение
Весенний облачный поток данных — это облачная модель программирования и операционной модели для сотых микрослужб данных.
С Весенний облачный поток данных разработчики могут создавать и организовывать конвейеры данных для обычных случаев использования, таких как отем данных, аналитика в режиме реального времени и импорт/экспорт данных.
Эти конвейеры данных бывают двух вкусов, потоковых и пакетных конвейеров данных.
В первом случае неограниченный объем данных потребляется или производится с помощью среднего программного обеспечения для обмена сообщениями. В то время как во втором случае недолговечная задача обрабатывает конечный набор данных, а затем завершается.
Эта статья будет сосредоточена на потоковой обработке.
2. Архитектурный обзор
Ключевыми компонентами архитектуры такого типа являются Заявки , Сервер потока данных , и целевое время выполнения.
Кроме того, в дополнение к этим ключевым компонентам, мы также обычно Оболочка потока данных и сообщение брокер в архитектуре.
Давайте рассмотрим все эти компоненты более подробно.
2.1. Заявки
Как правило, конвейер потоковых данных включает в себя потребление событий из внешних систем, обработку данных и сохранение полиглота. Эти фазы обычно называют Источник , Процессор , и Потопить в Весеннее облако терминология:
- Источник: это приложение, которое потребляет События
- Процессор: потребляет данные из Источник , делает некоторую обработку на нем, и испускает обработанные данные к следующему приложению в конвейере
- Раковина: либо потребляет из Источник или Процессор и записывает данные в желаемый слой настойчивости
Эти приложения могут быть упакованы двумя способами:
- Весенняя загрузка убер-банка, которая размещается в репозитории maven, файле, http или любой другой реализации ресурсов Spring (этот метод будет использоваться в этой статье)
- докер
Многие источники, процессоры и приложения для раковины для обычных случаев использования (например, jdbc, hdfs, http, маршрутизатор) уже предоставлены и готовы к использованию Весенний облачный поток данных команда.
2.2. Время выполнения
Кроме того, для выполнения этих приложений требуется время выполнения. Поддерживаемые время времени:
- Облачный лиохий
- Апач YARN
- Кубернеты
- Апач Месос
- Локальный сервер для разработки (wich будет использоваться в этой статье)
2.3. Сервер потока данных
Компонент, отвечающий за развертывание приложений в время выполнения, является Сервер потока данных . Существует Сервер потока данных выполняемая банка, предусмотренная для каждого из целевых времени выполнения.
Сервер потока данных отвечает за интерпретацию:
- Поток DSL, описывай логический поток данных через несколько приложений.
- Манифест развертывания, описываемый отображение приложений на время выполнения.
2.4. Оболочка потока данных
Оболочка потока данных является клиентом сервера потока данных. Оболочка позволяет выполнять команду DSL, необходимую для взаимодействия с сервером.
В качестве примера, DSL для описания потока данных из источника http в раковину jdbc будет написано как “http | jdbc”. Эти имена в DSL зарегистрированы в Сервер потока данных и карта артефактов приложений, которые могут быть размещены в репозиториях Maven или Docker.
Весна также предлагает графический интерфейс, названный Фло , для создания и мониторинга потоковых конвейеров данных. Тем не менее, его использование находится вне обсуждения этой статьи.
2.5. Брокер сообщений
Как мы уже видели в примере предыдущего раздел, мы использовали символ трубы в определении потока данных. Символ трубы представляет собой связь между двумя приложениями с помощью программы обмена сообщениями.
Это означает, что нам нужен брокер сообщений и работает в целевой среде.
Два обмена сообщениями посредников брокеров, которые поддерживаются являются:
- Апач Кафка
- КроликМЗ
Итак, теперь, когда у нас есть обзор архитектурных компонентов – пришло время построить наш первый конвейер обработки потоков.
3. Установка брокера сообщений
Как мы видели, приложениям в конвейере необходимо среднее программное обеспечение для обмена сообщениями для общения. Для целей этой статьи, мы пойдем с КроликМЗ .
Для получения подробной информации об установке, вы можете следовать инструкциям по официальный сайт .
4. Локальный сервер потока данных
Чтобы ускорить процесс создания наших приложений, мы будем использовать Весенний инициализр ; с его помощью мы можем получить Весенняя загрузка приложений в течение нескольких минут.
После достижения веб-сайт, просто выбрать Групповой и Артефакт имя.
Как только это будет сделано, нажмите на кнопку Создание проектных чтобы начать загрузку артефакта Maven.
После загрузки, распаковать проект и импортировать его в качестве проекта Maven в вашем IDE выбора.
Давайте добавим зависимость Maven к проекту. Как нам нужно Локальный сервер потока данных библиотеки, давайте добавим весна-облако-стартер-dataflow-сервер-локальный зависимость:
org.springframework.cloud spring-cloud-starter-dataflow-server-local
Теперь нам нужно аннотировать Весенняя загрузка основной класс с @EnableDataFlowServer аннотация:
@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }
Это всё. Наша Локальный сервер потока данных готов к исполнению:
mvn spring-boot:run
Приложение будет загружено на порт 9393.
5. Оболочка потока данных
Опять же, перейдите на Весенний инициализр и выберите Групповой и Артефакт имя.
После того, как мы скачали и импортировали проект, давайте добавим весна-облако-dataflow-оболочка зависимость:
org.springframework.cloud spring-cloud-dataflow-shell
Теперь нам нужно добавить @EnableDataFlowShell аннотация к Весенняя загрузка основной класс:
@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }
Теперь мы можем запустить оболочку:
mvn spring-boot:run
После запуска оболочки мы можем ввести помочь команды в запросе, чтобы увидеть полный список команд, которые мы можем выполнить.
6. Заявка на источник
Аналогичным образом, на Initializr мы создадим простое приложение и добавим Поток Кролик зависимость, называемая весна-облако-стартер-поток-кролик:
org.springframework.cloud spring-cloud-starter-stream-rabbit
Затем мы добавим @EnableBinding (Источник.class) аннотация к Весенняя загрузка основной класс:
@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }
Теперь нам нужно определить источник данных, которые должны быть обработаны. Этот источник может быть любой потенциально бесконечной рабочей нагрузки (интернет-вещей датчик данных, 24/7 события обработки, онлайн данных транзакций ingest).
В нашем примере приложения, мы производим одно событие (для простоты новый timestamp) каждые 10 секунд с Поллер .
@InboundChannelAdapter аннотация отправляет сообщение на выходной канал источника, используя значение возврата в качестве полезной нагрузки сообщения:
@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSourcetimeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); }
Наш источник данных готов.
7. Приложение процессора
Далее мы создадим приложение и добавим Поток Кролик зависимость.
Затем мы добавим @EnableBinding (Процессор.class) аннотация к Весенняя загрузка основной класс:
@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }
Далее нам необходимо определить метод обработки данных, поступающих из приложения-источника.
Чтобы определить трансформатор, мы должны аннотировать этот метод с помощью @Transformer аннотация:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }
Он преобразует таймштамп из канала “вход” в отформатированную дату, которая будет отправлена на канал “выход”.
8. Приложение раковины
Последним приложением для создания является приложение Sink.
Опять же, перейдите на Весенний инициализр и выберите Групповой , Артефакт имя. После загрузки проекта давайте добавим Поток Кролик зависимость.
Затем добавьте @EnableBinding (Sink.class) аннотация к Весенняя загрузка основной класс:
@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }
Теперь нам нужен метод перехвата сообщений, поступающих из приложения процессора.
Для этого нам нужно добавить @StreamListener (Sink.INPUT) аннотация к нашему методу:
@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }
Метод просто печатает таймштамп, преобразованный в отформатированную дату, в файл журнала.
9. Зарегистрируйте приложение Stream
Оболочка потока данных весеннего облака позволяет нам зарегистрировать приложение Stream с реестром приложений с помощью регистрация приложений команда.
Мы должны предоставить уникальное имя, тип приложения и URI, которые могут быть решены с артефактом приложения. Для типа укажите ” источник “, ” процессор “, или ” раковина “.
При предоставлении URI с maven схеме, формат должен соответствовать следующим:
maven://: [: [: ]]:
Зарегистрировать Источник , Процессор и Потопить приложения ранее созданные , перейдите на Оболочка потока данных весеннего облака и выдать следующие команды из подсказки:
app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT
10. Создание и развертывание потока
Чтобы создать новое определение потока перейдите на Оболочка потока данных весеннего облака и выполнить следующую команду оболочки:
stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'
Это определяет поток под названием время к журналу на основе выражения DSL ‘Время-источник | время процессора | лесозаготовки ‘ .
Затем для развертывания потока выполните следующую команду оболочки:
stream deploy --name time-to-log
Сервер потока данных решает время-источник , время процессора , и лесозаготовительные для maven координаты и использует их для запуска время-источник , время процессора и лесозаготовительные приложений потока.
Если поток это правильно развернуты вы увидите в Сервер потока данных журналы, что модули были запущены и связаны друг с другом:
2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source
11. Обзор результатов
В этом примере источник просто отправляет текущую тайм-штамп в качестве сообщения каждую секунду, формат процессора и раковина журнала выводит отформатированную таймштампу с помощью фреймворка журнала.
Файлы журнала расположены в каталоге, отображаемом в Сервер потока данных выход журнала, как показано выше. Чтобы увидеть результат, мы можем следить за журналом:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21
12. Заключение
В этой статье мы видели, как построить конвейер данных для обработки потоков с помощью Весенний облачный поток данных .
Кроме того, мы увидели роль Источник , Процессор и Потопить приложения внутри потока и как подключить и связать этот модуль внутри Сервер потока данных с помощью Оболочка потока данных .
Пример кода можно найти в Проект GitHub .