Рубрики
Без рубрики

Кафка стримит ком Ява

Обрабатывая поток данных в реальном времени. Помеченный кафкой, потоками, java.

Введение

В документации Kafka Streams у нас есть следующее описание: “Kafka Streams-это клиентская библиотека для создания приложений и microserviços, где входные и выходные данные хранятся в кластерах Кафка. Он сочетает в себе простоту написания и подняться приложений на Java и Scala на стороне клиента с преимуществами технологии кластеризации на стороне сервера Кафка”

Это означает, что Kafka Streams-это инструмент для обработки потоков данных (потоки) в режиме реального времени, интегрированного в среду Кафка. Возможность обработки, преобразования и сохранение данных в новые темы.

В этой статье будет показан пример, где приложения, подключенного к Кафка использует библиотеке Kafka Streams для выполнения обработки данных.

проект

Ранее в статьи прошлом было показано, как получения сообщений с Kafka и как потреблять сообщения, Кафка и теперь мы будем продолжать эту модель, в которой производитель будет отправлять эти данные, и как потребитель просто, как и наш потребительские Потоки будут обрабатывать, и в этом примере мы не будем упорствовать в новую тему мы будем только звонить Процессор , который будет сохраняться в базе данных. Обзор, как проекты, взаимодействуют с Кафки было бы что-то вроде:

Запуск проекта

Проект был создан на веб-сайте Spring Initializr как проект Maven и использовании Kafka Streams нужно добавить зависимости Apache , а также была добавлена зависимости от Confluent может быть использован кронштейн, который существует для Schema-Registry как Сериализации и Deserialização (SerDe):



    org.apache.avro
    avro
    1.10.1


    org.apache.kafka
    kafka-streams





    org.apache.kafka
    kafka-clients




    io.confluent
    kafka-avro-serializer
    5.3.0



    io.confluent
    kafka-streams-avro-serde
    5.3.0
    
        
            org.slf4j
            slf4j-log4j12
        
    

И также нужно добавить тег, который указывает, где должны быть загружены в зависимости от Confluent :


    
        confluent
        https://packages.confluent.io/maven/
    

Конфигурация свойств Кафки

O Кафка é todo baseado em конфигурирование для поиска путей через Свойства como без примера:

@Configuration
public class KafkaConfiguration implements MessageConfiguration {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Override
    public Properties configureProperties() {

        Properties properties = new Properties();

        properties.put(APPLICATION_ID_CONFIG, kafkaProperties.getApplicationId());
        properties.put(GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        properties.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        properties.put(CLIENT_ID_CONFIG, kafkaProperties.getClientId());

        properties.put(PROCESSING_GUARANTEE_CONFIG, kafkaProperties.getProcessingGuaranteeConfig());
        properties.put(AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset());
        properties.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, kafkaProperties.getTimeStampExtarctor());

        properties.put(SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
        properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultKeySerde());
        properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, kafkaProperties.getDefaultValueSerde());
        properties.put(SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecficAvroReader());

        return properties;
    }

}

Вышеуказанных параметров стоит отметить следующие:

  • APPLICATION_ID_CONFIG :, уникальный идентификатор процесса в кластере Kafka.
  • ИДЕНТИФИКАТОР ГРУППЫ_КОНФИГ : Идентификатор группы потребителей.
  • BOOTSTRAP_SERVERS_CONFIG : Может быть список url’s соединения с кластером Kafka.
  • SCHEMA_REGISTRY_URL_CONFIG : Url-адрес для подключения Реестра схем .
  • DEFAULT_KEY_SERDE_CLASS_CONFIG : Определение Сериализатор/Deserializador по умолчанию для ключей сообщения.
  • DEFAULT_VALUE_СЕРВЕР_КЛАСС_КОНФИГУРАЦИИ : Определение Сериализатор/Deserializador по умолчанию для сообщений.
  • SPECIFIC_AVRO_READER_КОНФИГУРАЦИЯ Указывает, что будет использоваться класс конкретной, чтобы прочитать сообщение Avro

Настройка обработки

После получения наших данных Taxpayer есть информация о том, по situation налогоплательщика. Мы можем использовать это, и установить процессор от конкретной ситуации налогоплательщика; в случае true обрабатывается классом Ситуация с обработчиком налогоплательщиков Истинна e caso seja ложь pela Ситуация с обработчиком налогоплательщиков Ложная . Для этого примера мы будем обрабатывать каждый налогоплательщик, в соответствии с его situation и сохранить в базе данных для каждого типа ситуации налогоплательщика.

API Kafka Streams предоставляет интерфейс вызовов Processor с целью процессором данных, а также у нас есть абстракции вызова AbstractProcessor стало еще проще этой работы, то классы расположены таким образом, чтобы situation равно true :

@Component
@Slf4j
public class TaxpayerProcessorSituationTrue extends AbstractProcessor{

    @Autowired
    private TaxpayerPort repository;

    @Override
    public void process(String key, TaxPayer value) {
        log.info("Processing Taxpayer with situation :: " + value.getSituation());
        ComplaintTaxpayer complaintTaxpayer = ComplaintTaxpayer.createDefaultedTaxpayer(value);
        repository.save(complaintTaxpayer);

    }

}

И situation равно false :

@Component
@Slf4j
public class TaxpayerProcessorSituationFalse extends AbstractProcessor {

    @Autowired
    private TaxpayerPort repository;

    @Override
    public void process(String key, TaxPayer value) {
        log.info("Processing Taxpayer with situation :: " + value.getSituation());
        DefaultedTaxpayer defaultedTaxpayer = DefaultedTaxpayer.createDefaultedTaxpayer(value);
        repository.save(defaultedTaxpayer);

    }

}

Настройка Stream

Настройки Stream необходимо быть в прошлом Properties кластера Кафка имя раздел и конфигурации с SerDe :

StreamsBuilder streamsBuilder = new StreamsBuilder();

Serde taxpayerAvroSerde = new SpecificAvroSerde<>();

taxpayerAvroSerde.configure(getSerdeProperties(), false);

KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde));

В данном фрагменте кода мы можем видеть, что Кафка имеется Builder нашей Streams , то она получает топика и сочетание ключ/значение сообщения, которое будет получено. Как мы работаем с Schema-Registry определяем, что значение этого сообщения, будет Быть .

С объектом KStream мы можем сделать различные манипуляции с данными, которые будут приехать сюда, чтобы мы могли фильтровать их и направить в Processor уверен, мы будем использовать метод отделение нам возвращает. Array :

KStream[] branch = stream.branch(
                (id, tax) -> tax.getSituation() == false,
                (id, tax) -> tax.getSituation() == true
                );

В приведенном выше коде мы используем метод отделение , чтобы сделать наш фильтр по situation и мы можем делегировать для Processors :

branch[0].process(() -> processorFalse);
branch[1].process(() -> processorTrue);

Полный пример этой конфигурации:

@Autowired
private TaxpayerProcessorSituationTrue processorTrue;

@Autowired
private TaxpayerProcessorSituationFalse processorFalse;

private KafkaStreams kafkaStreams;

@Override
public String getTopic() {
    return "taxpayer-avro";
}

@SuppressWarnings("unchecked")
@Override
public StreamsBuilder creataStream() {

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    Serde taxpayerAvroSerde = new SpecificAvroSerde<>();

    taxpayerAvroSerde.configure(getSerdeProperties(), false);

    KStream stream = streamsBuilder.stream(getTopic(), Consumed.with(Serdes.String(), taxpayerAvroSerde));

    KStream[] branch = stream.branch(
            (id, tax) -> tax.getSituation() == false,
            (id, tax) -> tax.getSituation() == true
            );

    branch[0].process(() -> processorFalse);
    branch[1].process(() -> processorTrue);

    return streamsBuilder;
}

private Map getSerdeProperties() {
    return Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, (String)kakfaConfiguration.configureProperties().get(SCHEMA_REGISTRY_URL_CONFIG));
}

Настройка start

С |/| Stream настройки необходимо сделать старт, где будет в прошлом Stream параметры кластера Кафка//, гм//обработчик/|де |/Исключения и гак мы обработки shutdown приложения.

@PostConstruct
@Override
public void start() {

    StreamsBuilder streamsBuilder = this.creataStream();

    kafkaStreams = new KafkaStreams(streamsBuilder.build(), kakfaConfiguration.configureProperties());
    kafkaStreams.setUncaughtExceptionHandler(this.getUncaughtExceptionHandler());
    kafkaStreams.start();

    this.shutDown();
}

@Override
public void shutDown() {
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}

private UncaughtExceptionHandler getUncaughtExceptionHandler() {
    return (thread, exception) -> exception.printStackTrace();
}

Давая новую тему

Точка используется также при работе с Streams обработки, переработки и производства новых потоков данных, то мы можем имитировать получение сообщения Taxpayer и меняем |/имя про нижний регистр e enviamos para um novo tópico шамаду имя-в нижнем регистре-тема :

KStream nameLowerCase = stream.mapValues(taxpayer -> taxpayer.getName().toLowerCase());
nameLowerCase.to("name-lower-case-topic");

Завершение

Вопрос о том, что нужно сделать, чтобы сформировать форму пользователя Потоки Кафки порежьте библиотеку, расширяя ее, и тем самым повторите, чтобы узнать больше о консультациях, как это делают документалисты Потоки Apache Kafka e da Сливающиеся потоки Кафки

Исходный код

Исходный код этого проекта можно найти на GitHub

Оригинал: “https://dev.to/guilhermegarcia86/kafka-streams-com-java-1g4o”