Рубрики
Без рубрики

Простейший производитель и потребитель Spring Kafka

Давайте теперь создадим и запустим простой пример потребителя Kafka, а затем производителя Kafka с использованием spring… Помеченный как spring boot, kafka, java, springkafka.

Давайте теперь создадим и запустим простой пример потребителя Kafka, а затем производителя Kafka, используя spring-kafka. Если вам нужна помощь с Kafka, spring boot или docker, которые используются в этой статье, или вы хотите ознакомиться с примером приложения из этого поста, пожалуйста, ознакомьтесь с разделом Ссылок ниже.

Первый шаг – создать простое приложение Spring Boot maven и убедиться, что у него есть зависимость spring-kafka для pom.xml


  org.springframework.kafka
  spring-kafka

Создайте потребителя Spring Kafka

Давайте теперь напишем простейший из возможных потребителей Kafka с spring-kafka, используя конфигурации spring-boot по умолчанию.

Создайте класс с именем Simple Consumer и добавьте метод с аннотацией @KakfaListener .

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SimpleConsumer {
  @KafkaListener(id = "simple-consumer", topics = "simple-message")
  public void consumeMessage(String message) {
    System.out.println("Got message: " + message);
  }
}

Вот и все, это все, что требуется, потому что мы полагаемся на конфигурации spring-boot по умолчанию.

Запустите Kafka и Zookeeper

Как мы подробно видели в этой другой статье, мы собираемся использовать docker-compose для запуска нашей локальной Kafka для разработки, давайте запустим наши контейнеры Kafka и Zookeeper:

docker-compose up -d

Убедитесь, что контейнеры запущены:

docker ps

Вы бы видели, как Кафка и Смотритель зоопарка бегут:

Запустите приложение

Давайте теперь скомпилируем и запустим приложение, если вам нужны более подробные инструкции, пожалуйста, ознакомьтесь с этим сообщением, выполните следующие команды для сборки и запуска приложения:

mvn clean package 

и давайте запустим его:

mvn spring-boot:run

Приложение запустится, и вы увидите на стандартном выводе конфигурации для потребителя, используемую версию Kafka и сообщение Запустил приложение Spring Kafka за x секунд .

Убедитесь, что приложение запущено, не закрывайте окно терминала, в котором оно запущено. Давайте теперь создадим несколько сообщений с помощью Kafka console producer и посмотрим, как наш потребитель обрабатывает сообщения и выводит их из системы.

Создать сообщение с помощью Kafka console producer

Откройте новый терминал и введите запущенный контейнер Kafka, чтобы мы могли использовать console producer:

docker exec -it kafka /bin/bash

Оказавшись внутри контейнера cd/opt/kafka/bin , скрипты командной строки для Kafka в этом конкретном образе, который мы используем, находятся в этой папке. Если вы используете разные образы docker, эти скрипты могут находиться в каком-то другом месте.

Запустите console producer, который позволит вам отправлять сообщения в Kafka:

./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message

Теперь консоль заблокируется, и вы сможете написать свое сообщение и нажать enter, потому что каждый раз, когда вы делаете это, одно сообщение будет отправлено в simple-topic . Попробуйте отправить несколько сообщений и посмотрите стандартный вывод приложения в командной оболочке, в которой вы запускаете приложение Spring Boot, обрабатывающее сообщения и печатающее их.

Напишите простого продюсера

Пришло время создать нашего продюсера spring-kafka. Создайте класс с именем Simple Producer , мы снова будем использовать значения по умолчанию для производителя, как мы это делали для потребителя.

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleProducer {

  private KafkaTemplate simpleProducer;

  public SimpleProducer(KafkaTemplate simpleProducer) {
    this.simpleProducer = simpleProducer;
  }
  public void send(String message) {
    simpleProducer.send("simple-message", message);
  }
}

Напишите конечную точку

Давайте теперь создадим простую конечную точку, которая получит текстовое сообщение и опубликует его в Kafka, пока мы всегда возвращаем 200 OK.

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class MessageApi {

  private final SimpleProducer simpleProducer;

  public MessageApi(SimpleProducer simpleProducer) {
    this.simpleProducer = simpleProducer;
  }

  @PostMapping("/message")
  public ResponseEntity message(@RequestBody String message) {
    simpleProducer.send(message);
    return ResponseEntity.ok("Message received: " + message);
  }
}

Создайте и запустите приложение.

mvn clean package && mvn spring-boot:run

Есть большая вероятность, что вы получите ошибку и при запуске приложения на вашем компьютере разработки сейчас, это происходит потому, что ваше приложение работает внутри вашей обычной хост-сети, а Kafka и zookeeper работают внутри “docker network”.

Есть несколько способов решить эту проблему, лучше всего передать имя хоста вашей машины разработки в docker-compose при запуске контейнеров, если вы откроете файл docker-compose из этого проекта, в нем есть запись в KAFKA_ADVERTISED_LISTENERS: ... LISTENER_DOCKER_EXTERNAL нравится ${DOCKER_HOST_IP:-kafka}:9092 это говорит композитору попытаться использовать переданное в hostname или Kafka по умолчанию, проверьте комментарии в файле compose, чтобы узнать, как это исправить, и проверьте раздел ссылок ниже для получения более подробной информации.

Отправьте несколько сообщений с помощью curl

Теперь обязательно посмотрите терминал приложения и в другом окне терминала давайте использовать curl для отправки некоторых сообщений:

curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"

Вы должны увидеть ответ на том же терминале, где был выполнен curl, также проверьте, что потребитель обрабатывает сообщение и печатает его на терминале, где запущено приложение.

Сделано

Вот и все, дело сделано. Теперь вы создали самое простое из возможных приложений Spring Boot, которое создает и использует сообщения из Kafka. Причина, по которой это выглядит так просто, заключается в том, что мы полагаемся на конфигурации Spring Boot и spring-kafka по умолчанию.

Если вы хотите узнать больше о том, как работает Spring Boot или Kafka, пожалуйста, взгляните на ссылки в следующем сеансе, где вы найдете некоторые ссылки с более подробной информацией.

Мы расскажем о тестировании ваших потребителей и производителей в другом посте. Счастливого Кодирования.

Исходный код с приложением, созданным в этом посте.

Чтобы настроить свою среду с помощью java, maven, docker и docker-compose, пожалуйста, ознакомьтесь с тем, как настроить свою среду, в примерах руководств.

Если вам нужно краткое введение в Кафку: Кафка – Ускоренный курс

Для получения некоторой информации о том, как использовать docker-compose для локальной разработки, пожалуйста, ознакомьтесь с этим сообщением: Один, чтобы запустить их все, где вы также узнаете некоторые полезные команды Kafka.

Если вы новичок в Spring Boot, пожалуйста, ознакомьтесь с ускоренным курсом Spring Boot

Docker создает переменные среды , чтобы понять конфигурацию для прослушивателя рекламы Kafka.

Оригинал: “https://dev.to/thegroo/spring-kafka-producer-and-consumer-41oc”