Вот пример интеграционного тестирования приложений Micronaut Kafka Streams – https://github.com/PhilHardwick/micronaut-avro-streams-example . Он использует встроенную Kafka, чтобы вы могли протестировать работу с реальным экземпляром Kafka, но с таким же успехом он мог бы использовать тестовые контейнеры.
Пример немного надуман, но приложения получают команды для создания pots для банковского счета (инициализация баланса равняется 0) и команды для осуществления перевода между pots (что изменяет балансы двух pots при передаче). Приложение, которое я создал, не обязательно соответствует тому, как я бы спроектировал что-то, идущее в производство – Я думаю, что данные и ключи разделов нуждаются в дополнительном обдумывании, особенно при запуске нескольких экземпляров, но я хотел создать нетривиальный пример для демонстрации тестирования.
Есть несколько ключевых вещей, необходимых для того, чтобы иметь возможность протестировать это с помощью интеграции.
Дождитесь начала потоков
@BeforeEach void setUp() { await().atMost(10, TimeUnit.SECONDS).until(() -> stream.state().equals(KafkaStreams.State.RUNNING)); }
Ожидание запуска потока имеет важное значение, поскольку по умолчанию потоки обрабатываются ровно один раз. Наши тесты могут продвигаться вперед и отправлять сообщения до того, как поток будет готов, и поэтому поток никогда не получит сообщение (потому что он не считывает с самого раннего смещения).
Создайте простых производителей и потребителей для осуществления потока
Вам нужно отправлять сообщения в поток и прослушивать результаты, поэтому создайте для этого несколько простых производителей и потребителей. Micronaut упрощает это с помощью аннотаций, или вы можете настроить производителей и потребителей в коде, как показано в примерах слияния .
@Singleton @KafkaListener(groupId = "all-event-listener", offsetReset = OffsetReset.EARLIEST, clientId = "all-event-test-listener") public class EventsListener { private BlockingQueuepotEvents = new LinkedBlockingDeque<>(); private BlockingQueue transferEvents = new LinkedBlockingDeque<>(); @Topic("pot-events") public void potEventReceived(PotEvent accountEvent) { potEvents.add(accountEvent); } @Topic("transfer-events") public void transferEventReceived(PotTransferEvent transferEvent) { transferEvents.add(transferEvent); } public BlockingQueue getPotEvents() { return potEvents; } public BlockingQueue getTransferEvents() { return transferEvents; } } @KafkaClient public interface TestCommandSender { @Topic("pot-commands") void sendMakePotTransfer(@KafkaKey UUID accountId, MakePotTransfer makePotTransfer); @Topic("pot-commands") void sendCreatePot(@KafkaKey UUID accountId, CreatePot createPot); }
Главное, что здесь следует помнить, – настроить прослушиватель событий на прослушивание с самого раннего смещения, чтобы он получал все события, выводимые из потока, независимо от того, в какое время он подписывается на тему.
Реестр макетной схемы
Начиная с версии Confluent 5.4.0, вы можете использовать URL-адрес, начинающийся с mock://
для URL-адреса вашего реестра схемы, и он введет реестр фиктивной схемы в ваши serdes. Это то, что я установил в своем приложение-kafka.yml
.
Сериализаторы и десериализаторы
Убедитесь, что ваши сериализаторы и десериализаторы настроены в соответствии с настройкой Micronaut Kafka с помощью сериализаторов и десериализаторов, включая производителей и потребителей ваших тестов. Я использовал
kafka: key: serializer: org.apache.kafka.common.serialization.UUIDSerializer deserializer: org.apache.kafka.common.serialization.UUIDDeserializer value: serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
в приложении-kafka.yml в тестовых ресурсах.
Вывод
Интеграционное тестирование действительно полезно для проверки того, как на самом деле будет работать ваше приложение. Это также обеспечивает гибкость в реализации, поскольку теперь я могу изменить способ работы потока, не меняя тест, и я могу быть уверен, что ничего не нарушил.
Оригинал: “https://dev.to/philhardwick/testing-micronaut-kafka-streams-5fpo”