Рубрики
Без рубрики

Кафка: consumindo регистрируется весной

Потребление данных от Кафки, в зависимости от случая использования, требует некоторой осторожности в commit. Сделать это… С пометкой кафка, кафка, весна, ява.

Потребление данных от Кафки, в зависимости от случая использования, требует некоторой осторожности в commit. Сделать это правильно, важно убедиться, что ничего не потеряно.

С Клиентами Кафка, в специальной версии для Java, это очень простой и понятный, но в Spring Kafka требуется внимание со специальных настроек.

В центре внимания этой статье, чтобы показать, как использовать данные с Spring Кафка, который является абстракцией над Клиенты Кафка для Java. Делаем это, следуя setup:

  • включить.авто.фиксацию=ложь . Инструкция по выполнению запроса
  • совершить синхронизацию

Из-за семантики | доставка| at-least-once , регистрация может быть потребляли или п раза, и этот setup поиска уменьшить это.

Потребитель типичный Spring Kafka написана так:

@Component
public class SpringKafkaListener {
  @KafkaListener(topics = "topico")
  public void consume(String valor) {
    // Processar valor do registro
  }
}

И создан со следующими параметрами, внесенных в application.свойства :

spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

К счастью, Spring Кафка не сбрасывает значения, поэтому значение по умолчанию, сохраняется так же, как определяется в официальные документы , однако, в нем совершить, – это автоматический и асинхронный. Контудо, нет весенней Кафки, тодас, как необходимые настройки, пара инструкций по фиксации, не требуется, нет приложения.свойства .

Разрешение проблемы

Spring Кафка-это abstraçao, логотип poll loop commit являются прозрачными. И, как вы можете видеть в примере, потребитель получает только по записи и по умолчанию не имеет доступа к Consumer .

Первый нужно проверить настройки для выключения автоматической фиксации.

Новая конфигурация:

# Nada de novo aqui
spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Desliga o commit automático no Cliente Kafka
spring.kafka.consumer.enable-auto-commit=false

Spring имеет свои собственные обозначения для большинства конфигураций, имеющихся в Kafka Consumer, которые локализуются во время выполнения, чтобы имя правильно.

Теперь, что commit авто был выключен, необходимы некоторые корректировки программ, сделанных при настройке заводы объектов:

  • ackMode для РУКОВОДСТВО
  • синхронизация фиксирует комо верно
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@EnableKafka
@Configuration
public class KafkaConfig {

  @Autowired
  KafkaProperties properties;

  @Bean
  public ConsumerFactory consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
              properties.buildConsumerProperties());
  }

  @Bean
  public KafkaListenerContainerFactory>
      kafkaListenerContainerFactory() {

      ConcurrentKafkaListenerContainerFactory listener = 
            new ConcurrentKafkaListenerContainerFactory<>();

      listener.setConsumerFactory(consumerFactory());

      // Não falhar, caso ainda não existam os tópicos para consumo
      listener.getContainerProperties()
          .setMissingTopicsFatal(false);

      // ### AQUI
      // Commit manual do offset
      listener.getContainerProperties().setAckMode(AckMode.MANUAL);

      // ### AQUI
      // Commits síncronos
      listener.getContainerProperties().setSyncCommits(Boolean.TRUE);

      return listener;
    }
}

Поэтому потребитель с Spring Kafka будет выглядеть следующим образом:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class SpringKafkaListener {

  @KafkaListener(topics = "topico")
  public void consume(@Payload String valor, Acknowledgment ack) {

    //TODO Processar registro
    // . . . 

    // Commmit manual, que também será síncrono
    ack.acknowledge();

  }
}

Обратите внимание, что даже так, нет доступа к Consumer , вместо того, Spring внедряет экземпляр Acknowledgment что делает commit имеет свой метод, acknowledge() выполняется.

Также в этом примере смещение подтверждать каждый реестр обрабатывается. Это то, что ухудшает скорость передачи данных, но еще больше снижает шансы на потребление дубликатов. Хорошо, но каждый случай индивидуален 😊 .

Полный пример доступен на Github:

фабио хосе/skc-бывший

Весенний Потребитель Кафки

Пример потребительского комизма Кафки

Требования

  • JDK 1.8
  • Доступ к хранилищу Доступ к хранилищу или альтернативы с бесплатным зависимостей, имеющихся в или альтернативы с бесплатным зависимостей, имеющихся в

Конфигурация

Не волнуйтесь, потому что несмотря на то, что есть и ярлыки по variávies среды, вы можете спокойно использовать то, что Spring Boot предлагает. Тогда смотрите все свойства Тогда смотрите все свойства

В случае, если Кафка, используем Spring, Kafka, то можно использовать режим Spring для настройки.

Сборка и запуск

Знаток

Пара монтар, о фатьяр, выполняй, о командир:

Линукс

./mvnw clean package

Окна

.\mvnw.cmd clean package

Для выполнения:

Вы можете использовать docker-compose.yaml чтобы подняться Кафка в своей машине

java \
  -Dspring.kafka.bootstrap-servers='localhost:9092' \
  -Dspring.kafka.consumer.client-id='spring-kafka-ex' \
  -Dspring.kafka.consumer.group-id='meu-grupo' \
  -jar target/app-spring-boot.jar

Docker

A definiçao Док-файл desta aplicaçao emprega многоступенчатые сборки Это означает, что в нем происходит сборки приложения и созданию образа.

Если…

Photo by Павел Червиньский on Unsplash

Оригинал: “https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8”