Рубрики
Без рубрики

Тестирование приложений Kafka Streams

Предыдущие сообщения в блоге из серии потоков Кафки касались операций без состояния и с сохранением состояния в… Помечено как кафка, java, показать разработчика, тестирование.

Предыдущие сообщения в блоге из серии “Потоки Кафки” освещали Без гражданства и Операции с сохранением состояния в DSL API. В этом блоге мы рассмотрим несколько примеров, демонстрирующих, как использовать утилиты тестирования для проверки топологий на основе DSL API Kafka Streams.

Потоки Кафки предоставляют утилиты тестирования для выполнения модульных тестов для ваших конвейеров потоковой обработки без необходимости полагаться на внешний или встроенный кластер Кафки. В дополнение к тестированию, эти утилиты также служат отличным учебным пособием для использования различных функций API.

Давайте начнем с высокоуровневого обзора API, связанных с тестированием

Код доступен на GitHub , и тесты могут быть выполнены путем клонирования репозитория, за которым следует тест mvn

Ключевые понятия

Изначально в org.apache было несколько классов.кафка.потоки.тест пакет. Теперь они устарели в пользу следующих классов

Тема ввода теста

Экземпляр Тема ввода теста представляет тему ввода, и вы можете отправлять в нее записи, используя метод Ввода канала (и его перегруженные версии). Создайте Тему ввода теста экземпляры, используя Драйвер для тестирования топологии (объяснено ниже) и при необходимости используйте пользовательские сериализаторы. Затем вы можете отправлять пары ключ-значение, просто значения по одному или в пакетном режиме (используя Список )

Тема вывода теста

Тема вывода теста является другой половиной уравнения отправки-получения и дополняет Тема ввода теста . Вы можете использовать его для чтения записей из выходных разделов, в которые записываются ваши операции с топологией. Его методы включают чтение записей (пары ключ-значение), только значение, запрос размера (количество текущих записей, которые не были использованы) и т.д.

Драйвер для тестирования топологии

Драйвер для тестирования топологии содержит ссылку на Топологию , а также конфигурацию, связанную с вашим приложением Kafka Streams. Как упоминалось ранее, он используется для создания экземпляров темы Тестовый ввод , ,

Поток высокого уровня

Если вы используете Maven , вы можете включить утилиту тестирования в качестве зависимости

        
            org.apache.kafka
            kafka-streams-test-utils
            2.4.0
            test
        

и вы (скорее всего) будете использовать JUnit и hamcrest для написания соответствующих правил…

        
            junit
            junit
            4.12
            test
        
        
            org.hamcrest
            hamcrest-core
            1.3
            test
        

Вот как может выглядеть тестовый пример (аналогично тому, как вы бы проводили модульное тестирование любого кода Java с помощью JUnit и т. Д.)

  • настройте глобальное состояние (если таковое имеется), используя @BeforeClass аннотированный метод
  • состояние настройки для каждого тестового запуска с использованием метода @Before с аннотациями – здесь вы обычно создаете TopologyTestDriver и т. Д.
  • @Тест методы, которые проверяют топологию
  • @After (и/или @AfterClass ) методы для разрушения любого состояния (будь то глобальное или иное)

Пожалуйста, убедитесь, что вы позвонили Драйвер для тестирования топологии.close() для очистки процессоров в топологии и связанного с ними состояния. Невыполнение этого требования может привести к сбоям тестирования из-за несогласованного состояния

Теперь, когда у вас есть представление о концепциях и базовой настройке, давайте рассмотрим несколько конкретных примеров. Мы начнем с операций без состояния

Тестирование операций без состояния

фильтр

Вот Топология , которая использует метод фильтра, чтобы разрешать только значения, длина которых превышает пять.

        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(INPUT_TOPIC);

        stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);

А вот и соответствующий тест:

    @Test
    public void shouldIncludeValueWithLengthGreaterThanFive() {

        topology = App.retainWordsLongerThan5Letters();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

        assertThat(outputTopic.isEmpty(), is(true));

        inputTopic.pipeInput("key1", "barrrrr");
        assertThat(outputTopic.readValue(), equalTo("barrrrr"));
        assertThat(outputTopic.isEmpty(), is(true));

        inputTopic.pipeInput("key2", "bar");
        assertThat(outputTopic.isEmpty(), is(true));
    }

Мы начинаем с выбора Топологии , которую мы хотим протестировать, создаем Тестовый драйвер топологии экземпляр вместе с TestInputTopic и Тема вывода теста объекты.

Затем мы подтверждаем, является ли тема вывода пустой перед отправкой каких-либо данных – утвержДаем, что(тема вывода.isEmpty(), является(истиной));

Теперь данные/записи могут быть отправлены в тему ввода с помощью тема ввода.pipeInput("ключ1", "barrrrr"); Это синхронный процесс и запускает Топологию , которая в данном случае выполняет операцию фильтр и помещает ее в раздел вывода, поскольку длина значения больше пяти. Мы подтверждаем то же самое, используя assertThat(тема вывода.значение чтения(), равно("barrrrr")); и дважды проверяем, пуста ли тема вывода

Наконец, мы отправляем значение bar и подтверждаем, что оно не было отправлено в тему вывода, поскольку его длина меньше пяти.

Плоская карта

Как объяснено в части 1 этой серии (операции без состояния), вот операция Плоская карта

        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(INPUT_TOPIC);

        stream.flatMap(new KeyValueMapper>>() {
            @Override
            public Iterable> apply(String k, String csv) {
                String[] values = csv.split(",");
                return Arrays.asList(values)
                        .stream()
                        .map(value -> new KeyValue<>(k, value))
                        .collect(Collectors.toList());
            }
        }).to(OUTPUT_TOPIC);

В приведенном выше примере каждая запись в потоке получает Плоская карта p таким образом, чтобы каждое значение CSV (разделенное запятыми) сначала разбивалось на составляющие, и для каждой части строки CSV создавалась пара Значение ключа .

Чтобы проверить это….

        topology = App.flatMap();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("random", "foo,bar,baz");
        inputTopic.pipeInput("hello", "world,universe");
        inputTopic.pipeInput("hi", "there");

        assertThat(outputTopic.getQueueSize(), equalTo(6L));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));

        assertThat(outputTopic.isEmpty(), is(true));

Как обычно, мы настраиваем необходимые классы тестовых утилит и помещаем записи ввода в раздел ввода. например, для ключа случайный и его значения, разделенные запятыми foo, bar,baz будут разделены на отдельные пары ключ-значение, т.е. они приведут к тому, что три записи будут помещены в выходную таблицу. То же самое относится и к другим входным записям.

Мы подтверждаем количество записей в теме вывода Подтвердите это(вывод темы.getQueueSize(), равно(6L)); и проверьте каждую пару ключ-значение для подтверждения Плоской карты поведение

Операция с сохранением состояния без сохранения состояния

Вот пример топологии, в которой используется группируйте по ключу , за которым следует количество , и сохраняйте результаты в разделе вывода

        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(INPUT_TOPIC);

        stream.groupByKey()
                .count()
                .toStream()
                .to(OUTPUT_TOPIC);

Тестирование операции с сохранением состояния не сильно отличается от тестирования операции без состояния.

        topology = App.count();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        TestOutputTopic ot = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.Long().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key1", "value2");
        inputTopic.pipeInput("key2", "value3");
        inputTopic.pipeInput("key3", "value4");
        inputTopic.pipeInput("key2", "value5");

        assertThat(ot.readKeyValue(), equalTo(new KeyValue("key1", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue("key1", 2L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue("key2", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue("key3", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue("key2", 2L)));

Отдельные записи отправляются во входную тему и в выходную тему, а затем подсчеты проверяются. Как и ожидалось, ключи ключ1 , ключ2 и ключ3 имеют значения 2, 2, 1 соответственно.

Операция с сохранением состояния с хранилищем состояний

Вещи становятся интересными, когда топология состоит из хранилища состояний. В этом примере, вместо отправки результатов в тему вывода используется промежуточное хранилище состояний (его можно запросить с помощью интерактивных запросов).

        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(INPUT_TOPIC);

        stream.groupByKey().count(Materialized.as("count-store"));

Драйвер для тестирования топологии предоставляет доступ к хранилищу состояний ( Хранилище значений ключей ) через getKeyValueStore . Подсчет состояния хранилища проверяется после отправки каждой записи в раздел ввода, например утверждайте, что(countstore.get("ключ1"), равно(1L));

        topology = App.countWithStateStore();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());

        KeyValueStore countStore = td.getKeyValueStore("count-store");

        inputTopic.pipeInput("key1", "value1");
        assertThat(countStore.get("key1"), equalTo(1L));

        inputTopic.pipeInput("key1", "value2");
        assertThat(countStore.get("key1"), equalTo(2L));

        inputTopic.pipeInput("key2", "value3");
        assertThat(countStore.get("key2"), equalTo(1L));

        inputTopic.pipeInput("key3", "value4");
        assertThat(countStore.get("key3"), equalTo(1L));

        inputTopic.pipeInput("key2", "value5");
        assertThat(countStore.get("key2"), equalTo(2L));

Обратите внимание, что в наших тестах мы создали Топологию, Драйвер тестирования топологии, тему ввода теста и TestOutputTopic в каждом методе тестирования. Это было просто потому, что мы тестировали разные топологии. Если вы тестировали одну топологию, используя множество тестовых примеров как часть одного класса JUnit, вы можете очень легко перенести это в метод настройки с аннотацией @До так что он запускается автоматически перед началом каждого тестового случая

На сегодня это все! Это было краткое, но, надеюсь, полезное введение в тестирование ваших конвейеров обработки на основе потоков Кафки.

Оригинал: “https://dev.to/itnext/how-to-test-kafka-streams-applications-4ceb”