Потоки Кафки – это библиотека Java для разработки приложений потоковой обработки поверх Apache Kafka. Это первое из серии сообщений в блоге о потоках Kafka и его API.
Это не “теоретическое руководство” по KafkaStream (хотя я уже рассматривал некоторые из этих аспектов в прошлом).
В этой части мы рассмотрим операции без состояния в DSL API Kafka Streams – в частности, функции, доступные в KStream
, такие как фильтр
, карта
, groupBy
и т.д. DSL API в потоках Kafka предлагает мощную модель программирования в функциональном стиле для определения топологий потоковой обработки. Пожалуйста, обратите внимание, что API Table
также предлагает функции без состояния, и то, что описано в этом посте, будет применимо и в этом случае (более или менее).
API-интерфейсы ( Поток
и т.д.), на которые ссылаются в этом посте, можно найти в javadocs Потоков Кафки
Установка
Чтобы начать, вам нужно создать экземпляр Потоков Кафки
. Для этого требуется Топология
и соответствующая конфигурация (в виде java.util. Свойства
)
Установите необходимую конфигурацию для вашего приложения Kafka streams:
Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Затем мы можем построить Топологию
, которая определяет конвейер обработки (остальная часть этого сообщения в блоге будет посвящена частям топологии без состояния).
Вы можете создать экземпляр Потоков Кафки
и начать обработку
KafkaStreams app = new KafkaStreams(topology, config); app.start(); new CountdownLatch(1).await(); // wait forever
Операции без состояния с использованием потока
Мне вообще нравится разделять вещи по категориям – это помогает мне “разделять и властвовать”. Я попробовал то же самое в этом случае, разделив различные операции Потока
на фильтр
, карта
и т.д.
Давайте копать глубже!
фильтр
Вы можете использовать фильтр
для исключения или включения записей на основе критериев. Например, если значение, отправленное в тему, содержит слово, и вы хотите включить слова, длина которых превышает указанную. Вы можете определить этот критерий с помощью Предиката
и передать его методу фильтр
– это создаст новый экземпляр Потока
с отфильтрованными записями
KStreamstream = builder.stream("words"); stream.filter(new Predicate () { @Override public boolean test(String k, String v) { return v.length() > 5; } })
Также можно использовать не фильтруйте
, если вы хотите исключить
записи на основе критериев. Вот пример лямбда-стиля:
KStreamstream = builder.stream("words"); stream.filterNot((key,value) -> value.startsWith("foo"));
карта
Обычно используемая операция без сохранения состояния – карта
. В случае потоков Кафки его можно использовать для преобразования каждой записи во входном потоке
путем применения функции сопоставления
Это доступно в нескольких вариантах – карта
, Значения карты
, Плоская карта
, Значения плоской карты
Просто используйте метод map
, если вы хотите изменить как ключ, так и значение. Например, для преобразования ключа и значения в верхний регистр
stream.map(new KeyValueMapper>() { @Override public KeyValue apply(String k, String v) { return new KeyValue<>(k.toUpperCase(), v.toUpperCase()); } });
Используйте Значения карты
, если все, что вы хотите изменить, – это значение:
stream.mapValues(value -> value.toUpperCase());
Плоская карта
аналогична карте, но позволяет возвращать несколько записей ( Значение ключа
ов)
stream.flatMap(new KeyValueMapper>>() { @Override public Iterable extends KeyValue extends String, ? extends String>> apply(String k, String csv) { String[] values = csv.split(","); return Arrays.asList(values) .stream() .map(value -> new KeyValue<>(k, value)) .collect(Collectors.toList()); } })
В приведенном выше примере каждая запись в потоке получает Плоская карта
p таким образом, чтобы каждое значение CSV (разделенное запятыми) сначала разбивалось на составляющие, и для каждой части строки CSV создавалась пара Значение ключа
. Например, если у вас есть эти записи (foo <-> a,b,c)
и (полоса <-> d,e)
(где foo
и bar
являются ключами), результирующий поток будет содержать пять записей – (foo,a)
, (foo,b)
, (foo,c)
, (штрих,d)
, (штрих,e)
Используйте flatMapValues
, если вы хотите только принять значение из потока и вернуть набор значений
группа
Если вы хотите выполнить агрегирование с учетом состояния содержимого Потока
, вам сначала нужно сгруппировать его записи по их ключу, чтобы создать KGroupedStream
.
мы рассмотрим операции с отслеживанием состояния в Сгруппированном потоке
в последующих сообщениях в блоге этой серии
Вот пример того, как вы можете это сделать, используя Групповой звонок
StreamsBuilder builder = new StreamsBuilder(); KStreamstream = builder.stream(INPUT_TOPIC); KGroupedStream kgs = stream.groupByKey();
Обобщенная версия groupByKey
– это группа по
, которая дает вам возможность группироваться на основе другого ключа, используя Преобразователь значений клавиш
stream.groupBy(new KeyValueMapper() { @Override public String apply(String k, String v) { return k.toUpperCase(); } });
В обоих случаях ( groupByKey
и группировать по
), если вам нужно использовать другой Сердэ
( Сериализатор
и Десериализатор
) вместо стандартных используйте перегруженную версию, которая принимает Сгруппированный
объект
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));
Терминальные операции
Терминальная операция в потоках Кафки – это метод, который возвращает void
вместо промежуточного , такого как другой Поток
или KTable
.
Вы можете использовать метод to
для хранения записей Потока
в теме в Кафке.
KStreamstream = builder.stream("words"); stream.mapValues(value -> value.toUpperCase()) .to("uppercase-words");
Перегруженная версия to
позволяет указать Созданный
объект для настройки Сердец
и разделителя
stream.mapValues(value -> value.toUpperCase()) .to("output-topic",Produced.with(Serdes.Bytes(), Serdes.Long()));
Вместо указания статического имени темы вы можете использовать Экстрактор имен тем
и включить любую пользовательскую логику для динамического выбора определенной темы
stream.mapValues(value -> value.toUpperCase()) .to(new TopicNameExtractor() { @Override public String extract(String k, String v, RecordContext rc) { return rc.topic()+"_uppercase"; } });
В этом примере мы используем контекст записи
, который содержит метаданные записи, чтобы получить тему и добавить _uppercase
к нему
Во всех вышеперечисленных случаях тема раковины должна уже существовать в Кафке
Если вы хотите протоколировать записи потока
(для целей отладки), используйте метод печать
. Он принимает экземпляр Printed
для настройки поведения.
StreamsBuilder builder = new StreamsBuilder(); KStreamstream = builder.stream(INPUT_TOPIC); stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());
Это позволит распечатать записи, например, если вы передадите (foo, bar)
и (джон, доу)
в теме ввода, они будут преобразованы в верхний регистр и зарегистрированы как таковые:
[KSTREAM-MAPVALUES-0000000001]: foo, BAR [KSTREAM-MAPVALUES-0000000001]: john, DOE
Вы также можете использовать Printed.to Файл
((вместо toSysOut
) для таргетинга на определенный файл
метод foreach
аналогичен методу print
и заглянуть
т.е.
- это также терминальная операция (например,
печать
) - и он принимает
Для каждого действия
( нравитсяподглядывать
)
Прочие операции
Поскольку метод print
является терминальной операцией, у вас есть возможность использовать peek
, который возвращает тот же экземпляр Stream
! Он принимает Для каждого действия
, которое можно использовать для указания того, что вы хотите сделать для каждой записи, например, записать ключ и значение
StreamsBuilder builder = new StreamsBuilder(); KStreamstream = builder.stream(INPUT_TOPIC); stream.mapValues(v -> v.toUpperCase()) .peek((k,v) -> System.out.println("key="+k+", value="+v)) .to(OUTPUT_TOPIC);
В приведенном выше примере вы сможете увидеть регистрируемые ключ и значения, и они также будут материализованы в разделе вывода (в отличие от операции печать
).
ветвь
– это метод, который я не использовал (честно говоря!), Но он выглядит довольно интересно. Это дает вам возможность оценивать каждую запись в Потоке
по нескольким критериям (представленным Предикатом
) и выводить несколько (массив) Поток
с. Ключевым моментом здесь является то, что вы можете использовать несколько предикатов вместо одного, как в случае с фильтром
и фильтр Не
Вы можете объединить
два потока
вместе в один.
StreamsBuilder builder = new StreamsBuilder(); KStreamstream1 = builder.stream("topic1"); KStream stream2 = builder.stream("topic2"); stream1.merge(stream2).to("output-topic");
пожалуйста, обратите внимание, что в результирующем потоке могут быть не все записи в порядке
Если вы хотите получить новый ключ (он также может иметь другой тип) для каждой записи в вашем потоке
, используйте выберите ключ
метод, который принимает KeyValueMapper
. клавиша выбора
аналогична карте
но разница в том, что карта
ограничивает возвращаемый тип значением ключа объект
StreamsBuilder builder = new StreamsBuilder(); KStreamstream = builder.stream(INPUT_TOPIC); stream.selectKey(new KeyValueMapper () { @Override public String apply(String k, String v) { return k.toUpperCase(); } })
При разработке конвейеров обработки с помощью Kafka Streams DSL вы обнаружите, что отправляете записи результирующего потока в раздел вывода, используя в
, а затем создаете новый поток из этой (выходной) темы т.е.
StreamsBuilder builder = new StreamsBuilder(); KStreamstream1 = builder.stream(INPUT_TOPIC); stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC); //output topic now becomes the input source KStream stream2 = builder.stream(OUTPUT_TOPIC); //continue processing with stream2 stream2.filter((k,v) -> v.length > 5).to(LENGTHY_WORDS_TOPIC);
Это можно упростить, используя метод через
. Таким образом, вы можете переписать вышесказанное следующим образом:
StreamsBuilder builder = new StreamsBuilder(); KStreamstream = builder.stream(INPUT_TOPIC); stream.mapValues(v -> v.toUpperCase()) .through(OUTPUT_TOPIC) .filter((k,v) -> v.length > 5) .to(LENGTHY_WORDS_TOPIC);
Здесь мы материализуем записи (со значениями в верхнем регистре) в промежуточную тему и продолжаем обработку (в данном случае с использованием фильтра
) и, наконец, сохраняем результаты постфильтрации в другой теме
На этом пока все. Следите за новостями для предстоящих постов в этой серии!
Рекомендации
Пожалуйста, не забудьте проверить следующие ресурсы для потоков Кафки
Оригинал: “https://dev.to/itnext/learn-stream-processing-with-kafka-streams-stateless-operations-1k4h”