1. Введение
Spring Cloud Data Flow — это инструментарий для построения конвейеров интеграции данных и обработки данных в режиме реального времени.
Трубопроводы, в данном случае, являются приложениями Spring Boot, которые построены с использованием Весенний облачный поток или Весенний облачный целевой Рамки.
В этом учебнике мы покажем, как использовать поток данных Весеннего Облака с Apache Spark .
2. Локальный сервер потока данных
Во-первых, мы должны запустить сервер потока данных, чтобы иметь возможность развертывать наши рабочие места.
Для локального запуска сервера потока данных нам необходимо создать новый проект с весна-облако-стартер-dataflow-сервер-локальный зависимость :
org.springframework.cloud spring-cloud-starter-dataflow-server-local 1.7.4.RELEASE
После этого нам нужно аннотировать основной класс сервера с помощью @EnableDataFlowServer :
@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }
Как только мы завестим это приложение, у нас будет локальный сервер Data Flow в порту 9393.
3. Создание проекта
Мы создадим Spark Job как отдельное локальное приложение, чтобы нам не нужно было создавать кластер для его запуска.
3.1. Зависимости
Во-первых, мы добавим Искровая зависимость :
org.apache.spark spark-core_2.10 2.4.0
3.2. Создание работы
И для нашей работы, давайте приблизительные пи:
public class PiApproximation { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation"); JavaSparkContext context = new JavaSparkContext(conf); int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2; int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices; Listxs = IntStream.rangeClosed(0, n) .mapToObj(element -> Integer.valueOf(element)) .collect(Collectors.toList()); JavaRDD dataSet = context.parallelize(xs, slices); JavaRDD pointsInsideTheCircle = dataSet.map(integer -> { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y ) < 1 ? 1: 0; }); int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2); System.out.println("The pi was estimated as:" + count / n); context.stop(); } }
4. Оболочка потока данных
Data Flow Shell это приложение, которое будет позволяют нам взаимодействовать с сервером . Shell использует команды DSL для описания потоков данных.
Для использовать оболочку потока данных нам нужно создать проект, который позволит нам запустить его. Во-первых, нам весна-облако-dataflow-оболочка зависимость :
org.springframework.cloud spring-cloud-dataflow-shell 1.7.4.RELEASE
После добавления зависимости мы можем создать класс, который запустит нашу оболочку потока данных:
@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }
5. Развертывание проекта
Для развертывания нашего проекта мы будем использовать так называемый бегун задач, который доступен для Apache Spark в трех версиях: кластерные , пряжа , и клиент . Мы собираемся продолжить работу с местными клиент Версия.
Задача бегуна является то, что работает наша работа Spark.
Чтобы сделать это, мы сначала должны зарегистрировать нашу задачу с помощью системы Data Flow Shell :
app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
Задача позволяет нам указать несколько различных параметров, некоторые из них являются необязательными, но некоторые параметры необходимы для правильного развертывания задания Spark:
- spark.app класса , основной класс нашей представленной работы
- spark.app- , путь к жирной банке, содержащей нашу работу
- spark.app- имя , имя, которое будет использоваться для нашей работы
- spark.app-арги , аргументы, которые будут переданы на работу
Мы можем использовать зарегистрированную целевую искра-клиент представить нашу работу, не забывая предоставить необходимые параметры:
task create spark1 --definition "spark-client \ --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \ --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
Обратите внимание, что spark.app- это путь к жир-банку с нашей работой.
После успешного создания задачи, мы можем приступить к ее запуску со следующей командой:
task launch spark1
Это потребует выполнения нашей задачи.
6. Резюме
В этом учебнике мы показали, как использовать платформу Spring Cloud Data Flow для обработки данных с помощью Apache Spark. Более подробную информацию о инфраструктуре потока данных Spring Cloud можно найти в документация .
Все образцы кода можно найти на GitHub.