Давайте теперь создадим и запустим простой пример потребителя Kafka, а затем производителя Kafka, используя spring-kafka. Если вам нужна помощь с Kafka, spring boot или docker, которые используются в этой статье, или вы хотите ознакомиться с примером приложения из этого поста, пожалуйста, ознакомьтесь с разделом Ссылок ниже.
Первый шаг – создать простое приложение Spring Boot maven и убедиться, что у него есть зависимость spring-kafka для pom.xml
org.springframework.kafka spring-kafka
Создайте потребителя Spring Kafka
Давайте теперь напишем простейший из возможных потребителей Kafka с spring-kafka, используя конфигурации spring-boot по умолчанию.
Создайте класс с именем Simple Consumer
и добавьте метод с аннотацией @KakfaListener
.
package io.stockgeeks.springkafka.springkafkaapp; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class SimpleConsumer { @KafkaListener(id = "simple-consumer", topics = "simple-message") public void consumeMessage(String message) { System.out.println("Got message: " + message); } }
Вот и все, это все, что требуется, потому что мы полагаемся на конфигурации spring-boot по умолчанию.
Запустите Kafka и Zookeeper
Как мы подробно видели в этой другой статье, мы собираемся использовать docker-compose для запуска нашей локальной Kafka для разработки, давайте запустим наши контейнеры Kafka и Zookeeper:
docker-compose up -d
Убедитесь, что контейнеры запущены:
docker ps
Вы бы видели, как Кафка и Смотритель зоопарка бегут:
Запустите приложение
Давайте теперь скомпилируем и запустим приложение, если вам нужны более подробные инструкции, пожалуйста, ознакомьтесь с этим сообщением, выполните следующие команды для сборки и запуска приложения:
mvn clean package
и давайте запустим его:
mvn spring-boot:run
Приложение запустится, и вы увидите на стандартном выводе конфигурации для потребителя, используемую версию Kafka и сообщение Запустил приложение Spring Kafka за x секунд
.
Убедитесь, что приложение запущено, не закрывайте окно терминала, в котором оно запущено. Давайте теперь создадим несколько сообщений с помощью Kafka console producer и посмотрим, как наш потребитель обрабатывает сообщения и выводит их из системы.
Создать сообщение с помощью Kafka console producer
Откройте новый терминал и введите запущенный контейнер Kafka, чтобы мы могли использовать console producer:
docker exec -it kafka /bin/bash
Оказавшись внутри контейнера cd/opt/kafka/bin
, скрипты командной строки для Kafka в этом конкретном образе, который мы используем, находятся в этой папке. Если вы используете разные образы docker, эти скрипты могут находиться в каком-то другом месте.
Запустите console producer, который позволит вам отправлять сообщения в Kafka:
./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message
Теперь консоль заблокируется, и вы сможете написать свое сообщение и нажать enter, потому что каждый раз, когда вы делаете это, одно сообщение будет отправлено в simple-topic
. Попробуйте отправить несколько сообщений и посмотрите стандартный вывод приложения в командной оболочке, в которой вы запускаете приложение Spring Boot, обрабатывающее сообщения и печатающее их.
Напишите простого продюсера
Пришло время создать нашего продюсера spring-kafka. Создайте класс с именем Simple Producer
, мы снова будем использовать значения по умолчанию для производителя, как мы это делали для потребителя.
package io.stockgeeks.springkafka.springkafkaapp; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class SimpleProducer { private KafkaTemplatesimpleProducer; public SimpleProducer(KafkaTemplate simpleProducer) { this.simpleProducer = simpleProducer; } public void send(String message) { simpleProducer.send("simple-message", message); } }
Напишите конечную точку
Давайте теперь создадим простую конечную точку, которая получит текстовое сообщение и опубликует его в Kafka, пока мы всегда возвращаем 200 OK.
package io.stockgeeks.springkafka.springkafkaapp; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/api") public class MessageApi { private final SimpleProducer simpleProducer; public MessageApi(SimpleProducer simpleProducer) { this.simpleProducer = simpleProducer; } @PostMapping("/message") public ResponseEntitymessage(@RequestBody String message) { simpleProducer.send(message); return ResponseEntity.ok("Message received: " + message); } }
Создайте и запустите приложение.
mvn clean package && mvn spring-boot:run
Есть большая вероятность, что вы получите ошибку и при запуске приложения на вашем компьютере разработки сейчас, это происходит потому, что ваше приложение работает внутри вашей обычной хост-сети, а Kafka и zookeeper работают внутри “docker network”.
Есть несколько способов решить эту проблему, лучше всего передать имя хоста вашей машины разработки в docker-compose при запуске контейнеров, если вы откроете файл docker-compose из этого проекта, в нем есть запись в KAFKA_ADVERTISED_LISTENERS: ... LISTENER_DOCKER_EXTERNAL
нравится ${DOCKER_HOST_IP:-kafka}:9092
это говорит композитору попытаться использовать переданное в hostname или Kafka по умолчанию, проверьте комментарии в файле compose, чтобы узнать, как это исправить, и проверьте раздел ссылок ниже для получения более подробной информации.
Отправьте несколько сообщений с помощью curl
Теперь обязательно посмотрите терминал приложения и в другом окне терминала давайте использовать curl для отправки некоторых сообщений:
curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"
Вы должны увидеть ответ на том же терминале, где был выполнен curl, также проверьте, что потребитель обрабатывает сообщение и печатает его на терминале, где запущено приложение.
Сделано
Вот и все, дело сделано. Теперь вы создали самое простое из возможных приложений Spring Boot, которое создает и использует сообщения из Kafka. Причина, по которой это выглядит так просто, заключается в том, что мы полагаемся на конфигурации Spring Boot и spring-kafka по умолчанию.
Если вы хотите узнать больше о том, как работает Spring Boot или Kafka, пожалуйста, взгляните на ссылки в следующем сеансе, где вы найдете некоторые ссылки с более подробной информацией.
Мы расскажем о тестировании ваших потребителей и производителей в другом посте. Счастливого Кодирования.
Исходный код с приложением, созданным в этом посте.
Чтобы настроить свою среду с помощью java, maven, docker и docker-compose, пожалуйста, ознакомьтесь с тем, как настроить свою среду, в примерах руководств.
Если вам нужно краткое введение в Кафку: Кафка – Ускоренный курс
Для получения некоторой информации о том, как использовать docker-compose для локальной разработки, пожалуйста, ознакомьтесь с этим сообщением: Один, чтобы запустить их все, где вы также узнаете некоторые полезные команды Kafka.
Если вы новичок в Spring Boot, пожалуйста, ознакомьтесь с ускоренным курсом Spring Boot
Docker создает переменные среды , чтобы понять конфигурацию для прослушивателя рекламы Kafka.
Оригинал: “https://dev.to/thegroo/spring-kafka-producer-and-consumer-41oc”