Потребление данных от Кафки, в зависимости от случая использования, требует некоторой осторожности в commit. Сделать это правильно, важно убедиться, что ничего не потеряно.
С Клиентами Кафка, в специальной версии для Java, это очень простой и понятный, но в Spring Kafka требуется внимание со специальных настроек.
В центре внимания этой статье, чтобы показать, как использовать данные с Spring Кафка, который является абстракцией над Клиенты Кафка для Java. Делаем это, следуя setup:
включить.авто.фиксацию=ложь
. Инструкция по выполнению запроса- совершить синхронизацию
Из-за семантики | доставка| at-least-once , регистрация может быть потребляли или п раза, и этот setup поиска уменьшить это.
Потребитель типичный Spring Kafka написана так:
@Component public class SpringKafkaListener { @KafkaListener(topics = "topico") public void consume(String valor) { // Processar valor do registro } }
И создан со следующими параметрами, внесенных в application.свойства
:
spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092 spring.kafka.consumer.client-id=configure-me_client-id spring.kafka.consumer.group-id=configure-me_group-id spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
К счастью, Spring Кафка не сбрасывает значения, поэтому значение по умолчанию, сохраняется так же, как определяется в официальные документы , однако, в нем совершить, – это автоматический и асинхронный. Контудо, нет весенней Кафки, тодас, как необходимые настройки, пара инструкций по фиксации, не требуется, нет приложения.свойства
.
Разрешение проблемы
Spring Кафка-это abstraçao, логотип poll loop commit являются прозрачными. И, как вы можете видеть в примере, потребитель получает только по записи и по умолчанию не имеет доступа к Consumer .
Первый нужно проверить настройки для выключения автоматической фиксации.
Новая конфигурация:
# Nada de novo aqui spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092 spring.kafka.consumer.client-id=configure-me_client-id spring.kafka.consumer.group-id=configure-me_group-id spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # Desliga o commit automático no Cliente Kafka spring.kafka.consumer.enable-auto-commit=false
Spring имеет свои собственные обозначения для большинства конфигураций, имеющихся в Kafka Consumer, которые локализуются во время выполнения, чтобы имя правильно.
Теперь, что commit авто был выключен, необходимы некоторые корректировки программ, сделанных при настройке заводы объектов:
- ackMode для
РУКОВОДСТВО
- синхронизация фиксирует комо
верно
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties.AckMode; @EnableKafka @Configuration public class KafkaConfig { @Autowired KafkaProperties properties; @Bean public ConsumerFactoryconsumerFactory() { return new DefaultKafkaConsumerFactory<>( properties.buildConsumerProperties()); } @Bean public KafkaListenerContainerFactory > kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory listener = new ConcurrentKafkaListenerContainerFactory<>(); listener.setConsumerFactory(consumerFactory()); // Não falhar, caso ainda não existam os tópicos para consumo listener.getContainerProperties() .setMissingTopicsFatal(false); // ### AQUI // Commit manual do offset listener.getContainerProperties().setAckMode(AckMode.MANUAL); // ### AQUI // Commits síncronos listener.getContainerProperties().setSyncCommits(Boolean.TRUE); return listener; } }
Поэтому потребитель с Spring Kafka будет выглядеть следующим образом:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component public class SpringKafkaListener { @KafkaListener(topics = "topico") public void consume(@Payload String valor, Acknowledgment ack) { //TODO Processar registro // . . . // Commmit manual, que também será síncrono ack.acknowledge(); } }
Обратите внимание, что даже так, нет доступа к Consumer , вместо того, Spring внедряет экземпляр Acknowledgment
что делает commit имеет свой метод, acknowledge()
выполняется.
Также в этом примере смещение подтверждать каждый реестр обрабатывается. Это то, что ухудшает скорость передачи данных, но еще больше снижает шансы на потребление дубликатов. Хорошо, но каждый случай индивидуален 😊 .
Полный пример доступен на Github:
фабио хосе/skc-бывший
Весенний Потребитель Кафки
Пример потребительского комизма Кафки
Требования
- JDK 1.8
- Доступ к хранилищу Доступ к хранилищу или альтернативы с бесплатным зависимостей, имеющихся в
или альтернативы с бесплатным зависимостей, имеющихся в
Конфигурация
Не волнуйтесь, потому что несмотря на то, что есть и ярлыки по variávies среды, вы можете спокойно использовать то, что Spring Boot предлагает. Тогда смотрите все свойства Тогда смотрите все свойства
В случае, если Кафка, используем Spring, Kafka, то можно использовать режим Spring для настройки.
Сборка и запуск
Знаток
Пара монтар, о фатьяр, выполняй, о командир:
Линукс
./mvnw clean package
Окна
.\mvnw.cmd clean package
Для выполнения:
Вы можете использовать docker-compose.yaml
чтобы подняться Кафка в своей машине
java \ -Dspring.kafka.bootstrap-servers='localhost:9092' \ -Dspring.kafka.consumer.client-id='spring-kafka-ex' \ -Dspring.kafka.consumer.group-id='meu-grupo' \ -jar target/app-spring-boot.jar
Docker
A definiçao Док-файл desta aplicaçao emprega многоступенчатые сборки Это означает, что в нем происходит сборки приложения и созданию образа.
Если…
Photo by Павел Червиньский on Unsplash
Оригинал: “https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8”