Рубрики
Без рубрики

Вступление к Apache Kafka с Spring

Краткое и практическое руководство по использованию Apache Kafka с Spring.

Автор оригинала: baeldung.

1. Обзор

Apache Kafka -это распределенная и отказоустойчивая система обработки потоков.

В этом уроке мы рассмотрим поддержку Spring для Kafka и уровень абстракций, которые она предоставляет по сравнению с собственными API-интерфейсами Java-клиента Kafka.

Spring Kafka приносит простую и типичную модель программирования шаблонов Spring с шаблоном Kafka и управляемыми сообщениями POJOs через @KafkaListener аннотацию.

Дальнейшее чтение:

Построение конвейера данных с помощью Flink и Kafka

Пример соединения Kafka с MQTT и MongoDB

2. Установка и настройка

Чтобы скачать и установить Kafka, пожалуйста, обратитесь к официальному руководству здесь .

Нам также нужно добавить spring-kafka зависимость к вашему pom.xml :


    org.springframework.kafka
    spring-kafka
    2.5.8.RELEASE

Последнюю версию этого артефакта можно найти здесь .

Нашим примером приложения будет приложение Spring Boot.

В этой статье предполагается, что сервер запускается с использованием конфигурации по умолчанию и никакие порты сервера не изменяются.

3. Настройка тем

Ранее мы запускали инструменты командной строки для создания тем в Kafka:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

Но с введением Admin Client в Kafka мы теперь можем создавать темы программно.

Нам нужно добавить Kafka Admin Spring bean, который автоматически добавит темы для всех бобов типа NewTopic :

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("baeldung", 1, (short) 1);
    }
}

4. Создание Сообщений

Чтобы создать сообщения, нам сначала нужно настроить Producer Factory . Это задает стратегию создания экземпляров Kafka Producer .

Тогда нам нужен Шаблон Кафки , который обертывает Производитель экземпляр и предоставляет удобные методы для отправки сообщений на темы Кафки.

Экземпляры Producer являются потокобезопасными. Таким образом, использование одного экземпляра во всем контексте приложения даст более высокую производительность. Следовательно, экземпляры Kakfa Template также потокобезопасны, и рекомендуется использовать один экземпляр.

4.1. Конфигурация производителя

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory producerFactory() {
        Map configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. Публикация сообщений

Мы можем отправлять сообщения с помощью класса Kafka Template :

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

API send возвращает объект ListenableFuture . Если мы хотим заблокировать отправляющий поток и получить результат об отправленном сообщении, мы можем вызвать get API объекта ListenableFuture . Поток будет ждать результата, но это замедлит производителя.

Kafka-это платформа быстрой потоковой обработки. Поэтому лучше обрабатывать результаты асинхронно, чтобы последующие сообщения не ждали результата предыдущего сообщения.

Мы можем сделать это через обратный вызов:

public void sendMessage(String message) {
            
    ListenableFuture> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback>() {

        @Override
        public void onSuccess(SendResult result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

5. Потребление сообщений

5.1. Конфигурация потребителя

Для потребления сообщений нам нужно настроить Consumer Factory и Kafka Listener Container Factory . Как только эти бобы будут доступны в Spring beanfactory, потребители на основе POJO могут быть сконфигурированы с помощью аннотации @KafkaListener .

@EnableKafka аннотация требуется для класса конфигурации, чтобы включить обнаружение @KafkaListener аннотации на управляемых пружиной бобах :

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory consumerFactory() {
        Map props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. Потребление сообщений

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Мы можем реализовать несколько слушателей для темы , каждый из которых имеет свой идентификатор группы. Кроме того, один потребитель может прослушивать сообщения из различных тем:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring также поддерживает извлечение одного или нескольких заголовков сообщений с помощью аннотации @Header в прослушивателе:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3. Потребление сообщений из определенного раздела

Обратите внимание, что мы создали тему baeldung только с одним разделом.

Однако для темы с несколькими разделами a @KafkaListener может явно подписаться на определенный раздел темы с начальным смещением:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

Поскольку в этом прослушивателе начальное смещение было установлено равным 0, все ранее потребленные сообщения из разделов 0 и 3 будут повторно потребляться каждый раз при инициализации этого прослушивателя.

Если нам не нужно устанавливать смещение, мы можем использовать свойство partitions аннотации @TopicPartition для установки только разделов без смещения:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Добавление фильтра сообщений для слушателей

Мы можем настроить слушателей на использование определенных типов сообщений, добавив пользовательский фильтр. Это можно сделать, установив стратегию Record Filter на фабрику контейнеров Kafka Listener :

@Bean
public ConcurrentKafkaListenerContainerFactory
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

Затем мы можем настроить прослушиватель для использования этой фабрики контейнеров:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

В этом прослушивателе все сообщения , соответствующие фильтру, будут отброшены.

6. Пользовательские Конвертеры сообщений

До сих пор мы рассматривали только отправку и получение строк в виде сообщений. Однако мы также можем отправлять и получать пользовательские объекты Java. Это требует настройки соответствующего сериализатора в Producer Factory и десериализатора в Consumer Factory .

Давайте рассмотрим простой класс bean , который мы будем отправлять в виде сообщений:

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

6.1. Создание Пользовательских Сообщений

В этом примере мы будем использовать JsonSerializer .

Давайте посмотрим на код для Producer Factory и Kafka Template :

@Bean
public ProducerFactory greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

Мы можем использовать этот новый шаблон Kafka для отправки Приветствия сообщения:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Использование Пользовательских Сообщений

Аналогично, давайте изменим Consumer Factory и Kafka Listener Container Factory , чтобы десериализовать приветствие правильно:

@Bean
public ConsumerFactory greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

Сериализатор и десериализатор spring-kafka JSON использует библиотеку Jackson, которая также является необязательной зависимостью Maven для проекта spring-kafka.

Итак, давайте добавим его к вашему pom.xml :


    com.fasterxml.jackson.core
    jackson-databind
    2.9.7

Вместо использования последней версии Jackson рекомендуется использовать версию, добавленную в pom.xml весны-кафка.

Наконец, нам нужно написать слушателя для потребления Приветствия сообщений:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. Заключение

В этой статье мы рассмотрели основы поддержки Spring для Apache Kafka. Мы кратко рассмотрели классы, используемые для отправки и получения сообщений.

Полный исходный код этой статьи можно найти на GitHub .

Перед запуском кода, пожалуйста, убедитесь, что Kafka server работает и что темы создаются вручную.