Автор оригинала: Jonathan Cook.
Тестирование Кафки и весенней загрузки
1. Обзор
Апач Кафка является мощной, распределенной, неуясной системой обработки потоков. В предыдущем учебнике мы узнали как работать с Spring и Kafka .
В этом учебнике Мы будем опираться на предыдущий и узнать, как писать надежные, автономные интеграционные тесты, которые не полагаются на внешний сервер Kafka, работающий .
Во-первых, мы начнем, но посмотрим, как использовать и настроить встроенный экземпляр Kafka. Тогда мы увидим, как мы можем использовать популярные рамки Тестконтейнеры из наших тестов.
2. Зависимости
Конечно, нам нужно будет добавить стандартную весенне-кафка зависимость к нашему пом.xml :
org.springframework.kafka spring-kafka 2.6.3.RELEASE
Тогда нам понадобится еще две зависимости специально для наших тестов . Во-первых, мы добавим весенне-кафка-тест артефакт :
org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test
И, наконец, мы добавим testcontainers Кафка зависимость, которая также доступна на Мавен Центральный :
org.testcontainers kafka 1.15.0 test
Теперь, когда у нас настроены все необходимые зависимости, мы можем написать простое приложение Spring Boot с помощью Kafka.
3. Простое приложение производителя-потребителя Кафки
На протяжении всего этого учебника, в центре нашего теста будет простой производитель-потребитель Весна Boot Kafka приложения.
Начнем с определения точки входа в нашу заявку:
@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }
Как мы видим, это стандартное приложение Spring Boot. Там, где это возможно, мы хотим использовать значения конфигурации по умолчанию . С этим в виду, мы используем @EnableAutoConfiguration аннотация к автоматической конфинг наше приложение.
3.1. Настройка производителя
Далее рассмотрим фасоль производителя, которую мы будем использовать для отправки сообщений на тему Kafka:
@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplatekafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload='{}' to topic='{}'", payload, topic); kafkaTemplate.send(topic, payload); } }
Наша КафкаПродюсер фасоль определяется выше, это просто обертка вокруг КафкаТемпле класса. Этот класс обеспечивает высокоуровневые операции, безопасные для потоков, такие как отправка данных на предоставленную тему, что является именно тем, что мы делаем в наших отправить метод .
3.2. Настройка потребителей
Аналогичным образом, теперь мы определим простую потребительскую фасоль, которая будет слушать тему Кафки и получать сообщения:
@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord, ?> consumerRecord) { LOGGER.info("received payload='{}'", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }
Наш простой потребитель пользуется @KafkaListener аннотация на получать метод прослушивания сообщений по данной теме. Позже мы увидим, как мы настраиваем test.topic из наших тестов.
Кроме того, метод получения хранит содержимое сообщения в нашей фасоли и осуждает количество защелка переменная. Эта переменная является простой нить-безопасный счетчик поле, которое мы будем использовать позже из наших тестов, чтобы гарантировать, что мы успешно получили сообщение .
Теперь, когда у нас есть наше простое приложение Kafka с помощью Spring Boot реализованы давайте посмотрим, как мы можем написать интеграционные тесты.
4. Слово о тестировании
Как правило, при написании чистых интеграционных тестов мы не должны зависеть от внешних служб, которые мы, возможно, не сможем контролировать или могут внезапно перестать работать . Это может негативно сказаться на результатах наших тестов.
Аналогичным образом, если мы зависим от внешнего сервиса, в данном случае, работающий брокер Kafka, мы, вероятно, не сможем настроить его, контролировать его и снести его так, как мы хотим от наших тестов.
4.1. Свойства применения
Мы собираемся использовать очень легкий набор свойств конфигурации приложений из наших тестов. Мы определим эти свойства в нашем src/test/resources/application.yml файл:
spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic
Это минимальный набор свойств, которые нам нужны при работе со встроенным экземпляром Kafka или местным брокером.
Большинство из них являются самоочевидными, но тот, который мы должны подчеркнуть особое значение является потребительский автоматическая смещение сброса: самые ранние . Это свойство гарантирует, что наша группа потребителей получает сообщения, которые мы отправляем, потому что контейнер может начаться после завершения отправки.
Кроме того, мы настраиваем свойство темы со значением встроенный тест-тема , которая является темой, которую мы будем использовать из наших тестов.
5. Тестирование с использованием встроенных Кафка
В этом разделе мы посмотрим, как использовать экземпляр Kafka в памяти для пользования тестами. Это также известно как Встроенный Кафка.
Зависимость весенне-кафка-тест мы добавили ранее содержит некоторые полезные утилиты, чтобы помочь с тестированием нашего приложения. В частности, он содержит ВстроенныйКафкаБрокер класс .
С этим в виду, давайте идти вперед и написать наш первый тест интеграции:
@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }
Давайте пройдитесь по ключевым частям нашего теста. Во-первых, мы начинаем с украшения нашего тестового класса двумя довольно стандартными весенними аннотациями:
- @SpringBootTest аннотация гарантирует, что наши тестовые загрузки контекст приложения Весна
- Мы также используем @DirtiesContext аннотация, которая позволит очистить этот контекст и сбросить между различными тестами
А вот и важнейшая часть, мы используем @EmbeddedKafka аннотация для введения экземпляра ВстроенныйКафкаБрокер в наши тесты . Кроме того, есть несколько свойств, которые мы можем использовать для настройки встроенного узла Kafka:
- разделы – это количество разделов, используемых на одну тему. Чтобы держать вещи красивыми и простыми, мы только хотим, чтобы один, который будет использоваться из наших тестов
- брокерПредложение – дополнительные свойства для брокера Кафка. Опять же мы держим вещи простыми и указать простой текст слушателя и номер порта
Далее, мы автоматически провода нашей потребительские и продюсер классов и …
Для заключительной части головоломки, мы просто отправляем сообщение на нашу тестовую тему и проверяем, что сообщение получено, и содержит название нашей темы тестирования .
Когда мы заируем наш тест, мы увидим среди многословных Выход весны:
... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic' ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Sending with our own simple KafkaProducer)'
Это подтверждает, что наш тест работает должным образом. Потрясающий! Теперь у нас есть способ написать автономные, независимые интеграционные тесты с помощью в памяти кафки брокер .
6. Тестирование Кафки с TestContainers
Иногда мы можем увидеть небольшие различия между реальным внешним сервисом и встроенным экземпляром памяти службы, специально предоставленной для целей тестирования. Хотя маловероятно, это также может быть, что порт, используемый из нашего теста могут быть заняты, в результате чего .
С этим в виду, в этом разделе, мы увидим вариации на наш предыдущий подход к тестированию с использованием testcontainers рамки . Мы увидим, как мгновенно и управлять внешним брокером Apache Kafka, размещенным внутри контейнера Docker из нашего интеграционного теста.
Давайте определим еще один интеграционный тест, который будет очень похож на тот, который мы видели в предыдущем разделе:
@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }
Давайте посмотрим на различия на этот раз. Мы объявляем кафка поле, которое является стандартным JUnit @ClassRule . Это поле является примером КафкаКонтейнер класс, который будет готовить и управлять жизненным циклом нашего контейнера работает Кафка.
Чтобы избежать столкновений в портах, Testcontainers динамически выделяет номер порта, когда начинается наш контейнер докера. По этой причине мы предоставляем пользовательскую конфигурацию фабрики для потребителей и производителей, используя КафкаТестКонтейнерыКонфигурация :
@Bean public MapconsumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory<>(configProps); }
Затем мы ссылаемся на эту конфигурацию через @Import аннотация в начале нашего теста.
Причина этого заключается в том, что нам нужен способ введения адреса сервера в наше приложение, которое, как упоминалось ранее, генерируется динамически. Мы достигаем этого, называя getBootstrapServers () метод, который вернет местоположение сервера загрузки:
bootstrap.servers = [PLAINTEXT://localhost:32789]
Теперь, когда мы запускаем наш тест, мы должны увидеть, что Testcontainers делает несколько вещей:
- Проверяет нашу местную установку Docker.
- Тянет слияние/cp-kafka:5.4.3 докер изображение, если это необходимо
- Запускает новый контейнер и ждет его готовности
- Наконец, выключает и удаляет контейнер после завершения теста
Опять же, это подтверждается проверкой результатов тестирования:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
престо! Рабочий интеграционный тест с использованием контейнера kafka docker.
7. Заключение
В этой статье мы узнали о нескольких подходах для тестирования приложений Kafka с Spring Boot. В первом подходе мы увидели, как настроить и использовать локального брокера в памяти Kafka.
Затем мы увидели, как использовать Testcontainers для настройки внешнего брокера Kafka работает внутри контейнера докера из наших тестов.
Как всегда, полный исходный код статьи доступен более на GitHub .