Рубрики
Без рубрики

Изучите обработку потоков с помощью потоков Кафки: Операции без состояния

Потоки Кафки – это библиотека Java для разработки приложений потоковой обработки поверх Apache Kafka…. С пометкой кафка, учебник, показать разработчика, java.

Потоки Кафки – это библиотека Java для разработки приложений потоковой обработки поверх Apache Kafka. Это первое из серии сообщений в блоге о потоках Kafka и его API.

Это не “теоретическое руководство” по KafkaStream (хотя я уже рассматривал некоторые из этих аспектов в прошлом).

В этой части мы рассмотрим операции без состояния в DSL API Kafka Streams – в частности, функции, доступные в KStream , такие как фильтр , карта , groupBy и т.д. DSL API в потоках Kafka предлагает мощную модель программирования в функциональном стиле для определения топологий потоковой обработки. Пожалуйста, обратите внимание, что API Table также предлагает функции без состояния, и то, что описано в этом посте, будет применимо и в этом случае (более или менее).

API-интерфейсы ( Поток и т.д.), на которые ссылаются в этом посте, можно найти в javadocs Потоков Кафки

Установка

Чтобы начать, вам нужно создать экземпляр Потоков Кафки . Для этого требуется Топология и соответствующая конфигурация (в виде java.util. Свойства )

Установите необходимую конфигурацию для вашего приложения Kafka streams:

Properties config = new Properties();

config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Затем мы можем построить Топологию , которая определяет конвейер обработки (остальная часть этого сообщения в блоге будет посвящена частям топологии без состояния).

Вы можете создать экземпляр Потоков Кафки и начать обработку

KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountdownLatch(1).await(); // wait forever

Операции без состояния с использованием потока

Мне вообще нравится разделять вещи по категориям – это помогает мне “разделять и властвовать”. Я попробовал то же самое в этом случае, разделив различные операции Потока на фильтр , карта и т.д.

Давайте копать глубже!

фильтр

Вы можете использовать фильтр для исключения или включения записей на основе критериев. Например, если значение, отправленное в тему, содержит слово, и вы хотите включить слова, длина которых превышает указанную. Вы можете определить этот критерий с помощью Предиката и передать его методу фильтр – это создаст новый экземпляр Потока с отфильтрованными записями

KStream stream = builder.stream("words");
stream.filter(new Predicate() {
    @Override
    public boolean test(String k, String v) {
            return v.length() > 5;
        }
    })

Также можно использовать не фильтруйте , если вы хотите исключить записи на основе критериев. Вот пример лямбда-стиля:

KStream stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));

карта

Обычно используемая операция без сохранения состояния – карта . В случае потоков Кафки его можно использовать для преобразования каждой записи во входном потоке путем применения функции сопоставления

Это доступно в нескольких вариантах – карта , Значения карты , Плоская карта , Значения плоской карты

Просто используйте метод map , если вы хотите изменить как ключ, так и значение. Например, для преобразования ключа и значения в верхний регистр

stream.map(new KeyValueMapper>() {
    @Override
    public KeyValue apply(String k, String v) {
            return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
        }
    });

Используйте Значения карты , если все, что вы хотите изменить, – это значение:

stream.mapValues(value -> value.toUpperCase());

Плоская карта аналогична карте, но позволяет возвращать несколько записей ( Значение ключа ов)

stream.flatMap(new KeyValueMapper>>() {
    @Override
    public Iterable> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.asList(values)
                    .stream()
                    .map(value -> new KeyValue<>(k, value))
                    .collect(Collectors.toList());
            }
    })

В приведенном выше примере каждая запись в потоке получает Плоская карта p таким образом, чтобы каждое значение CSV (разделенное запятыми) сначала разбивалось на составляющие, и для каждой части строки CSV создавалась пара Значение ключа . Например, если у вас есть эти записи (foo <-> a,b,c) и (полоса <-> d,e) (где foo и bar являются ключами), результирующий поток будет содержать пять записей – (foo,a) , (foo,b) , (foo,c) , (штрих,d) , (штрих,e)

Используйте flatMapValues , если вы хотите только принять значение из потока и вернуть набор значений

группа

Если вы хотите выполнить агрегирование с учетом состояния содержимого Потока , вам сначала нужно сгруппировать его записи по их ключу, чтобы создать KGroupedStream .

мы рассмотрим операции с отслеживанием состояния в Сгруппированном потоке в последующих сообщениях в блоге этой серии

Вот пример того, как вы можете это сделать, используя Групповой звонок

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(INPUT_TOPIC); 

KGroupedStream kgs = stream.groupByKey();

Обобщенная версия groupByKey – это группа по , которая дает вам возможность группироваться на основе другого ключа, используя Преобразователь значений клавиш

stream.groupBy(new KeyValueMapper() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

В обоих случаях ( groupByKey и группировать по ), если вам нужно использовать другой Сердэ ( Сериализатор и Десериализатор ) вместо стандартных используйте перегруженную версию, которая принимает Сгруппированный объект

stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));

Терминальные операции

Терминальная операция в потоках Кафки – это метод, который возвращает void вместо промежуточного , такого как другой Поток или KTable .

Вы можете использовать метод to для хранения записей Потока в теме в Кафке.

KStream stream = builder.stream("words");

stream.mapValues(value -> value.toUpperCase())
      .to("uppercase-words");

Перегруженная версия to позволяет указать Созданный объект для настройки Сердец и разделителя

stream.mapValues(value -> value.toUpperCase())
      .to("output-topic",Produced.with(Serdes.Bytes(), Serdes.Long()));

Вместо указания статического имени темы вы можете использовать Экстрактор имен тем и включить любую пользовательскую логику для динамического выбора определенной темы

stream.mapValues(value -> value.toUpperCase())
    .to(new TopicNameExtractor() {
        @Override
        public String extract(String k, String v, RecordContext rc) {
            return rc.topic()+"_uppercase";
        }
    });

В этом примере мы используем контекст записи , который содержит метаданные записи, чтобы получить тему и добавить _uppercase к нему

Во всех вышеперечисленных случаях тема раковины должна уже существовать в Кафке

Если вы хотите протоколировать записи потока (для целей отладки), используйте метод печать . Он принимает экземпляр Printed для настройки поведения.

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());

Это позволит распечатать записи, например, если вы передадите (foo, bar) и (джон, доу) в теме ввода, они будут преобразованы в верхний регистр и зарегистрированы как таковые:

[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE

Вы также можете использовать Printed.to Файл ((вместо toSysOut ) для таргетинга на определенный файл

метод foreach аналогичен методу print и заглянуть т.е.

  • это также терминальная операция (например, печать )
  • и он принимает Для каждого действия ( нравится подглядывать )

Прочие операции

Поскольку метод print является терминальной операцией, у вас есть возможность использовать peek , который возвращает тот же экземпляр Stream ! Он принимает Для каждого действия , которое можно использовать для указания того, что вы хотите сделать для каждой записи, например, записать ключ и значение

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(INPUT_TOPIC);

stream.mapValues(v -> v.toUpperCase())
      .peek((k,v) -> System.out.println("key="+k+", value="+v))
      .to(OUTPUT_TOPIC);

В приведенном выше примере вы сможете увидеть регистрируемые ключ и значения, и они также будут материализованы в разделе вывода (в отличие от операции печать ).

ветвь – это метод, который я не использовал (честно говоря!), Но он выглядит довольно интересно. Это дает вам возможность оценивать каждую запись в Потоке по нескольким критериям (представленным Предикатом ) и выводить несколько (массив) Поток с. Ключевым моментом здесь является то, что вы можете использовать несколько предикатов вместо одного, как в случае с фильтром и фильтр Не

Вы можете объединить два потока вместе в один.

StreamsBuilder builder = new StreamsBuilder(); 

KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");

stream1.merge(stream2).to("output-topic");

пожалуйста, обратите внимание, что в результирующем потоке могут быть не все записи в порядке

Если вы хотите получить новый ключ (он также может иметь другой тип) для каждой записи в вашем потоке , используйте выберите ключ метод, который принимает KeyValueMapper . клавиша выбора аналогична карте но разница в том, что карта ограничивает возвращаемый тип значением ключа объект

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(INPUT_TOPIC);

stream.selectKey(new KeyValueMapper() {
            @Override
            public String apply(String k, String v) {
                return k.toUpperCase();
            }
        })

При разработке конвейеров обработки с помощью Kafka Streams DSL вы обнаружите, что отправляете записи результирующего потока в раздел вывода, используя в , а затем создаете новый поток из этой (выходной) темы т.е.

StreamsBuilder builder = new StreamsBuilder();
KStream stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);

//output topic now becomes the input source
KStream stream2 = builder.stream(OUTPUT_TOPIC);

//continue processing with stream2
stream2.filter((k,v) -> v.length > 5).to(LENGTHY_WORDS_TOPIC);

Это можно упростить, используя метод через . Таким образом, вы можете переписать вышесказанное следующим образом:

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(INPUT_TOPIC);

stream.mapValues(v -> v.toUpperCase())
      .through(OUTPUT_TOPIC)
      .filter((k,v) -> v.length > 5)
      .to(LENGTHY_WORDS_TOPIC);

Здесь мы материализуем записи (со значениями в верхнем регистре) в промежуточную тему и продолжаем обработку (в данном случае с использованием фильтра ) и, наконец, сохраняем результаты постфильтрации в другой теме

На этом пока все. Следите за новостями для предстоящих постов в этой серии!

Рекомендации

Пожалуйста, не забудьте проверить следующие ресурсы для потоков Кафки

Оригинал: “https://dev.to/itnext/learn-stream-processing-with-kafka-streams-stateless-operations-1k4h”