Потребление данных от Кафки, в зависимости от случая использования, требует некоторой осторожности в commit. Сделать это правильно, важно убедиться, что ничего не потеряно.
С Клиентами Кафка, в специальной версии для Java, это очень простой и понятный, но в Spring Kafka требуется внимание со специальных настроек.
В центре внимания этой статье, чтобы показать, как использовать данные с Spring Кафка, который является абстракцией над Клиенты Кафка для Java. Делаем это, следуя setup:
включить.авто.фиксацию=ложь. Инструкция по выполнению запроса- совершить синхронизацию
Из-за семантики | доставка| at-least-once , регистрация может быть потребляли или п раза, и этот setup поиска уменьшить это.
Потребитель типичный Spring Kafka написана так:
@Component
public class SpringKafkaListener {
@KafkaListener(topics = "topico")
public void consume(String valor) {
// Processar valor do registro
}
}
И создан со следующими параметрами, внесенных в application.свойства :
spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092 spring.kafka.consumer.client-id=configure-me_client-id spring.kafka.consumer.group-id=configure-me_group-id spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
К счастью, Spring Кафка не сбрасывает значения, поэтому значение по умолчанию, сохраняется так же, как определяется в официальные документы , однако, в нем совершить, – это автоматический и асинхронный. Контудо, нет весенней Кафки, тодас, как необходимые настройки, пара инструкций по фиксации, не требуется, нет приложения.свойства .
Разрешение проблемы
Spring Кафка-это abstraçao, логотип poll loop commit являются прозрачными. И, как вы можете видеть в примере, потребитель получает только по записи и по умолчанию не имеет доступа к Consumer .
Первый нужно проверить настройки для выключения автоматической фиксации.
Новая конфигурация:
# Nada de novo aqui spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092 spring.kafka.consumer.client-id=configure-me_client-id spring.kafka.consumer.group-id=configure-me_group-id spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # Desliga o commit automático no Cliente Kafka spring.kafka.consumer.enable-auto-commit=false
Spring имеет свои собственные обозначения для большинства конфигураций, имеющихся в Kafka Consumer, которые локализуются во время выполнения, чтобы имя правильно.
Теперь, что commit авто был выключен, необходимы некоторые корректировки программ, сделанных при настройке заводы объектов:
- ackMode для
РУКОВОДСТВО - синхронизация фиксирует комо
верно
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@EnableKafka
@Configuration
public class KafkaConfig {
@Autowired
KafkaProperties properties;
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
properties.buildConsumerProperties());
}
@Bean
public KafkaListenerContainerFactory>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory listener =
new ConcurrentKafkaListenerContainerFactory<>();
listener.setConsumerFactory(consumerFactory());
// Não falhar, caso ainda não existam os tópicos para consumo
listener.getContainerProperties()
.setMissingTopicsFatal(false);
// ### AQUI
// Commit manual do offset
listener.getContainerProperties().setAckMode(AckMode.MANUAL);
// ### AQUI
// Commits síncronos
listener.getContainerProperties().setSyncCommits(Boolean.TRUE);
return listener;
}
}
Поэтому потребитель с Spring Kafka будет выглядеть следующим образом:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class SpringKafkaListener {
@KafkaListener(topics = "topico")
public void consume(@Payload String valor, Acknowledgment ack) {
//TODO Processar registro
// . . .
// Commmit manual, que também será síncrono
ack.acknowledge();
}
}
Обратите внимание, что даже так, нет доступа к Consumer , вместо того, Spring внедряет экземпляр Acknowledgment что делает commit имеет свой метод, acknowledge() выполняется.
Также в этом примере смещение подтверждать каждый реестр обрабатывается. Это то, что ухудшает скорость передачи данных, но еще больше снижает шансы на потребление дубликатов. Хорошо, но каждый случай индивидуален 😊 .
Полный пример доступен на Github:
фабио хосе/skc-бывший
Весенний Потребитель Кафки
Пример потребительского комизма Кафки
Требования
- JDK 1.8
- Доступ к хранилищу Доступ к хранилищу или альтернативы с бесплатным зависимостей, имеющихся в
или альтернативы с бесплатным зависимостей, имеющихся в
Конфигурация
Не волнуйтесь, потому что несмотря на то, что есть и ярлыки по variávies среды, вы можете спокойно использовать то, что Spring Boot предлагает. Тогда смотрите все свойства Тогда смотрите все свойства
В случае, если Кафка, используем Spring, Kafka, то можно использовать режим Spring для настройки.
Сборка и запуск
Знаток
Пара монтар, о фатьяр, выполняй, о командир:
Линукс
./mvnw clean package
Окна
.\mvnw.cmd clean package
Для выполнения:
Вы можете использовать docker-compose.yaml чтобы подняться Кафка в своей машине
java \ -Dspring.kafka.bootstrap-servers='localhost:9092' \ -Dspring.kafka.consumer.client-id='spring-kafka-ex' \ -Dspring.kafka.consumer.group-id='meu-grupo' \ -jar target/app-spring-boot.jar
Docker
A definiçao Док-файл desta aplicaçao emprega многоступенчатые сборки Это означает, что в нем происходит сборки приложения и созданию образа.
Если…
Photo by Павел Червиньский on Unsplash
Оригинал: “https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8”