1. Введение
В этом учебнике мы узнаем о Hazelcast Jet. Это распределенный движок обработки данных, предоставляемый Hazelcast, Inc и построен на вершине Hazelcast IMDG.
Если вы хотите узнать о Hazelcast IMDG, вот статья для начала работы.
2. Что такое Хейзелкаст Джет?
Hazelcast Jet — это распределенный движок обработки данных, который рассматривает данные как потоки. Он может обрабатывать данные, хранящиеся в базе данных или файлах, а также данные, которые потокового сервера Kafka.
Кроме того, он может выполнять агрегированные функции над бесконечными потоками данных, разделяя потоки на подмножества и применяя агрегацию по каждому подмножество. Эта концепция известна как оконные окна в терминологии Jet.
Мы можем развернуть Jet в кластере машин, а затем представить наши рабочие места обработки данных к нему. Jet сделает все члены кластера автоматически обрабатывать данные. Каждый член кластера потребляет часть данных, что упрощает масштабирование до любого уровня пропускной способности.
Вот типичные случаи использования для Hazelcast Jet:
- Обработка потоков в режиме реального времени
- Быстрая обработка пакетов
- Обработка потоков Java 8 распределенным способом
- Обработка данных в микросервисах
3. Настройка
Чтобы настроить Hazelcast Jet в нашей среде, нам просто нужно добавить одну зависимость Maven к нашей пом.xml .
Вот как мы это делаем:
com.hazelcast.jet hazelcast-jet 4.2
Включая эту зависимость, загрузит файл банка 10 Мб, который предоставляет нам всю инфраструктуру, необходимой для создания распределенного конвейера обработки данных.
Последняя версия для Hazelcast Jet можно найти здесь .
4. Пример применения
Для того, чтобы узнать больше о Hazelcast Jet, мы создадим пример приложения, которое принимает вход предложений и слово, чтобы найти в этих предложениях и возвращает количество указанного слова в этих предложениях.
4.1. Трубопровод
Трубопровод формирует основную конструкцию для приложения Jet. Обработка в конвейере следует следующим шагам:
- читать данные из источника
- трансформировать данные
- писать данные в раковину
Для нашего приложения конвейер будет читаться с распределенного Список , применить преобразование группировки и агрегации и, наконец, написать распределенной Карта .
Вот как мы пишем наш конвейер:
private Pipeline createPipeLine() { Pipeline p = Pipeline.create(); p.readFrom(Sources.list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME)); return p; }
После того, как мы читаем из источника, мы проходим данные и разделить его вокруг пространства с помощью регулярного выражения. После этого мы отфильтровываем пробелы.
Наконец, мы группим слова, агрегируют их и пишем результаты на карта.
4.2. Работа
Теперь, когда наш конвейер определен, мы создаем задание для выполнения конвейера.
Вот как мы пишем countWord функция, которая принимает параметры и возвращает количество:
public Long countWord(Listsentences, String word) { long count = 0; JetInstance jet = Jet.newJetInstance(); try { List textList = jet.getList(LIST_NAME); textList.addAll(sentences); Pipeline p = createPipeLine(); jet.newJob(p).join(); Map counts = jet.getMap(MAP_NAME); count = counts.get(word); } finally { Jet.shutdownAll(); } return count; }
Сначала мы создаем экземпляр Jet, чтобы создать свою работу и использовать конвейер. Далее мы копим входные Список в распределенный список, чтобы он был доступен во всех экземплярах.
Затем мы представляем работу с использованием трубопровода, который мы построили выше. Метод newJob () возвращает выполненную работу, которая начата Jet асинхронно. присоединиться метод ждет завершения задания и бросает исключение если задание выполнено с ошибкой.
Когда задание завершается, результаты извлекаются в распределенной Карта, как мы определили в нашем конвейере. Итак, мы получаем Карта от экземпляра Jet и получить подсчеты слова против него.
Наконец, мы закрыли экземпляр Jet. Важно закрыть его после того, как наша казнь закончилась, как Реактивный экземпляр запускает свои собственные потоки . В противном случае наш Java-процесс будет по-прежнему жив даже после выхода нашего метода.
Вот унитарный тест, который проверяет код, который мы написали для Jet:
@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord() { Listsentences = new ArrayList<>(); sentences.add("The first second was alright, but the second second was tough."); WordCounter wordCounter = new WordCounter(); long countSecond = wordCounter.countWord(sentences, "second"); assertEquals(3, countSecond); }
5. Заключение
В этой статье мы узнали о Hazelcast Jet. Чтобы узнать больше о нем и его особенностях, обратитесь к ручные .
Как обычно, код для примеров, используемых в этой статье, можно найти более на Github .