Рубрики
Без рубрики

Создавайте приложения Kafka Streams быстрее, чем когда-либо прежде, с Azkarra Streams

Kafka Streams – это мощная библиотека для написания потоковых приложений и микросервисов поверх Ap… С тегами kafka, kafka streams, opensource, java.

Kafka Streams – это мощная библиотека для написания потоковых приложений и микросервисов поверх Apache Kafka на Java и Scala.

При написании приложения Kafka Streams разработчики должны не только определить их топологию, то есть последовательность операций, которые будут применяться к потребляемым сообщениям, но и код, необходимый для его выполнения.

Кроме того, чтобы написать готовое к работе приложение, вам нужно будет знать: как обрабатывать сбои обработки и неверные записи, как отслеживать экземпляры и управлять ими. И, если вы планируете раскрывать некоторые внутренние состояния, используя встроенную функцию Kafka Streams, так называемую “Интерактивные запросы” , вам также придется написать код, чтобы получить доступ к вашим данным (например, через REST API).

В результате этого ваше приложение может быстро усложниться шаблонным кодом, который не имеет прямой бизнес-ценности, но который вам придется поддерживать и дублировать, а также в других проектах.

Azkarra Streams – это легкий Java-фреймворк с открытым исходным кодом, который упрощает разработку и эксплуатацию приложений Kafka Streams (Azkarra – баскское слово, означающее ” Быстрый “).

Основные характеристики

Azkarra Streams предоставляет набор функций для быстрой отладки и создания готовых к работе приложений Kafka Streams. Это включает в себя, среди прочего:

  • Управление жизненным циклом экземпляров Kafka Streams ((больше никаких KafkaStreams#start()).
  • Простая экстернализация конфигураций топологии и потоков Kafka (используя Typesafe Config ).
  • Встроенный HTTP-сервер для запроса хранилища состояний (Подводное течение).
  • Конечная точка HTTP для мониторинга метрик приложения streams (например: JSON, Prometheus).
  • Веб-интерфейс для визуализации топологий.
  • Шифрование и аутентификация с помощью SSL или базовой аутентификации. И т.д.

приступая к работе

Начиная с Azkarra v0.5.0, один из способов начать работу с Azkarra – использовать официальный образ Docker ( streamthoughts/azkarra-streams-worker ), который позволяет запускать автономный Azkarra worker для выполнения одного или нескольких приложений Kafka Streams.

Azkarra Worker следует тому же механизму, который используется проектом Kafka Connect, т.е. топологии потоков Kafka предоставляются в качестве внешних компонентов, которые можно запускать и останавливать либо с помощью вызовов REST, либо с помощью встроенного пользовательского интерфейса.

Давайте запустим рабочий экземпляр Azkarra и кластер с одним узлом-брокером, используя docker-compose.yml , доступный в репозитории GitHub .

1 ) Выполните следующую команду для загрузки и запуска контейнеров:

$ curl -s https://raw.githubusercontent.com/streamthoughts/azkarra-streams/master/docker-compose.yml --output \
docker-compose.yml && docker-compose up -d

2 ) Убедитесь, что Azkarra worker запущен и работает:

$ curl -sX GET http://localhost:8080 | jq
{
  "azkarraVersion": "0.5.0",
  "commitId": "d2bc2fdc24e68eb143f4388960881974604093ca",
  "branch": "master"
}

3) Наконец, вы можете получить доступ к веб-интерфейсу Azkarra, доступному на: http://localhost:8080/ui .

Как мы можем видеть, на данный момент наш работник абсолютно ничего не делает, поскольку мы еще не развернули топологию. Итак, давайте напишем простое приложение Kafka Streams.

Написание Первой Топологии Потоков Kafka

Для демонстрации использования Azkarra API мы перепишем стандартный пример WordCountTopology .

Во-первых, давайте создадим простой Java-проект и добавим потоки Azkarra в зависимость вашего проекта.

Для Maven ( pom.xml ):


    
        org.apache.kafka
        kafka-streams
        2.4.0
        provided
    

    
        io.streamthoughts
        azkarra-streams
        0.5.0
        provided
    

Обратите внимание, что при использовании Azkarra Worker ваш проект никогда не должен содержать никаких библиотек, предоставляемых средой выполнения Azkarra Worker (т.е. azkarra-*, kafka-streams).

Во-вторых, давайте определим нашу топологию потоков Kafka, создав новый файл Словосочетание Топология.java .

package azkarra;

import io.streamthoughts.azkarra.api.annotations.*;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;

@Component
@TopologyInfo(description = "WordCount topology example")
public class WordCountTopology implements TopologyProvider {

    @Override
    public Topology get() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream("streams-plaintext-input");
        textLines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value)
        .count(Materialized.as("WordCount"))
        .toStream()
        .to(
            "streams-wordcount-output", 
             Produced.with(Serdes.String(), Serdes.Long())
        );
        return builder.build();
    }

    @Override
    public String version() {
        return "1.0";
    }
}

Как вы можете видеть, мы внедрили интерфейс Topology Provider для предоставления объекта Topology . Azkarra принуждает вас к версии каждой предоставленной топологии. Это полезно, например, для выполнения нескольких версий одной и той же топологии или для автоматического создания значимого application.id .

Аннотация @Component требуется, чтобы позволить Azkarra обнаружить этот класс.

И это все! Azkarra будет отвечать за создание и управление экземпляром Kafka Streams , который будет запускать предоставленный Топология .

Развертывание Топологии Потоков

Теперь нам нужно сделать нашу WordCountTopology доступной для рабочего.

Для этого мы должны упаковать и установить наш компонент в один из каталогов, настроенных с помощью свойства azkara.component.paths

Если вы посмотрите на docker-compose.yml , вы увидите, что для этого свойства задано значение /tmp/azkarra/components с использованием переменной окружения.

Свойство azkarra.component.paths должно определять список местоположений (разделенных запятой), из которых будут сканироваться компоненты.

Каждый настроенный каталог может содержать:

  • uber JAR , содержащий все классы и сторонние зависимости для компонента (например, топологию).

  • каталог , содержащий все банки для компонента

Обычно с Maven вы будете использовать maven-assembly-plugin или maven-shade-plugin , чтобы создать свой проект в uber JAR.

После упаковки вашего приложения вы можете скопировать файл .jar в локальный каталог /tmp/azkarra/компоненты .

Затем перезапустите контейнеры docker следующим образом:

$ docker-compose restart

Теперь вы должны иметь возможность перечислять доступные топологии с помощью REST API:

curl -sX GET http://localhost:8080/api/v1/topologies | jq 
[
  {
    "name": "azkarra.WordCountTopology",
    "version": "1.0",
    "description": "WordCount topology example",
    "aliases": [
      "WordCount",
      "WordCountTopology"
    ],
    "config": {}
  }
]

Наконец, давайте запустим новый экземпляр Kafka Streams, отправив следующую конфигурацию JSON:

curl -H "Content-Type:application/json" \
-X POST http://localhost:8080/api/v1/streams \
--data '{"type": "azkarra.WordCountTopology", "version": "1.0",  "env": "__default", "config": {} }'

В приведенной выше команде мы указываем тип и версию топологии для развертывания, а также целевую среду.

Действительно, у Azkarra есть концепция Streams Execution Environment , которая действует как контейнер для выполнения экземпляров streams. По умолчанию создается среда с именем __default .

Обратите внимание, что Azkarra автоматически создаст любые темы источника и приемника, определенные топологией ( azkarra.context.auto.create.topics.enable=true ).

Изучение веб-интерфейса Azkarra

Azkarra поставляется со встроенным веб-интерфейсом, который позволяет вам получать информацию о запущенных приложениях Kafka Streams.

Например, вы можете:

  • Получить подробную информацию о потоках и задачах запущенного экземпляра streams:

  • Визуализируйте DAG топологии потоков:

  • Перечислите показатели потоков Kafka:

Кроме того, веб-интерфейс Azkarra позволяет останавливать, перезапускать и удалять экземпляры локальных потоков.

Запрашивающие хранилища состояний

Наконец, Kafka Streams имеет отличный механизм для запроса состояний, материализованных приложениями streams, с помощью вызовов REST API.

Давайте создадим несколько сообщений следующим образом:

$ docker exec -it broker /usr/bin/kafka-console-producer \
--topic streams-plaintext-input \
--broker-list broker:9092

Azkarra Streams
WordCount
I Heart Logs   
Kafka Streams
Making Sense of Stream Processing

Ниже приведен пример запроса состояния Количество слов :

curl -sX POST http://localhost:8080/api/v1/applications/word-count-topology-1-0/stores/WordCount \
--data '{"query":{"get":{"key": "streams"}},"type":"key_value", "set_options":{}}' | jq
{
  "took": 1,
  "timeout": false,
  "server": "azkarra:8080",
  "status": "SUCCESS",
  "result": {
    "success": [
      {
        "server": "azkarra:8080",
        "remote": false,
        "records": [
          {
            "key": "streams",
            "value": 2
          }
        ]
      }
    ],
    "total": 1
  }
}

Вы также можете запросить состояние непосредственно через веб-интерфейс Azkarra.

Идем дальше

Если вы хотите узнать больше об использовании потоков Azkarra, документацию можно найти на странице GitHub .

Документация содержит пошаговое руководство по началу изучения основных концепций Azkarra.

Проект также содержит некоторые примеры .

Вывод

Azkarra Streams – это инициатива по обогащению экосистемы Kafka Streams и облегчению ее внедрения разработчиками с помощью легкого микро-фреймворка.

Мы надеемся, что этот проект будет хорошо принят сообществом open-source и Kafka. Azkara все еще развивается, и некоторые функции нуждаются в улучшении. Чтобы поддержать проект Azkarra Streams, пожалуйста, посетите репозиторий Github или напишите в твиттере, если этот проект вам поможет!

Большое спасибо!

Оригинал: “https://dev.to/fhussonnois/create-kafka-streams-applications-faster-than-ever-before-via-azkarra-streams-3nng”