Рубрики
Без рубрики

Руководство по весеннему облачному потоку с реестром схем Kafka, Apache Avro и Confluent

Сделайте так, чтобы Spring Cloud поддерживал Кафку с помощью стандартных компонентов и подхода Confluent, включая Avro, реестр схем и стандартный формат двоичных сообщений.

Автор оригинала: baeldung.

1. введение

Apache Kafka-это платформа обмена сообщениями. С его помощью мы можем обмениваться данными между различными приложениями в масштабе.

Spring Cloud Stream-это платформа для создания приложений, управляемых сообщениями. Это может упростить интеграцию Кафки в наши сервисы.

Обычно Кафка используется с форматом сообщений Avro, поддерживаемым реестром схем. В этом руководстве мы будем использовать реестр схем слияния. Мы попробуем как реализацию интеграции Spring с реестром схем Confluent, так и собственные библиотеки Confluent.

2. Реестр Схем Слияния

Кафка представляет все данные в виде байтов, поэтому обычно используют внешнюю схему и сериализуют и десериализуют в байты в соответствии с этой схемой. Вместо того, чтобы предоставлять копию этой схемы с каждым сообщением, что было бы дорогостоящими накладными расходами, также принято хранить схему в реестре и указывать только идентификатор с каждым сообщением.

Реестр схем Confluent обеспечивает простой способ хранения, извлечения и управления схемами. Он предоставляет несколько полезных RESTful API .

Схемы хранятся по субъекту, и по умолчанию реестр проверяет совместимость, прежде чем разрешить загрузку новой схемы для субъекта.

Каждый производитель будет знать схему, с помощью которой он производит, и каждый потребитель должен иметь возможность либо использовать данные в ЛЮБОМ формате, либо иметь конкретную схему, в которой он предпочитает читать. Производитель консультируется с реестром, чтобы установить правильный идентификатор для использования при отправке сообщения. Потребитель использует реестр для получения схемы отправителя.

Когда потребитель знает как схему отправителя, так и свой собственный желаемый формат сообщения, библиотека Avro может преобразовать данные в желаемый формат потребителя.

3. Apache Avro

Apache Avro – это система сериализации данных|/.

Он использует структуру JSON для определения схемы, обеспечивая сериализацию между байтами и структурированными данными.

Одной из сильных сторон Avro является поддержка преобразования сообщений, написанных в одной версии схемы, в формат, определенный совместимой альтернативной схемой.

Набор инструментов Avro также способен генерировать классы для представления структур данных этих схем, что упрощает сериализацию в POJOs и из них.

4. Настройка проекта

Чтобы использовать реестр схем с Spring Cloud Stream , нам понадобится связующее средство Spring Cloud Kafka и реестр схем зависимости Maven:


    org.springframework.cloud
    spring-cloud-stream-binder-kafka



    org.springframework.cloud
    spring-cloud-stream-schema

Для Сериализатора слияния нам нужно:


    io.confluent
    kafka-avro-serializer
    4.0.0

И сериализатор Слияния находится в их репо:


    
        confluent
        https://packages.confluent.io/maven/
    

Кроме того, давайте использовать плагин Maven для создания классов Avro:


    
        
            org.apache.avro
            avro-maven-plugin
            1.8.2
            
                
                    schemas
                    generate-sources
                    
                        schema
                        protocol
                        idl-protocol
                    
                                            
                        ${project.basedir}/src/main/resources/
                        ${project.basedir}/src/main/java/
                    
                
            
        
    

Для тестирования мы можем использовать либо существующий реестр Кафки и схемы, настроенный, либо использовать докеризованное слияние и Кафку.

5. Весенний Облачный Поток

Теперь, когда мы настроили наш проект, давайте напишем продюсера, используя Spring Cloud Stream . Он опубликует сведения о сотрудниках по определенной теме.

Затем мы создадим потребителя, который будет считывать события из темы и записывать их в отчет журнала.

5.1. Схема

Во-первых, давайте определим схему для сведений о сотрудниках. Мы можем назвать его employee-schema.avsc .

Мы можем сохранить файл схемы в src/main/ресурсы:

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}

После создания приведенной выше схемы нам нужно построить проект. Затем генератор кода Apache Avro создаст POJO с именем Сотрудник в пакете com.baeldung.schema .

5.2. Производитель

Spring Cloud Stream предоставляет интерфейс Процессор . Это обеспечивает нам выходной и входной каналы.

Давайте воспользуемся этим, чтобы создать производителя, который отправляет Сотруднику объекты в сведения о сотруднике тему Кафки :

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}

5.2. Потребитель

Теперь давайте напишем нашему потребителю:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}

Этот потребитель будет читать события, опубликованные в теме сведения о сотрудниках . Давайте направим его вывод в журнал, чтобы посмотреть, что он делает.

5.3. Привязки Кафки

До сих пор мы работали только с входными и выходными каналами нашего процессора объекта. Эти каналы нуждаются в настройке с правильными назначениями.

Давайте использовать application.yml для предоставления привязок Кафки:

spring:
  cloud:
    stream: 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

Следует отметить, что в данном случае пункт назначения означает тему Кафки. Может немного сбивать с толку то, что он называется пункт назначения , поскольку в данном случае он является источником входных данных, но это согласованный термин для потребителей и производителей.

5.4. Точка Входа

Теперь, когда у нас есть наш производитель и потребитель, давайте представим API, который будет принимать входные данные от пользователя и передавать их производителю:

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, 
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

5.5. Включите реестр и привязки схемы слияния

Наконец, чтобы наше приложение применяло привязки реестра Kafka и схемы, нам нужно будет добавить @EnableBinding и @EnableSchemaRegistryClient в один из наших классов конфигурации:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}

И мы должны предоставить Клиент реестра схемы слияния компонент:

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

Конечная точка – это URL-адрес реестра схемы слияния.

5.6. Тестирование Нашего Сервиса

Давайте протестируем сервис с помощью запроса POST:

curl -X POST localhost:8080/employees/1001/Harry/Potter

Журналы говорят нам, что это сработало:

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.7. Что Произошло Во Время Обработки?

Давайте попробуем понять, что именно произошло с нашим примером приложения:

  1. Производитель создал сообщение Кафки, используя объект Сотрудник
  2. Производитель зарегистрировал схему сотрудника в реестре схем, чтобы получить идентификатор версии схемы, при этом либо создается новый идентификатор, либо повторно используется существующий для этой точной схемы
  3. Avro сериализовал объект Сотрудник , используя схему
  4. Spring Cloud поместил идентификатор схемы в заголовки сообщений
  5. Сообщение было опубликовано по теме
  6. Когда сообщение пришло потребителю, он прочитал идентификатор схемы из заголовка
  7. Потребитель использовал идентификатор схемы для получения Сотрудника схемы из реестра
  8. Потребитель нашел локальный класс, который мог бы представлять этот объект, и десериализовал в него сообщение

6. Сериализация/Десериализация С Использованием Собственных Библиотек Кафки

Spring Boot предоставляет несколько готовых конвертеров сообщений. По умолчанию Spring Boot использует заголовок Тип содержимого для выбора соответствующего конвертера сообщений.

В нашем примере Тип содержимого является приложением/*+avro, Следовательно, он использовал Конвертер сообщений схемы Avro для чтения и записи форматов Avro. Но Confluent рекомендует использовать KafkaAvroSerializer и Kafka AvroDeserializer для преобразования сообщений .

Хотя собственный формат Spring работает хорошо, у него есть некоторые недостатки с точки зрения разделения, и он не совместим со стандартами Confluent, что может потребоваться для некоторых служб, не относящихся к Spring, в нашем экземпляре Kafka.

Давайте обновим наш application.yml , чтобы использовать конвертеры слияния:

spring:
  cloud:
    stream:
      default: 
        producer: 
          useNativeEncoding: true
        consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081 
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true

Мы включили useNativeEncoding . Это заставляет Spring Cloud Stream делегировать сериализацию предоставленным классам.

Мы также должны знать, как мы можем предоставить собственные свойства настроек для Kafka в Spring Cloud, используя kafka.binder.producer-свойства и kafka.binder.consumer-свойства.

7. Группы потребителей и разделы

Группы потребителей-это набор потребителей, принадлежащих к одному и тому же приложению . Потребители из одной и той же группы потребителей имеют одно и то же название группы.

Давайте обновим application.yml , чтобы добавить имя группы потребителей:

spring:
  cloud:
    stream:
      // ...     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      // ...

Все потребители равномерно распределяют разделы темы между собой. Сообщения в разных разделах будут обрабатываться параллельно.

В группе потребителей максимальное количество потребителей, считывающих сообщения за раз, равно количеству разделов. Таким образом, мы можем настроить количество разделов и потребителей, чтобы получить желаемый параллелизм. В целом, у нас должно быть больше разделов, чем общее количество потребителей во всех репликах нашего сервиса.

7.1. Ключ раздела

При обработке наших сообщений может быть важен порядок их обработки. Когда наши сообщения обрабатываются параллельно, последовательность обработки будет трудно контролировать.

Кафка устанавливает правило, согласно которому в данном разделе сообщения всегда обрабатываются в той последовательности, в какой они поступили . Поэтому, когда важно, чтобы определенные сообщения обрабатывались в правильном порядке, мы гарантируем, что они попадают в один раздел друг с другом.

Мы можем предоставить ключ раздела при отправке сообщения в тему. Сообщения с одним и тем же ключом раздела всегда будут отправляться в один и тот же раздел . Если ключ раздела отсутствует, сообщения будут разделяться циклически.

Давайте попробуем понять это на примере. Представьте, что мы получаем несколько сообщений от сотрудника и хотим обработать все сообщения сотрудника в определенной последовательности. Название отдела и идентификатор сотрудника могут однозначно идентифицировать сотрудника.

Итак, давайте определим ключ раздела с идентификатором сотрудника и названием отдела:

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}

После создания проекта ключ Сотрудника POJO будет сгенерирован в пакете com.baeldung.schema .

Давайте обновим нашего производителя, чтобы использовать EmployeeKey в качестве ключа раздела:

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}

Здесь мы помещаем ключ раздела в заголовок сообщения.

Теперь один и тот же раздел будет получать сообщения с одинаковым идентификатором сотрудника и названием отдела.

7.2 Параллелизм потребителей

Spring Cloud Stream позволяет нам установить параллелизм для потребителя в application.yml :

spring:
  cloud:
    stream:
      // ... 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3

Теперь наши потребители будут читать три сообщения из этой темы одновременно. Другими словами, Spring создаст три разных потока для самостоятельного использования.

8. Заключение

В этой статье мы интегрировали производителя и потребителя в Apache Kafka со схемами Avro и реестром схем слияния .

Мы сделали это в одном приложении, но производитель и потребитель могли бы быть развернуты в разных приложениях и могли бы иметь свои собственные версии схем, синхронизированные через реестр.

Мы рассмотрели, как использовать Spring реализацию Avro и клиента реестра схем, а затем мы увидели, как переключиться на стандартную реализацию слияния сериализации и десериализации для целей взаимодействия.

Наконец, мы рассмотрели, как разделить вашу тему и убедиться, что у нас есть правильные ключи сообщений, чтобы обеспечить безопасную параллельную обработку наших сообщений.

Полный код, используемый для этой статьи, можно найти на GitHub .