Рубрики
Без рубрики

Введение в Hazelcast Jet

Узнайте, как использовать Hazelcast Jet для обработки данных потока.

Автор оригинала: baeldung.

1. Введение

В этом учебнике мы узнаем о Hazelcast Jet. Это распределенный движок обработки данных, предоставляемый Hazelcast, Inc и построен на вершине Hazelcast IMDG.

Если вы хотите узнать о Hazelcast IMDG, вот статья для начала работы.

2. Что такое Хейзелкаст Джет?

Hazelcast Jet — это распределенный движок обработки данных, который рассматривает данные как потоки. Он может обрабатывать данные, хранящиеся в базе данных или файлах, а также данные, которые потокового сервера Kafka.

Кроме того, он может выполнять агрегированные функции над бесконечными потоками данных, разделяя потоки на подмножества и применяя агрегацию по каждому подмножество. Эта концепция известна как оконные окна в терминологии Jet.

Мы можем развернуть Jet в кластере машин, а затем представить наши рабочие места обработки данных к нему. Jet сделает все члены кластера автоматически обрабатывать данные. Каждый член кластера потребляет часть данных, что упрощает масштабирование до любого уровня пропускной способности.

Вот типичные случаи использования для Hazelcast Jet:

  • Обработка потоков в режиме реального времени
  • Быстрая обработка пакетов
  • Обработка потоков Java 8 распределенным способом
  • Обработка данных в микросервисах

3. Настройка

Чтобы настроить Hazelcast Jet в нашей среде, нам просто нужно добавить одну зависимость Maven к нашей пом.xml .

Вот как мы это делаем:


    com.hazelcast.jet
    hazelcast-jet
    4.2

Включая эту зависимость, загрузит файл банка 10 Мб, который предоставляет нам всю инфраструктуру, необходимой для создания распределенного конвейера обработки данных.

Последняя версия для Hazelcast Jet можно найти здесь .

4. Пример применения

Для того, чтобы узнать больше о Hazelcast Jet, мы создадим пример приложения, которое принимает вход предложений и слово, чтобы найти в этих предложениях и возвращает количество указанного слова в этих предложениях.

4.1. Трубопровод

Трубопровод формирует основную конструкцию для приложения Jet. Обработка в конвейере следует следующим шагам:

  • читать данные из источника
  • трансформировать данные
  • писать данные в раковину

Для нашего приложения конвейер будет читаться с распределенного Список , применить преобразование группировки и агрегации и, наконец, написать распределенной Карта .

Вот как мы пишем наш конвейер:

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    p.readFrom(Sources.list(LIST_NAME))
      .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .groupingKey(wholeItem())
      .aggregate(counting())
      .writeTo(Sinks.map(MAP_NAME));
    return p;
}

После того, как мы читаем из источника, мы проходим данные и разделить его вокруг пространства с помощью регулярного выражения. После этого мы отфильтровываем пробелы.

Наконец, мы группим слова, агрегируют их и пишем результаты на карта.

4.2. Работа

Теперь, когда наш конвейер определен, мы создаем задание для выполнения конвейера.

Вот как мы пишем countWord функция, которая принимает параметры и возвращает количество:

public Long countWord(List sentences, String word) {
    long count = 0;
    JetInstance jet = Jet.newJetInstance();
    try {
        List textList = jet.getList(LIST_NAME);
        textList.addAll(sentences);
        Pipeline p = createPipeLine();
        jet.newJob(p).join();
        Map counts = jet.getMap(MAP_NAME);
        count = counts.get(word);
        } finally {
            Jet.shutdownAll();
      }
    return count;
}

Сначала мы создаем экземпляр Jet, чтобы создать свою работу и использовать конвейер. Далее мы копим входные Список в распределенный список, чтобы он был доступен во всех экземплярах.

Затем мы представляем работу с использованием трубопровода, который мы построили выше. Метод newJob () возвращает выполненную работу, которая начата Jet асинхронно. присоединиться метод ждет завершения задания и бросает исключение если задание выполнено с ошибкой.

Когда задание завершается, результаты извлекаются в распределенной Карта, как мы определили в нашем конвейере. Итак, мы получаем Карта от экземпляра Jet и получить подсчеты слова против него.

Наконец, мы закрыли экземпляр Jet. Важно закрыть его после того, как наша казнь закончилась, как Реактивный экземпляр запускает свои собственные потоки . В противном случае наш Java-процесс будет по-прежнему жив даже после выхода нашего метода.

Вот унитарный тест, который проверяет код, который мы написали для Jet:

@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
    List sentences = new ArrayList<>();
    sentences.add("The first second was alright, but the second second was tough.");
    WordCounter wordCounter = new WordCounter();
    long countSecond = wordCounter.countWord(sentences, "second");
    assertEquals(3, countSecond);
}

5. Заключение

В этой статье мы узнали о Hazelcast Jet. Чтобы узнать больше о нем и его особенностях, обратитесь к ручные .

Как обычно, код для примеров, используемых в этой статье, можно найти более на Github .