При тестировании Кафка в Микронавт вы можете использовать встроенную Кафку или использовать Тестовые контейнеры .
Конфигурация
Чтобы использовать встроенную кафку, установите эти свойства в src/тест/ресурсы/приложение-кафка.yaml
файл:
kafka: bootstrap: servers: localhost:${random.port} embedded: enabled: true topics: - topic1 - topic2
При настройке теста с помощью @Micronauts(среды)
он установит среду как “кафка”, так и “тест”, что позволит включить все свойства, необходимые для запуска встроенной Кафки.
Лучше всего запускать встроенную кафку на случайном порту, чтобы избежать столкновения с другой кафкой, которая может быть запущена локально (например, в докере).
Все темы, требуемые вашим приложением, также должны быть указаны в ключе kafka.embedded.topics
, чтобы они создавались во встроенной Кафке.
Создание тестовых сообщений
Чтобы настроить производителя для отправки тестовых сообщений в ваше приложение, создайте файл в src/test/java/
под названием TestProducer.java
и создайте внутри интерфейс с необходимыми аннотациями:
@KafkaClient public interface TestProducer { @Topic("topic1") void sendTestMessage(String body); }
Затем используйте тест, полученный путем введения его в ваш тест:
@MicronautTest(environments = "kafka") public class TestAKafkaApplication { @Inject public TestProducer topicClient; @Test public void listensToMessages() { // When topicClient.sendTestMessage("Hello"); // Then assertions... // assertThat()... } }
Получение сообщений из тестируемого приложения
Настройте потребителя в своих тестовых классах под названием TestListener
со следующим кодом:
@Singleton @KafkaListener(groupId = "test", clientId = "test-consumer") public class TestListener { private BlockingQueuemessages = new LinkedBlockingDeque<>(); @Topic("topic1") public void eventOccurred(String body) { messages.add(body); } public BlockingQueue getMessages() { return messages; } }
Это позволяет вам получить сообщения, которые были отправлены в тему, а затем проверить их содержимое. Он также копирует подход Spring Cloud Stream к хранению сообщений в очереди блокировки, чтобы вы могли легко дождаться поступления сообщения, а затем удалить его из структуры данных при его получении.
Использование его в вашем тесте требует, чтобы вы ввели его:
import io.micronaut.test.annotation.MicronautTest; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import technology.wick.blog.consumer.TestListener; import technology.wick.blog.producer.TestProducer; import javax.inject.Inject; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @MicronautTest(environments = "kafka") public class TestAKafkaApplication { @Inject public TestListener topicListener; @Inject public TestProducer topicClient; @BeforeEach void setUp() { // Given no messages exist topicListener.getMessages().clear(); } @Test public void producesMessages() throws InterruptedException { // When topicClient.sendTestMessage("Hello"); // Then String bodyOfMessage = topicListener.getMessages().poll(2, TimeUnit.SECONDS); assertThat(bodyOfMessage).isEqualTo("Hello"); } }
Вывод
Подход Micronaut к тестированию прост, но функционален, в нем не так много функций только для тестирования – для этого требуется использовать тот же код, что и в реальном приложении, поэтому лучше всего подходить к созданию теста так, как вы бы кодировали функции в полном приложении.
Рабочий пример использования этого кода в тестах можно найти на Github .
Оригинал: “https://dev.to/philhardwick/testing-micronaut-kafka-1ab5”