Рубрики
Без рубрики

Начало работы с обработкой потоков с помощью весеннего облачного потока данных

Введение в то, как начать работу с обработкой потоков с помощью платформы Spring Cloud Data Flow.

Автор оригинала: baeldung.

1. Введение

Весенний облачный поток данных — это облачная модель программирования и операционной модели для сотых микрослужб данных.

С Весенний облачный поток данных разработчики могут создавать и организовывать конвейеры данных для обычных случаев использования, таких как отем данных, аналитика в режиме реального времени и импорт/экспорт данных.

Эти конвейеры данных бывают двух вкусов, потоковых и пакетных конвейеров данных.

В первом случае неограниченный объем данных потребляется или производится с помощью среднего программного обеспечения для обмена сообщениями. В то время как во втором случае недолговечная задача обрабатывает конечный набор данных, а затем завершается.

Эта статья будет сосредоточена на потоковой обработке.

2. Архитектурный обзор

Ключевыми компонентами архитектуры такого типа являются Заявки , Сервер потока данных , и целевое время выполнения.

Кроме того, в дополнение к этим ключевым компонентам, мы также обычно Оболочка потока данных и сообщение брокер в архитектуре.

Давайте рассмотрим все эти компоненты более подробно.

2.1. Заявки

Как правило, конвейер потоковых данных включает в себя потребление событий из внешних систем, обработку данных и сохранение полиглота. Эти фазы обычно называют Источник , Процессор , и Потопить в Весеннее облако терминология:

  • Источник: это приложение, которое потребляет События
  • Процессор: потребляет данные из Источник , делает некоторую обработку на нем, и испускает обработанные данные к следующему приложению в конвейере
  • Раковина: либо потребляет из Источник или Процессор и записывает данные в желаемый слой настойчивости

Эти приложения могут быть упакованы двумя способами:

  • Весенняя загрузка убер-банка, которая размещается в репозитории maven, файле, http или любой другой реализации ресурсов Spring (этот метод будет использоваться в этой статье)
  • докер

Многие источники, процессоры и приложения для раковины для обычных случаев использования (например, jdbc, hdfs, http, маршрутизатор) уже предоставлены и готовы к использованию Весенний облачный поток данных команда.

2.2. Время выполнения

Кроме того, для выполнения этих приложений требуется время выполнения. Поддерживаемые время времени:

  • Облачный лиохий
  • Апач YARN
  • Кубернеты
  • Апач Месос
  • Локальный сервер для разработки (wich будет использоваться в этой статье)

2.3. Сервер потока данных

Компонент, отвечающий за развертывание приложений в время выполнения, является Сервер потока данных . Существует Сервер потока данных выполняемая банка, предусмотренная для каждого из целевых времени выполнения.

Сервер потока данных отвечает за интерпретацию:

  • Поток DSL, описывай логический поток данных через несколько приложений.
  • Манифест развертывания, описываемый отображение приложений на время выполнения.

2.4. Оболочка потока данных

Оболочка потока данных является клиентом сервера потока данных. Оболочка позволяет выполнять команду DSL, необходимую для взаимодействия с сервером.

В качестве примера, DSL для описания потока данных из источника http в раковину jdbc будет написано как “http | jdbc”. Эти имена в DSL зарегистрированы в Сервер потока данных и карта артефактов приложений, которые могут быть размещены в репозиториях Maven или Docker.

Весна также предлагает графический интерфейс, названный Фло , для создания и мониторинга потоковых конвейеров данных. Тем не менее, его использование находится вне обсуждения этой статьи.

2.5. Брокер сообщений

Как мы уже видели в примере предыдущего раздел, мы использовали символ трубы в определении потока данных. Символ трубы представляет собой связь между двумя приложениями с помощью программы обмена сообщениями.

Это означает, что нам нужен брокер сообщений и работает в целевой среде.

Два обмена сообщениями посредников брокеров, которые поддерживаются являются:

  • Апач Кафка
  • КроликМЗ

Итак, теперь, когда у нас есть обзор архитектурных компонентов – пришло время построить наш первый конвейер обработки потоков.

3. Установка брокера сообщений

Как мы видели, приложениям в конвейере необходимо среднее программное обеспечение для обмена сообщениями для общения. Для целей этой статьи, мы пойдем с КроликМЗ .

Для получения подробной информации об установке, вы можете следовать инструкциям по официальный сайт .

4. Локальный сервер потока данных

Чтобы ускорить процесс создания наших приложений, мы будем использовать Весенний инициализр ; с его помощью мы можем получить Весенняя загрузка приложений в течение нескольких минут.

После достижения веб-сайт, просто выбрать Групповой и Артефакт имя.

Как только это будет сделано, нажмите на кнопку Создание проектных чтобы начать загрузку артефакта Maven.

После загрузки, распаковать проект и импортировать его в качестве проекта Maven в вашем IDE выбора.

Давайте добавим зависимость Maven к проекту. Как нам нужно Локальный сервер потока данных библиотеки, давайте добавим весна-облако-стартер-dataflow-сервер-локальный зависимость:


    org.springframework.cloud
    spring-cloud-starter-dataflow-server-local

Теперь нам нужно аннотировать Весенняя загрузка основной класс с @EnableDataFlowServer аннотация:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

Это всё. Наша Локальный сервер потока данных готов к исполнению:

mvn spring-boot:run

Приложение будет загружено на порт 9393.

5. Оболочка потока данных

Опять же, перейдите на Весенний инициализр и выберите Групповой и Артефакт имя.

После того, как мы скачали и импортировали проект, давайте добавим весна-облако-dataflow-оболочка зависимость:


    org.springframework.cloud
    spring-cloud-dataflow-shell

Теперь нам нужно добавить @EnableDataFlowShell аннотация к Весенняя загрузка основной класс:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

Теперь мы можем запустить оболочку:

mvn spring-boot:run

После запуска оболочки мы можем ввести помочь команды в запросе, чтобы увидеть полный список команд, которые мы можем выполнить.

6. Заявка на источник

Аналогичным образом, на Initializr мы создадим простое приложение и добавим Поток Кролик зависимость, называемая весна-облако-стартер-поток-кролик:


    org.springframework.cloud
    spring-cloud-starter-stream-rabbit

Затем мы добавим @EnableBinding (Источник.class) аннотация к Весенняя загрузка основной класс:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

Теперь нам нужно определить источник данных, которые должны быть обработаны. Этот источник может быть любой потенциально бесконечной рабочей нагрузки (интернет-вещей датчик данных, 24/7 события обработки, онлайн данных транзакций ingest).

В нашем примере приложения, мы производим одно событие (для простоты новый timestamp) каждые 10 секунд с Поллер .

@InboundChannelAdapter аннотация отправляет сообщение на выходной канал источника, используя значение возврата в качестве полезной нагрузки сообщения:

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

Наш источник данных готов.

7. Приложение процессора

Далее мы создадим приложение и добавим Поток Кролик зависимость.

Затем мы добавим @EnableBinding (Процессор.class) аннотация к Весенняя загрузка основной класс:

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

Далее нам необходимо определить метод обработки данных, поступающих из приложения-источника.

Чтобы определить трансформатор, мы должны аннотировать этот метод с помощью @Transformer аннотация:

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

Он преобразует таймштамп из канала “вход” в отформатированную дату, которая будет отправлена на канал “выход”.

8. Приложение раковины

Последним приложением для создания является приложение Sink.

Опять же, перейдите на Весенний инициализр и выберите Групповой , Артефакт имя. После загрузки проекта давайте добавим Поток Кролик зависимость.

Затем добавьте @EnableBinding (Sink.class) аннотация к Весенняя загрузка основной класс:

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
	SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

Теперь нам нужен метод перехвата сообщений, поступающих из приложения процессора.

Для этого нам нужно добавить @StreamListener (Sink.INPUT) аннотация к нашему методу:

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

Метод просто печатает таймштамп, преобразованный в отформатированную дату, в файл журнала.

9. Зарегистрируйте приложение Stream

Оболочка потока данных весеннего облака позволяет нам зарегистрировать приложение Stream с реестром приложений с помощью регистрация приложений команда.

Мы должны предоставить уникальное имя, тип приложения и URI, которые могут быть решены с артефактом приложения. Для типа укажите ” источник “, ” процессор “, или ” раковина “.

При предоставлении URI с maven схеме, формат должен соответствовать следующим:

maven://:[:[:]]:

Зарегистрировать Источник , Процессор и Потопить приложения ранее созданные , перейдите на Оболочка потока данных весеннего облака и выдать следующие команды из подсказки:

app register --name time-source --type source 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

10. Создание и развертывание потока

Чтобы создать новое определение потока перейдите на Оболочка потока данных весеннего облака и выполнить следующую команду оболочки:

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'

Это определяет поток под названием время к журналу на основе выражения DSL ‘Время-источник | время процессора | лесозаготовки ‘ .

Затем для развертывания потока выполните следующую команду оболочки:

stream deploy --name time-to-log

Сервер потока данных решает время-источник , время процессора , и лесозаготовительные для maven координаты и использует их для запуска время-источник , время процессора и лесозаготовительные приложений потока.

Если поток это правильно развернуты вы увидите в Сервер потока данных журналы, что модули были запущены и связаны друг с другом:

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Обзор результатов

В этом примере источник просто отправляет текущую тайм-штамп в качестве сообщения каждую секунду, формат процессора и раковина журнала выводит отформатированную таймштампу с помощью фреймворка журнала.

Файлы журнала расположены в каталоге, отображаемом в Сервер потока данных выход журнала, как показано выше. Чтобы увидеть результат, мы можем следить за журналом:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Заключение

В этой статье мы видели, как построить конвейер данных для обработки потоков с помощью Весенний облачный поток данных .

Кроме того, мы увидели роль Источник , Процессор и Потопить приложения внутри потока и как подключить и связать этот модуль внутри Сервер потока данных с помощью Оболочка потока данных .

Пример кода можно найти в Проект GitHub .