Рубрики
Без рубрики

Тестирование Кафки и весенней загрузки

Узнайте о нескольких подходах к тестированию приложений Kafka с помощью Spring Boot.

Автор оригинала: Jonathan Cook.

Тестирование Кафки и весенней загрузки

1. Обзор

Апач Кафка является мощной, распределенной, неуясной системой обработки потоков. В предыдущем учебнике мы узнали как работать с Spring и Kafka .

В этом учебнике Мы будем опираться на предыдущий и узнать, как писать надежные, автономные интеграционные тесты, которые не полагаются на внешний сервер Kafka, работающий .

Во-первых, мы начнем, но посмотрим, как использовать и настроить встроенный экземпляр Kafka. Тогда мы увидим, как мы можем использовать популярные рамки Тестконтейнеры из наших тестов.

2. Зависимости

Конечно, нам нужно будет добавить стандартную весенне-кафка зависимость к нашему пом.xml :


    org.springframework.kafka
    spring-kafka
    2.6.3.RELEASE

Тогда нам понадобится еще две зависимости специально для наших тестов . Во-первых, мы добавим весенне-кафка-тест артефакт :


    org.springframework.kafka
    spring-kafka-test
    2.6.3.RELEASE
    test

И, наконец, мы добавим testcontainers Кафка зависимость, которая также доступна на Мавен Центральный :


    org.testcontainers
    kafka
    1.15.0
    test

Теперь, когда у нас настроены все необходимые зависимости, мы можем написать простое приложение Spring Boot с помощью Kafka.

3. Простое приложение производителя-потребителя Кафки

На протяжении всего этого учебника, в центре нашего теста будет простой производитель-потребитель Весна Boot Kafka приложения.

Начнем с определения точки входа в нашу заявку:

@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

Как мы видим, это стандартное приложение Spring Boot. Там, где это возможно, мы хотим использовать значения конфигурации по умолчанию . С этим в виду, мы используем @EnableAutoConfiguration аннотация к автоматической конфинг наше приложение.

3.1. Настройка производителя

Далее рассмотрим фасоль производителя, которую мы будем использовать для отправки сообщений на тему Kafka:

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

Наша КафкаПродюсер фасоль определяется выше, это просто обертка вокруг КафкаТемпле класса. Этот класс обеспечивает высокоуровневые операции, безопасные для потоков, такие как отправка данных на предоставленную тему, что является именно тем, что мы делаем в наших отправить метод .

3.2. Настройка потребителей

Аналогичным образом, теперь мы определим простую потребительскую фасоль, которая будет слушать тему Кафки и получать сообщения:

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

Наш простой потребитель пользуется @KafkaListener аннотация на получать метод прослушивания сообщений по данной теме. Позже мы увидим, как мы настраиваем test.topic из наших тестов.

Кроме того, метод получения хранит содержимое сообщения в нашей фасоли и осуждает количество защелка переменная. Эта переменная является простой нить-безопасный счетчик поле, которое мы будем использовать позже из наших тестов, чтобы гарантировать, что мы успешно получили сообщение .

Теперь, когда у нас есть наше простое приложение Kafka с помощью Spring Boot реализованы давайте посмотрим, как мы можем написать интеграционные тесты.

4. Слово о тестировании

Как правило, при написании чистых интеграционных тестов мы не должны зависеть от внешних служб, которые мы, возможно, не сможем контролировать или могут внезапно перестать работать . Это может негативно сказаться на результатах наших тестов.

Аналогичным образом, если мы зависим от внешнего сервиса, в данном случае, работающий брокер Kafka, мы, вероятно, не сможем настроить его, контролировать его и снести его так, как мы хотим от наших тестов.

4.1. Свойства применения

Мы собираемся использовать очень легкий набор свойств конфигурации приложений из наших тестов. Мы определим эти свойства в нашем src/test/resources/application.yml файл:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

Это минимальный набор свойств, которые нам нужны при работе со встроенным экземпляром Kafka или местным брокером.

Большинство из них являются самоочевидными, но тот, который мы должны подчеркнуть особое значение является потребительский автоматическая смещение сброса: самые ранние . Это свойство гарантирует, что наша группа потребителей получает сообщения, которые мы отправляем, потому что контейнер может начаться после завершения отправки.

Кроме того, мы настраиваем свойство темы со значением встроенный тест-тема , которая является темой, которую мы будем использовать из наших тестов.

5. Тестирование с использованием встроенных Кафка

В этом разделе мы посмотрим, как использовать экземпляр Kafka в памяти для пользования тестами. Это также известно как Встроенный Кафка.

Зависимость весенне-кафка-тест мы добавили ранее содержит некоторые полезные утилиты, чтобы помочь с тестированием нашего приложения. В частности, он содержит ВстроенныйКафкаБрокер класс .

С этим в виду, давайте идти вперед и написать наш первый тест интеграции:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own simple KafkaProducer");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

Давайте пройдитесь по ключевым частям нашего теста. Во-первых, мы начинаем с украшения нашего тестового класса двумя довольно стандартными весенними аннотациями:

  • @SpringBootTest аннотация гарантирует, что наши тестовые загрузки контекст приложения Весна
  • Мы также используем @DirtiesContext аннотация, которая позволит очистить этот контекст и сбросить между различными тестами

А вот и важнейшая часть, мы используем @EmbeddedKafka аннотация для введения экземпляра ВстроенныйКафкаБрокер в наши тесты . Кроме того, есть несколько свойств, которые мы можем использовать для настройки встроенного узла Kafka:

  • разделы – это количество разделов, используемых на одну тему. Чтобы держать вещи красивыми и простыми, мы только хотим, чтобы один, который будет использоваться из наших тестов
  • брокерПредложение – дополнительные свойства для брокера Кафка. Опять же мы держим вещи простыми и указать простой текст слушателя и номер порта

Далее, мы автоматически провода нашей потребительские и продюсер классов и …

Для заключительной части головоломки, мы просто отправляем сообщение на нашу тестовую тему и проверяем, что сообщение получено, и содержит название нашей темы тестирования .

Когда мы заируем наш тест, мы увидим среди многословных Выход весны:

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer)'

Это подтверждает, что наш тест работает должным образом. Потрясающий! Теперь у нас есть способ написать автономные, независимые интеграционные тесты с помощью в памяти кафки брокер .

6. Тестирование Кафки с TestContainers

Иногда мы можем увидеть небольшие различия между реальным внешним сервисом и встроенным экземпляром памяти службы, специально предоставленной для целей тестирования. Хотя маловероятно, это также может быть, что порт, используемый из нашего теста могут быть заняты, в результате чего .

С этим в виду, в этом разделе, мы увидим вариации на наш предыдущий подход к тестированию с использованием testcontainers рамки . Мы увидим, как мгновенно и управлять внешним брокером Apache Kafka, размещенным внутри контейнера Docker из нашего интеграционного теста.

Давайте определим еще один интеграционный тест, который будет очень похож на тот, который мы видели в предыдущем разделе:

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own controller");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

Давайте посмотрим на различия на этот раз. Мы объявляем кафка поле, которое является стандартным JUnit @ClassRule . Это поле является примером КафкаКонтейнер класс, который будет готовить и управлять жизненным циклом нашего контейнера работает Кафка.

Чтобы избежать столкновений в портах, Testcontainers динамически выделяет номер порта, когда начинается наш контейнер докера. По этой причине мы предоставляем пользовательскую конфигурацию фабрики для потребителей и производителей, используя КафкаТестКонтейнерыКонфигурация :

@Bean
public Map consumerConfigs() {
    Map props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory producerFactory() {
    Map configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}

Затем мы ссылаемся на эту конфигурацию через @Import аннотация в начале нашего теста.

Причина этого заключается в том, что нам нужен способ введения адреса сервера в наше приложение, которое, как упоминалось ранее, генерируется динамически. Мы достигаем этого, называя getBootstrapServers () метод, который вернет местоположение сервера загрузки:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Теперь, когда мы запускаем наш тест, мы должны увидеть, что Testcontainers делает несколько вещей:

  • Проверяет нашу местную установку Docker.
  • Тянет слияние/cp-kafka:5.4.3 докер изображение, если это необходимо
  • Запускает новый контейнер и ждет его готовности
  • Наконец, выключает и удаляет контейнер после завершения теста

Опять же, это подтверждается проверкой результатов тестирования:

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

престо! Рабочий интеграционный тест с использованием контейнера kafka docker.

7. Заключение

В этой статье мы узнали о нескольких подходах для тестирования приложений Kafka с Spring Boot. В первом подходе мы увидели, как настроить и использовать локального брокера в памяти Kafka.

Затем мы увидели, как использовать Testcontainers для настройки внешнего брокера Kafka работает внутри контейнера докера из наших тестов.

Как всегда, полный исходный код статьи доступен более на GitHub .