Введение
В документации Kafka Streams у нас есть следующее описание: “Kafka Streams-это клиентская библиотека для создания приложений и microserviços, где входные и выходные данные хранятся в кластерах Кафка. Он сочетает в себе простоту написания и подняться приложений на Java и Scala на стороне клиента с преимуществами технологии кластеризации на стороне сервера Кафка”
Это означает, что Kafka Streams-это инструмент для обработки потоков данных (потоки) в режиме реального времени, интегрированного в среду Кафка. Возможность обработки, преобразования и сохранение данных в новые темы.
В этой статье будет показан пример, где приложения, подключенного к Кафка использует библиотеке Kafka Streams для выполнения обработки данных.
проект
Ранее в статьи прошлом было показано, как получения сообщений с Kafka и как потреблять сообщения, Кафка и теперь мы будем продолжать эту модель, в которой производитель будет отправлять эти данные, и как потребитель просто, как и наш потребительские Потоки будут обрабатывать, и в этом примере мы не будем упорствовать в новую тему мы будем только звонить Процессор , который будет сохраняться в базе данных. Обзор, как проекты, взаимодействуют с Кафки было бы что-то вроде:
Запуск проекта
Проект был создан на веб-сайте Spring Initializr как проект Maven и использовании Kafka Streams нужно добавить зависимости Apache , а также была добавлена зависимости от Confluent может быть использован кронштейн, который существует для Schema-Registry как Сериализации и Deserialização (SerDe):
org.apache.avro avro 1.10.1 org.apache.kafka kafka-streams org.apache.kafka kafka-clients io.confluent kafka-avro-serializer 5.3.0 io.confluent kafka-streams-avro-serde 5.3.0 org.slf4j slf4j-log4j12
И также нужно добавить тег, который указывает, где должны быть загружены в зависимости от Confluent :
confluent https://packages.confluent.io/maven/
Конфигурация свойств Кафки
O Кафка é todo baseado em конфигурирование для поиска путей через Свойства como без примера:
@Configuration
public class KafkaConfiguration implements MessageConfiguration {
@Autowired
private KafkaProperties kafkaProperties;
@Override
public Properties configureProperties() {
Properties properties = new Properties();
properties.put(APPLICATION_ID_CONFIG, kafkaProperties.getApplicationId());
properties.put(GROUP_ID_CONFIG, kafkaProperties.getGroupId());
properties.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
properties.put(CLIENT_ID_CONFIG, kafkaProperties.getClientId());
properties.put(PROCESSING_GUARANTEE_CONFIG, kafkaProperties.getProcessingGuaranteeConfig());
properties.put(AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset());
properties.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, kafkaProperties.getTimeStampExtarctor());
properties.put(SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultKeySerde());
properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultValueSerde());
properties.put(SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecficAvroReader());
return properties;
}
}
Вышеуказанных параметров стоит отметить следующие:
- APPLICATION_ID_CONFIG :, уникальный идентификатор процесса в кластере Kafka.
- ИДЕНТИФИКАТОР ГРУППЫ_КОНФИГ : Идентификатор группы потребителей.
- BOOTSTRAP_SERVERS_CONFIG : Может быть список url’s соединения с кластером Kafka.
- SCHEMA_REGISTRY_URL_CONFIG : Url-адрес для подключения Реестра схем .
- DEFAULT_KEY_SERDE_CLASS_CONFIG : Определение Сериализатор/Deserializador по умолчанию для ключей сообщения.
- DEFAULT_VALUE_СЕРВЕР_КЛАСС_КОНФИГУРАЦИИ : Определение Сериализатор/Deserializador по умолчанию для сообщений.
- SPECIFIC_AVRO_READER_КОНФИГУРАЦИЯ Указывает, что будет использоваться класс конкретной, чтобы прочитать сообщение Avro
Настройка обработки
После получения наших данных Taxpayer есть информация о том, по situation налогоплательщика. Мы можем использовать это, и установить процессор от конкретной ситуации налогоплательщика; в случае true обрабатывается классом Ситуация с обработчиком налогоплательщиков Истинна e caso seja ложь pela Ситуация с обработчиком налогоплательщиков Ложная . Для этого примера мы будем обрабатывать каждый налогоплательщик, в соответствии с его situation и сохранить в базе данных для каждого типа ситуации налогоплательщика.
API Kafka Streams предоставляет интерфейс вызовов Processor с целью процессором данных, а также у нас есть абстракции вызова AbstractProcessor стало еще проще этой работы, то классы расположены таким образом, чтобы situation равно true :
@Component @Slf4j public class TaxpayerProcessorSituationTrue extends AbstractProcessor{ @Autowired private TaxpayerPort repository; @Override public void process(String key, TaxPayer value) { log.info("Processing Taxpayer with situation :: " + value.getSituation()); ComplaintTaxpayer complaintTaxpayer = ComplaintTaxpayer.createDefaultedTaxpayer(value); repository.save(complaintTaxpayer); } }
И situation равно false :
@Component @Slf4j public class TaxpayerProcessorSituationFalse extends AbstractProcessor{ @Autowired private TaxpayerPort repository; @Override public void process(String key, TaxPayer value) { log.info("Processing Taxpayer with situation :: " + value.getSituation()); DefaultedTaxpayer defaultedTaxpayer = DefaultedTaxpayer.createDefaultedTaxpayer(value); repository.save(defaultedTaxpayer); } }
Настройка Stream
Настройки Stream необходимо быть в прошлом Properties кластера Кафка имя раздел и конфигурации с SerDe :
StreamsBuilder streamsBuilder = new StreamsBuilder(); SerdetaxpayerAvroSerde = new SpecificAvroSerde<>(); taxpayerAvroSerde.configure(getSerdeProperties(), false); KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde));
В данном фрагменте кода мы можем видеть, что Кафка имеется Builder нашей Streams , то она получает топика и сочетание ключ/значение сообщения, которое будет получено. Как мы работаем с Schema-Registry определяем, что значение этого сообщения, будет Быть .
С объектом KStream мы можем сделать различные манипуляции с данными, которые будут приехать сюда, чтобы мы могли фильтровать их и направить в Processor уверен, мы будем использовать метод отделение нам возвращает. Array :
KStream[] branch = stream.branch( (id, tax) -> tax.getSituation() == false, (id, tax) -> tax.getSituation() == true );
В приведенном выше коде мы используем метод отделение , чтобы сделать наш фильтр по situation и мы можем делегировать для Processors :
branch[0].process(() -> processorFalse); branch[1].process(() -> processorTrue);
Полный пример этой конфигурации:
@Autowired
private TaxpayerProcessorSituationTrue processorTrue;
@Autowired
private TaxpayerProcessorSituationFalse processorFalse;
private KafkaStreams kafkaStreams;
@Override
public String getTopic() {
return "taxpayer-avro";
}
@SuppressWarnings("unchecked")
@Override
public StreamsBuilder creataStream() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
Serde taxpayerAvroSerde = new SpecificAvroSerde<>();
taxpayerAvroSerde.configure(getSerdeProperties(), false);
KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde));
KStream[] branch = stream.branch(
(id, tax) -> tax.getSituation() == false,
(id, tax) -> tax.getSituation() == true
);
branch[0].process(() -> processorFalse);
branch[1].process(() -> processorTrue);
return streamsBuilder;
}
private Map getSerdeProperties() {
return Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, (String)kakfaConfiguration.configureProperties().get(SCHEMA_REGISTRY_URL_CONFIG));
}
Настройка start
С |/| Stream настройки необходимо сделать старт, где будет в прошлом Stream параметры кластера Кафка//, гм//обработчик/|де |/Исключения и гак мы обработки shutdown приложения.
@PostConstruct
@Override
public void start() {
StreamsBuilder streamsBuilder = this.creataStream();
kafkaStreams = new KafkaStreams(streamsBuilder.build(), kakfaConfiguration.configureProperties());
kafkaStreams.setUncaughtExceptionHandler(this.getUncaughtExceptionHandler());
kafkaStreams.start();
this.shutDown();
}
@Override
public void shutDown() {
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
private UncaughtExceptionHandler getUncaughtExceptionHandler() {
return (thread, exception) -> exception.printStackTrace();
}
Давая новую тему
Точка используется также при работе с Streams обработки, переработки и производства новых потоков данных, то мы можем имитировать получение сообщения Taxpayer и меняем |/имя про нижний регистр e enviamos para um novo tópico шамаду имя-в нижнем регистре-тема :
KStreamnameLowerCase = stream.mapValues(taxpayer -> taxpayer.getName().toLowerCase()); nameLowerCase.to("name-lower-case-topic");
Завершение
Вопрос о том, что нужно сделать, чтобы сформировать форму пользователя Потоки Кафки порежьте библиотеку, расширяя ее, и тем самым повторите, чтобы узнать больше о консультациях, как это делают документалисты Потоки Apache Kafka e da Сливающиеся потоки Кафки
Исходный код
Исходный код этого проекта можно найти на GitHub
Оригинал: “https://dev.to/guilhermegarcia86/kafka-streams-com-java-1g4o”