Введение
В документации Kafka Streams у нас есть следующее описание: “Kafka Streams-это клиентская библиотека для создания приложений и microserviços, где входные и выходные данные хранятся в кластерах Кафка. Он сочетает в себе простоту написания и подняться приложений на Java и Scala на стороне клиента с преимуществами технологии кластеризации на стороне сервера Кафка”
Это означает, что Kafka Streams-это инструмент для обработки потоков данных (потоки) в режиме реального времени, интегрированного в среду Кафка. Возможность обработки, преобразования и сохранение данных в новые темы.
В этой статье будет показан пример, где приложения, подключенного к Кафка использует библиотеке Kafka Streams для выполнения обработки данных.
проект
Ранее в статьи прошлом было показано, как получения сообщений с Kafka и как потреблять сообщения, Кафка и теперь мы будем продолжать эту модель, в которой производитель будет отправлять эти данные, и как потребитель просто, как и наш потребительские Потоки будут обрабатывать, и в этом примере мы не будем упорствовать в новую тему мы будем только звонить Процессор , который будет сохраняться в базе данных. Обзор, как проекты, взаимодействуют с Кафки было бы что-то вроде:
Запуск проекта
Проект был создан на веб-сайте Spring Initializr как проект Maven и использовании Kafka Streams нужно добавить зависимости Apache , а также была добавлена зависимости от Confluent может быть использован кронштейн, который существует для Schema-Registry как Сериализации и Deserialização (SerDe):
org.apache.avro avro 1.10.1 org.apache.kafka kafka-streams org.apache.kafka kafka-clients io.confluent kafka-avro-serializer 5.3.0 io.confluent kafka-streams-avro-serde 5.3.0 org.slf4j slf4j-log4j12
И также нужно добавить тег, который указывает, где должны быть загружены в зависимости от Confluent :
confluent https://packages.confluent.io/maven/
Конфигурация свойств Кафки
O Кафка é todo baseado em конфигурирование для поиска путей через Свойства como без примера:
@Configuration public class KafkaConfiguration implements MessageConfiguration { @Autowired private KafkaProperties kafkaProperties; @Override public Properties configureProperties() { Properties properties = new Properties(); properties.put(APPLICATION_ID_CONFIG, kafkaProperties.getApplicationId()); properties.put(GROUP_ID_CONFIG, kafkaProperties.getGroupId()); properties.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); properties.put(CLIENT_ID_CONFIG, kafkaProperties.getClientId()); properties.put(PROCESSING_GUARANTEE_CONFIG, kafkaProperties.getProcessingGuaranteeConfig()); properties.put(AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset()); properties.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, kafkaProperties.getTimeStampExtarctor()); properties.put(SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl()); properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultKeySerde()); properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultValueSerde()); properties.put(SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecficAvroReader()); return properties; } }
Вышеуказанных параметров стоит отметить следующие:
- APPLICATION_ID_CONFIG :, уникальный идентификатор процесса в кластере Kafka.
- ИДЕНТИФИКАТОР ГРУППЫ_КОНФИГ : Идентификатор группы потребителей.
- BOOTSTRAP_SERVERS_CONFIG : Может быть список url’s соединения с кластером Kafka.
- SCHEMA_REGISTRY_URL_CONFIG : Url-адрес для подключения Реестра схем .
- DEFAULT_KEY_SERDE_CLASS_CONFIG : Определение Сериализатор/Deserializador по умолчанию для ключей сообщения.
- DEFAULT_VALUE_СЕРВЕР_КЛАСС_КОНФИГУРАЦИИ : Определение Сериализатор/Deserializador по умолчанию для сообщений.
- SPECIFIC_AVRO_READER_КОНФИГУРАЦИЯ Указывает, что будет использоваться класс конкретной, чтобы прочитать сообщение Avro
Настройка обработки
После получения наших данных Taxpayer есть информация о том, по situation налогоплательщика. Мы можем использовать это, и установить процессор от конкретной ситуации налогоплательщика; в случае true обрабатывается классом Ситуация с обработчиком налогоплательщиков Истинна e caso seja ложь pela Ситуация с обработчиком налогоплательщиков Ложная . Для этого примера мы будем обрабатывать каждый налогоплательщик, в соответствии с его situation и сохранить в базе данных для каждого типа ситуации налогоплательщика.
API Kafka Streams предоставляет интерфейс вызовов Processor с целью процессором данных, а также у нас есть абстракции вызова AbstractProcessor стало еще проще этой работы, то классы расположены таким образом, чтобы situation равно true :
@Component @Slf4j public class TaxpayerProcessorSituationTrue extends AbstractProcessor{ @Autowired private TaxpayerPort repository; @Override public void process(String key, TaxPayer value) { log.info("Processing Taxpayer with situation :: " + value.getSituation()); ComplaintTaxpayer complaintTaxpayer = ComplaintTaxpayer.createDefaultedTaxpayer(value); repository.save(complaintTaxpayer); } }
И situation равно false :
@Component @Slf4j public class TaxpayerProcessorSituationFalse extends AbstractProcessor{ @Autowired private TaxpayerPort repository; @Override public void process(String key, TaxPayer value) { log.info("Processing Taxpayer with situation :: " + value.getSituation()); DefaultedTaxpayer defaultedTaxpayer = DefaultedTaxpayer.createDefaultedTaxpayer(value); repository.save(defaultedTaxpayer); } }
Настройка Stream
Настройки Stream необходимо быть в прошлом Properties кластера Кафка имя раздел и конфигурации с SerDe :
StreamsBuilder streamsBuilder = new StreamsBuilder(); SerdetaxpayerAvroSerde = new SpecificAvroSerde<>(); taxpayerAvroSerde.configure(getSerdeProperties(), false); KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde));
В данном фрагменте кода мы можем видеть, что Кафка имеется Builder нашей Streams , то она получает топика и сочетание ключ/значение сообщения, которое будет получено. Как мы работаем с Schema-Registry определяем, что значение этого сообщения, будет Быть .
С объектом KStream мы можем сделать различные манипуляции с данными, которые будут приехать сюда, чтобы мы могли фильтровать их и направить в Processor уверен, мы будем использовать метод отделение нам возвращает. Array :
KStream[] branch = stream.branch( (id, tax) -> tax.getSituation() == false, (id, tax) -> tax.getSituation() == true );
В приведенном выше коде мы используем метод отделение , чтобы сделать наш фильтр по situation и мы можем делегировать для Processors :
branch[0].process(() -> processorFalse); branch[1].process(() -> processorTrue);
Полный пример этой конфигурации:
@Autowired private TaxpayerProcessorSituationTrue processorTrue; @Autowired private TaxpayerProcessorSituationFalse processorFalse; private KafkaStreams kafkaStreams; @Override public String getTopic() { return "taxpayer-avro"; } @SuppressWarnings("unchecked") @Override public StreamsBuilder creataStream() { StreamsBuilder streamsBuilder = new StreamsBuilder(); SerdetaxpayerAvroSerde = new SpecificAvroSerde<>(); taxpayerAvroSerde.configure(getSerdeProperties(), false); KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde)); KStream [] branch = stream.branch( (id, tax) -> tax.getSituation() == false, (id, tax) -> tax.getSituation() == true ); branch[0].process(() -> processorFalse); branch[1].process(() -> processorTrue); return streamsBuilder; } private Map getSerdeProperties() { return Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, (String)kakfaConfiguration.configureProperties().get(SCHEMA_REGISTRY_URL_CONFIG)); }
Настройка start
С |/| Stream настройки необходимо сделать старт, где будет в прошлом Stream параметры кластера Кафка//, гм//обработчик/|де |/Исключения и гак мы обработки shutdown приложения.
@PostConstruct @Override public void start() { StreamsBuilder streamsBuilder = this.creataStream(); kafkaStreams = new KafkaStreams(streamsBuilder.build(), kakfaConfiguration.configureProperties()); kafkaStreams.setUncaughtExceptionHandler(this.getUncaughtExceptionHandler()); kafkaStreams.start(); this.shutDown(); } @Override public void shutDown() { Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close)); } private UncaughtExceptionHandler getUncaughtExceptionHandler() { return (thread, exception) -> exception.printStackTrace(); }
Давая новую тему
Точка используется также при работе с Streams обработки, переработки и производства новых потоков данных, то мы можем имитировать получение сообщения Taxpayer и меняем |/имя про нижний регистр e enviamos para um novo tópico шамаду имя-в нижнем регистре-тема :
KStreamnameLowerCase = stream.mapValues(taxpayer -> taxpayer.getName().toLowerCase()); nameLowerCase.to("name-lower-case-topic");
Завершение
Вопрос о том, что нужно сделать, чтобы сформировать форму пользователя Потоки Кафки порежьте библиотеку, расширяя ее, и тем самым повторите, чтобы узнать больше о консультациях, как это делают документалисты Потоки Apache Kafka e da Сливающиеся потоки Кафки
Исходный код
Исходный код этого проекта можно найти на GitHub
Оригинал: “https://dev.to/guilhermegarcia86/kafka-streams-com-java-1g4o”