Рубрики
Без рубрики

Тестирование потоков Micronaut Kafka

Вот пример интеграционного тестирования приложений Micronaut Kafka Streams – https://github.com/Ph … Помеченные потоками java, micronaut, kafka, kafka.

Вот пример интеграционного тестирования приложений Micronaut Kafka Streams – https://github.com/PhilHardwick/micronaut-avro-streams-example . Он использует встроенную Kafka, чтобы вы могли протестировать работу с реальным экземпляром Kafka, но с таким же успехом он мог бы использовать тестовые контейнеры.

Пример немного надуман, но приложения получают команды для создания pots для банковского счета (инициализация баланса равняется 0) и команды для осуществления перевода между pots (что изменяет балансы двух pots при передаче). Приложение, которое я создал, не обязательно соответствует тому, как я бы спроектировал что-то, идущее в производство – Я думаю, что данные и ключи разделов нуждаются в дополнительном обдумывании, особенно при запуске нескольких экземпляров, но я хотел создать нетривиальный пример для демонстрации тестирования.

Есть несколько ключевых вещей, необходимых для того, чтобы иметь возможность протестировать это с помощью интеграции.

Дождитесь начала потоков

@BeforeEach
void setUp() {
    await().atMost(10, TimeUnit.SECONDS).until(() -> stream.state().equals(KafkaStreams.State.RUNNING));
}

Ожидание запуска потока имеет важное значение, поскольку по умолчанию потоки обрабатываются ровно один раз. Наши тесты могут продвигаться вперед и отправлять сообщения до того, как поток будет готов, и поэтому поток никогда не получит сообщение (потому что он не считывает с самого раннего смещения).

Создайте простых производителей и потребителей для осуществления потока

Вам нужно отправлять сообщения в поток и прослушивать результаты, поэтому создайте для этого несколько простых производителей и потребителей. Micronaut упрощает это с помощью аннотаций, или вы можете настроить производителей и потребителей в коде, как показано в примерах слияния .

@Singleton
@KafkaListener(groupId = "all-event-listener", offsetReset = OffsetReset.EARLIEST, clientId = "all-event-test-listener")
public class EventsListener {

    private BlockingQueue potEvents = new LinkedBlockingDeque<>();
    private BlockingQueue transferEvents = new LinkedBlockingDeque<>();

    @Topic("pot-events")
    public void potEventReceived(PotEvent accountEvent) {
        potEvents.add(accountEvent);
    }
    @Topic("transfer-events")
    public void transferEventReceived(PotTransferEvent transferEvent) {
        transferEvents.add(transferEvent);
    }

    public BlockingQueue getPotEvents() {
        return potEvents;
    }

    public BlockingQueue getTransferEvents() {
        return transferEvents;
    }

}

@KafkaClient
public interface TestCommandSender {

    @Topic("pot-commands")
    void sendMakePotTransfer(@KafkaKey UUID accountId, MakePotTransfer makePotTransfer);

    @Topic("pot-commands")
    void sendCreatePot(@KafkaKey UUID accountId, CreatePot createPot);

}

Главное, что здесь следует помнить, – настроить прослушиватель событий на прослушивание с самого раннего смещения, чтобы он получал все события, выводимые из потока, независимо от того, в какое время он подписывается на тему.

Реестр макетной схемы

Начиная с версии Confluent 5.4.0, вы можете использовать URL-адрес, начинающийся с mock:// для URL-адреса вашего реестра схемы, и он введет реестр фиктивной схемы в ваши serdes. Это то, что я установил в своем приложение-kafka.yml .

Сериализаторы и десериализаторы

Убедитесь, что ваши сериализаторы и десериализаторы настроены в соответствии с настройкой Micronaut Kafka с помощью сериализаторов и десериализаторов, включая производителей и потребителей ваших тестов. Я использовал

kafka:
  key:
    serializer: org.apache.kafka.common.serialization.UUIDSerializer
    deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
  value:
    serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
    deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

в приложении-kafka.yml в тестовых ресурсах.

Вывод

Интеграционное тестирование действительно полезно для проверки того, как на самом деле будет работать ваше приложение. Это также обеспечивает гибкость в реализации, поскольку теперь я могу изменить способ работы потока, не меняя тест, и я могу быть уверен, что ничего не нарушил.

Оригинал: “https://dev.to/philhardwick/testing-micronaut-kafka-streams-5fpo”