Рубрики
Без рубрики

ETL с потоком данных Spring Cloud

Узнайте, как реализовать ETL с потоком данных Spring Cloud.

Автор оригинала: Norberto Ritzmann.

1. Обзор

Spring Cloud Data Flow -это облачный инструментарий для построения конвейеров данных в реальном времени и пакетных процессов. Поток данных Spring Cloud готов к использованию в различных случаях обработки данных, таких как простой импорт/экспорт, обработка ETL, потоковая передача событий и прогнозная аналитика.

В этом уроке мы рассмотрим пример преобразования и загрузки извлечения в реальном времени (ETL) с использованием конвейера потоков, который извлекает данные из базы данных JDBC, преобразует их в простые POJOS и загружает в MongoDB.

2. ETL и обработка потока событий

ETL – извлечение, преобразование и загрузка – обычно назывался процессом пакетной загрузки данных из нескольких баз данных и систем в общее хранилище данных. В этом хранилище данных можно выполнять интенсивную обработку анализа данных без ущерба для общей производительности системы.

Однако новые тенденции меняют то, как это делается. ETL по-прежнему играет важную роль в передаче данных в хранилища данных и озера данных.

В настоящее время это можно сделать с помощью потоков в архитектуре потока событий с помощью потока данных Spring Cloud .

3. Поток данных Spring Cloud

С помощью Spring Cloud Data Flow (SCDF) разработчики могут создавать конвейеры данных в двух вариантах:

  • Долгоживущие потоковые приложения в реальном времени с использованием Spring Cloud Stream
  • Приложения с короткоживущими пакетными задачами, использующие задачу Spring Cloud

В этой статье мы рассмотрим первое, долговечное потоковое приложение, основанное на Spring Cloud Stream.

3.1. Приложения Spring Cloud Stream

Конвейеры потоков SCDF состоят из шагов, где каждый шаг представляет собой приложение, построенное в стиле Spring Boot с использованием микро-фреймворка Spring Cloud Stream. Эти приложения интегрированы с помощью промежуточного программного обеспечения для обмена сообщениями, такого как Apache Kafka или RabbitMQ.

Эти приложения подразделяются на источники, процессоры и приемники. По сравнению с процессом ETL, мы могли бы сказать, что источник-это “извлечение”, процессор – “трансформатор”, а приемник-часть “нагрузки”.

В некоторых случаях мы можем использовать стартер приложения на одном или нескольких этапах конвейера. Это означает, что нам не нужно будет реализовывать новое приложение для шага, а вместо этого настроить уже реализованный запуск существующего приложения.

Список стартеров приложений можно найти здесь .

3.2. Сервер потока данных Spring Cloud

Последней частью архитектуры является сервер потока данных Spring Cloud . Сервер SCDF выполняет развертывание приложений и потока конвейера с использованием спецификации Spring Cloud Deployer. Эта спецификация поддерживает облачный вариант SCDF путем развертывания в ряде современных сред выполнения, таких как Kubernetes, Apache Mesos, Yarn и Cloud Foundry.

Кроме того, мы можем запустить поток как локальное развертывание.

Более подробную информацию об архитектуре SCDF можно найти здесь .

4. Настройка среды

Прежде чем мы начнем, нам нужно выбрать части этого сложного развертывания . Первая часть, которую нужно определить, – это сервер SCDF.

Для тестирования/| мы будем использовать локальный сервер SCDF для локальной разработки . Для производственного развертывания мы можем позже выбрать облачную среду выполнения, например SCDF Server Kubernetes . Мы можем найти список сред выполнения сервера здесь .

Теперь давайте проверим системные требования для запуска этого сервера.

4.1. Системные требования

Чтобы запустить сервер SCDF, нам нужно определить и настроить две зависимости:

  • промежуточное программное обеспечение для обмена сообщениями и
  • СУБД.

Для промежуточного программного обеспечения обмена сообщениями мы будем работать с RabbitMQ, и мы выбираем PostgreSQL в качестве СУБД для хранения наших определений потоков конвейера.

Для запуска RabbitMQ загрузите последнюю версию здесь и запустите экземпляр RabbitMQ, используя конфигурацию по умолчанию, или выполните следующую команду Docker:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

В качестве последнего шага установки установите и запустите СУБД PostgreSQL на порту 5432 по умолчанию. После этого создайте базу данных, в которой SCDF может хранить свои определения потоков, используя следующий сценарий:

CREATE DATABASE dataflow;

4.2. Локальный сервер потока данных Spring Cloud

Для запуска локального сервера SCDF мы можем запустить сервер с помощью docker-compose , или мы можем запустить его как Java-приложение.

Здесь мы запустим локальный сервер SCDF как Java-приложение. Для настройки приложения мы должны определить конфигурацию как параметры приложения Java. Нам понадобится Java 8 в системном пути.

Чтобы разместить банки и зависимости, нам нужно создать домашнюю папку для нашего сервера SCDF и загрузить локальный дистрибутив сервера SCDF в эту папку. Вы можете скачать самый последний дистрибутив SCDF Server Local здесь .

Кроме того, нам нужно создать папку lib и поместить туда драйвер JDBC. Доступна последняя версия драйвера PostgreSQL здесь .

Наконец, давайте запустим локальный сервер SCDF:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
    --spring.datasource.username=postgres_username \
    --spring.datasource.password=postgres_password \
    --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.rabbitmq.host=127.0.0.1 \
    --spring.rabbitmq.port=5672 \
    --spring.rabbitmq.username=guest \
    --spring.rabbitmq.password=guest

Мы можем проверить, работает ли он, посмотрев на этот URL-адрес:

http://localhost:9393/dashboard

4.3. Оболочка потока данных Spring Cloud

Оболочка SCDF – это инструмент командной строки , который позволяет легко создавать и развертывать наши приложения и конвейеры . Эти команды оболочки выполняются над сервером потока данных Spring Cloud REST API .

Загрузите последнюю версию jar в домашнюю папку SCDF, доступную здесь. Как только это будет сделано, выполните следующую команду (обновите версию по мере необходимости):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

Если вместо ” поток данных:>” вы получаете ” сервер-неизвестно:>” в последней строке, вы не запускаете сервер SCDF на локальном хосте. В этом случае выполните следующую команду для подключения к другому хосту:

server-unknown:>dataflow config server http://{host}

Теперь оболочка подключена к серверу SCDF, и мы можем запускать наши команды.

Первое, что нам нужно сделать в оболочке, – это импортировать стартеры приложений. Найдите последнюю версию здесь для RabbitMQ+Maven в Spring Boot 2.0.x и выполните следующую команду (снова обновите версию, здесь ” Darwin-SR1 “, по мере необходимости):

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Для проверки установленных приложений выполните следующую команду оболочки:

$ dataflow:> app list

В результате мы должны увидеть таблицу, содержащую все установленные приложения.

Кроме того, SCDF предлагает графический интерфейс с именем Flo , к которому мы можем получить доступ по этому адресу: http://localhost:9393/dashboard . Однако его использование не входит в сферу действия этой статьи.

5. Составление конвейера ETL

Теперь давайте создадим наш потоковый конвейер. Для этого мы будем использовать стартер исходного приложения JDBC для извлечения информации из нашей реляционной базы данных.

Кроме того, мы создадим пользовательский процессор для преобразования информационной структуры и пользовательский приемник для загрузки наших данных в MongoDB.

5.1. Извлечение – Подготовка реляционной базы данных к извлечению

Давайте создадим базу данных с именем crm и таблицу с именем customer :

CREATE DATABASE crm;
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
)

Обратите внимание , что мы используем флаг imported , который будет хранить, какая запись уже была импортирована. При необходимости мы также можем сохранить эту информацию в другой таблице.

Теперь давайте вставим некоторые данные:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. Трансформация – Сопоставление полей JDBC со структурой полей MongoDB

Для шага преобразования мы сделаем простой перевод поля customer_name из исходной таблицы в новое поле name . Здесь можно было бы выполнить и другие преобразования, но давайте ограничимся кратким примером.

Для этого мы создадим новый проект с именем customer-transform . Самый простой способ сделать это-использовать сайт Spring Initializr для создания проекта. Перейдя на веб-сайт, выберите Группу и имя артефакта. Мы будем использовать com.customer и customer-transform, соответственно.

Как только это будет сделано, нажмите на кнопку “Создать проект”, чтобы загрузить проект. Затем распакуйте проект и импортируйте его в свою любимую среду IDE, а также добавьте следующую зависимость в pom.xml :


    org.springframework.cloud
    spring-cloud-stream-binder-rabbit

Теперь мы приступаем к кодированию преобразования имени поля. Для этого мы создадим класс Customer , который будет выступать в качестве адаптера. Этот класс получит customer_name через setName() метод и выведет его значение через getName метод .

Аннотации @JsonProperty будут выполнять преобразование при десериализации из JSON в Java:

public class Customer {

    private Long id;

    private String name;

    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonProperty("name")
    public String getName() {
        return name;
    }

    // Getters and Setters
}

Процессор должен получать данные с входного сигнала, выполнять преобразование и привязывать результат к выходному каналу. Давайте создадим класс для этого:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {

        return payload;
    }
}

В приведенном выше коде мы можем наблюдать, что преобразование происходит автоматически. Входные данные поступают в виде JSON, а Джексон десериализует их в объект Customer с помощью методов set .

Для вывода наоборот, данные сериализуются в JSON с помощью методов get .

5.3. Приемник нагрузки в MongoDB

Аналогично шагу преобразования, мы создадим еще один проект maven, теперь с именем customer- mongodb -sink . Снова откройте Spring Initializr , для группы выберите com.customer , а для артефакта выберите customer-mongodb-sink . Затем введите MongoDB в поле поиска зависимостей и загрузите проект.

Затем распакуйте и импортируйте его в свою любимую среду IDE.

Затем добавьте ту же дополнительную зависимость, что и в проекте customer-transform .

Теперь мы создадим еще один класс Customer для получения входных данных на этом шаге:

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {

    private Long id;
    private String name;

    // Getters and Setters
}

Для потопления Customer мы создадим класс прослушивателя , который сохранит сущность customer с помощью CustomerRepository :

@EnableBinding(Sink.class)
public class CustomerListener {

    @Autowired
    private CustomerRepository repository;

    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}

И CustomerRepository , в данном случае, является MongoRepository из данных Spring:

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository {

}

5.4. Определение потока

Теперь оба пользовательских приложения готовы к регистрации на сервере SCDF. Для этого скомпилируйте оба проекта с помощью команды Maven mvn install .

Затем мы регистрируем их с помощью оболочки потока данных Spring Cloud:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

Наконец, давайте проверим, хранятся ли приложения в SCDF, выполните команду списка приложений в оболочке:

app list

Наконец, давайте проверим, хранятся ли приложения в SCDF, выполните команду списка приложений в оболочке:

5.4.1. Потоковый конвейер Специфичный для домена Язык – DSL

DSL определяет конфигурацию и поток данных между приложениями. DSL SCDF прост. В первом слове мы определяем имя приложения, а затем конфигурации.

Кроме того, синтаксис является вдохновленным Unix Синтаксисом конвейера , который использует вертикальные полосы, также известные как “трубы”, для подключения нескольких приложений:

http --port=8181 | log

Это создает HTTP-приложение, обслуживаемое в порту 8181, которое отправляет любую полученную полезную нагрузку тела в журнал.

Теперь давайте посмотрим, как создать определение потока DSL источника JDBC.

5.4.2. Определение исходного потока JDBC

Ключевыми конфигурациями для источника JDBC являются query и update . запрос выберет непрочитанные записи, в то время как обновление изменит флаг, чтобы предотвратить повторное чтение текущих записей.

Кроме того, мы определим источник JDBC для опроса с фиксированной задержкой в 30 секунд и опросом не более 1000 строк. Наконец, мы определим конфигурации подключения, такие как драйвер, имя пользователя, пароль и URL-адрес подключения:

jdbc 
    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'
    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'
    --max-rows-per-poll=1000
    --fixed-delay=30 --time-unit=SECONDS
    --driver-class-name=org.postgresql.Driver
    --url=jdbc:postgresql://localhost:5432/crm
    --username=postgres
    --password=postgres

Дополнительные свойства конфигурации источника JDBC можно найти здесь .

5.4.3. Определение потока приемника MongoDB клиента

Поскольку мы не определили конфигурации соединений в application.properties of customer-mongodb-sink , мы настроим их с помощью параметров DSL.

Наше приложение полностью основано на MongoDataAutoConfiguration. Вы можете проверить другие возможные конфигурации здесь. В принципе, мы определим spring.data.mongodb.uri :

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. Создание и развертывание потока

Во-первых, чтобы создать окончательное определение потока, вернитесь в оболочку и выполните следующую команду (без разрывов строк, они только что были вставлены для удобства чтения):

stream create --name jdbc-to-mongodb 
  --definition "jdbc 
  --query='SELECT id, customer_name FROM public.customer WHERE imported=false' 
  --fixed-delay=30 
  --max-rows-per-poll=1000 
  --update='UPDATE customer SET imported=true WHERE id in (:id)' 
  --time-unit=SECONDS 
  --password=postgres 
  --driver-class-name=org.postgresql.Driver 
  --username=postgres 
  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink 
  --spring.data.mongodb.uri=mongodb://localhost/main"

Этот поток DSL определяет поток с именем jdbc -to- mongodb. Далее мы развернем поток по его имени :

stream deploy --name jdbc-to-mongodb

Наконец, мы должны увидеть расположение всех доступных журналов в выходных данных журнала:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Заключение

В этой статье мы рассмотрели полный пример конвейера данных ETL с использованием потока данных Spring Cloud.

Наиболее примечательно, что мы увидели конфигурации стартера приложений, создали конвейер потока ETL с использованием оболочки потока данных Spring Cloud и реализовали пользовательские приложения для чтения, преобразования и записи данных.

Как всегда, пример кода можно найти в проекте GitHub.