Рубрики
Без рубрики

Установка и запуск Apache Kafka и zookeeper в Windows

Apache Kafka – это распределенная потоковая платформа. Он обеспечивает унифицированную систему с высокой пропускной способностью и низкой задержкой… С тегами kubernetes, java, windows.

Apache Kafka – это распределенная потоковая платформа. Он предоставляет унифицированную платформу с высокой пропускной способностью и низкой задержкой для обработки потоков данных в реальном времени. Kafka можно использовать в качестве системы обмена сообщениями, такой как ActiveMQ или RabbitMQ. Он поддерживает отказоустойчивость с использованием набора реплик внутри кластера. Kafka может использоваться для хранения данных и потоковой обработки для считывания данных практически в режиме реального времени. Продюсеры передают сообщение по темам. Потребитель потребляет данные из тем.

Kafka содержит объекты broker, topics и replica set. Поддержка Kafka многие клиенты включают java, C++, Python и другие. Поддержка Kafka многие клиенты включают java, C++, Python и другие.

Apache Kafka поддерживает следующий вариант использования со многими различными доменами, включая финансовые, IOT и другие.

Обмен сообщениями, Отслеживание активности на веб-сайте, Агрегация журналов, Обработка потоков, Поиск событий, Журнал фиксации Apache Kafka использует Zookeeper для управления компонентами Kafka в кластере. ZooKeeper – это централизованная служба для хранения информации о конфигурации, присвоения имен, обеспечения распределенной синхронизации и предоставления групповых служб. Все эти виды услуг в той или иной форме используются распределенными приложениями. ZooKeeper – это согласованная файловая система для информации о конфигурации.

Загрузите последнюю версию Kafka (2.5.0) из следующего расположения

Загрузите последнюю версию Kafka (2.5.0) из следующего расположения

загрузите последнюю версию zookeeper (например, 3.6.1) из следующего места

загрузите последнюю версию zookeeper (например, 3.6.1) из следующего места

загрузите последнюю выпущенную версию JDK 1.8

загрузите последнюю выпущенную версию JDK 1.8

Установите JDK Установите OpenJDK и задайте следующую переменную в Windows path

Пользовательские переменные:\java\openjdk\jdk-8.0.252.09-hotspot\

Добавьте к системным переменным переменную Path (Например, PATH=% JAVA_HOME%\bin;). Выполните следующую команду, чтобы подтвердить установку java.

C:>java -версия openjdk версии “1.8.0_252” Среда выполнения OpenJDK (Принять OpenJDK) (сборка 1.8.0_252-b09) OpenJDK 64-Разрядная серверная виртуальная машина (AdoptOpenJDK) (сборка 25.252-b09, смешанный режим) Установите Zookeeper Извлеките zookeeper и откройте Копию файла конфигурации zookeeper C:\software\apache-zookeeper-3.6.1-bin\conf \ zoo_sample.cfg тоже zoo.cfg.

Обновите следующее свойство

Выполните следующую команду

C:\software\apache-zookeeper-3.6.1-bin\bin\zkServer.cmd

Убедитесь, что zookeeper успешно запущен, и прослушайте порт 2181.

Установите Kafka Извлеките Kafka и откройте конфигурационные файлы. Конфигурация по умолчанию, поставляемая с дистрибутивом Kafka, достаточна для запуска одноузловой Kafka. посредник.идентификатор должен быть уникальным в среде. в автономной конфигурации по умолчанию используется только один брокер. Kafka использует прослушиватель по умолчанию на TCP-порту 9092. каталог журнала должен быть доступен для записи. Kafka сохраняет все сообщения на диск, указанный в журнале.конфигурация dirs.

Выполните следующую команду

C:\software\kafka_2.12-2.5.0\bin\windows >kafka-server-start.bat ….\config\server.properties Убедитесь, что Kafka успешно запущена.

Создавайте темы и перечисляйте темы Темы Kafka создаются автоматически при настройке auto.create.topics.enable. Он создается по умолчанию, используя следующие варианты использования

Когда производитель начинает писать сообщения, Когда потребитель начинает читать сообщения, Когда любой клиент запрашивает метаданные

C:\software\kafka_2.12-2.5.0\bin\windows >kafka-topics.bat –create –bootstrap-server localhost:9092 –коэффициент репликации 1 –разделы 1 –тест темы Создан тест темы.

C:\software\kafka_2.12-2.5.0\bin\windows >kafka-topics.bat –list –bootstrap-localhost сервера:9092 тестовое создание и использование сообщений Продюсер пишет сообщения в тему. Потребитель потребляет сообщения из темы. Apache Kafka развертывает встроенный клиент для доступа к API-интерфейсам производителя и потребителя. Утилита командной строки помогает создавать и потреблять сообщения без написания какого-либо кода и полезна для тестирования установок Kafka/проверки компонентов Kafka.

C:\software\kafka_2.12-2.5.0\bin\windows >kafka-console-producer.bat –локальный хост загрузочного сервера:9092 –тематический тест

C:\software\kafka_2.12-2.5.0\bin\windows >kafka-console-consumer.bat –bootstrap-server localhost:9092 –тематический тест –с самого начала Apache Kafka в приложении Spring Boot Инструменты командной строки помогают понять концепции. Корпоративное приложение использует язык программирования высокого уровня для создания и использования сообщений с использованием API. в следующем разделе для создания и использования сообщений используются Java и Spring boot application.

Producer Kafka producer предоставляет множество гибких опций для публикации сообщений в различных вариантах использования. Пример требования к транзакции по кредитной карте не допускает дублирования сообщений или потери каких-либо сообщений. Производитель создает ProducerRecord и публикует сообщения с помощью producer API. Производитель и потребитель устанавливают ключ.сериализатор и значение.свойства сериализатора. Данные сериализуются на основе этих свойств. Клиентский API Kafka предоставляет набор параметров сериализации. Он поддерживает такие функции, как Avro, Thrift, Protobuf или пользовательская сериализация. если системе требуется несколько версий схемы и система ожидает сохранения схемы. Мы можем использовать общий шаблон архитектуры и использовать реестр схем, такой как Confluent Schema Registry.

Производитель установил параметр ack для управления политиками записи сообщений. Он поддерживает запись и не ждет ответа, пишет сообщение лидеру и возвращает, записывает во все синхронизированные реплики.

Следующий код задает свойства для производителей.

общедоступные статические свойства получают ProducerProperties(строка kafkaBootstrapServers) { Свойства Properties(); producerProperties.put(“bootstrap.servers”, kafkaBootstrapServers); producerProperties.put(“acks”, “all”); producerProperties.put(“retries”, 0); producerProperties.put(“batch.size”, 16384); producerProperties.put(“linger.ms “, 1); producerProperties.put(“buffer.memory”, 33554432); producerProperties.put(“key.serializer”, “org.apache.kafka.common.сериализация. StringSerializer”); produ andderproperties.put(“buy.serializer”, “org.apa ezthe.kafka.andrommon.serialization”). StringSerializer”); возвращает producerProperties; } Kafka управляет данными и записывает их в соответствующий раздел на основе сведений о записи. Если сообщения были успешно записаны в Kafka, он вернет объект метаданных записи, включающий тему, раздел и смещение. В противном случае он вернет сообщение об ошибке. Производитель отправляет сообщения с возможностью “Запустить и забыть”, синхронной и асинхронной отправки.

private void sendMessagesToKafka (Строка kafkaBootstrapServers) { KafkaProducer KafkaProducer<>( Simplekafkativity.getProducerProperties(kafkaBootstrapServers));

for (int; index < 10; index++) { JSONObject JSONObject(); try { JSONObject.put(“index”, индекс); JSONObject.put(“сообщение”, “Индекс теперь: ” + индекс); } catch (исключение JSONException e) { System.out.println(например,GetMessage()); } producer.send(новая запись ProducerRecord<>(Имя тега, “indexMessge”,JSONObject.toString())); System.out.println(“Отправить сообщение в Kafka:”+ JSONObject.toString()); } } Потребитель Kafka consumer предоставляет множество вариантов использования сообщений. Записи потребителей возвращаются из опроса и повторяют записи. Потребитель также устанавливает bootstrap.servers, ключ.сериализатор и значение.свойства сериализатора. Потребитель использует аналогичный набор свойств плюс свойство группы потребителей. Kafka предоставляет группу потребителей, которая содержит группу потребителей.

Следующий код задает свойства для потребителей.

общедоступные статические свойства получают потребительские свойства(строка kafkaBootstrapServers, строка zookeeperGroupId) { Properties Properties(); consumerProperties.put(“bootstrap.servers”, kafkaBootstrapServers); consumerProperties.put(“group.id “, zookeeperGroupId); consumerProperties.put(“zookeeper.session.timeout.ms “, “6000”); consumerProperties.put(“zookeeper.sync.time.ms “, “2000”); consumerProperties.put(“auto.commit.enable”, “false”); consumerProperties.put(“auto.commit.interval.ms “, “1000”); consumerProperties.put(“consumer.timeout.ms “, “-1”); consumerProperties.put(“макс.poll.records”, “1”); consumerProperties.put(“значение.десериализатор”, “org.apache.kafka.common.сериализация. Stringdeserialize”); orderonsumer Properties.put (“smoke emanu.deserializer”, “org.apa ezthe.kafka.andrommon.serialization”). StringDeserializer”) ;

return consumerProperties;

} Пользователь получает сообщения из одной или нескольких тем. Потребитель поддерживает регулярное выражение для предоставления набора тем. Например, если производитель публикует сообщение в теме T с разделами типа P1,P2 и P3. Группа потребителей может содержать группу потребителей. Если группа потребителей содержит трех потребителей, каждый раздел сопоставляется с отдельным потребителем. Более трудоемкий потребитель, возможно, не сможет угнаться за скоростью потока данных в тему, и добавление большего количества потребителей помогает увеличить масштабируемость. Когда группа потребителей добавляет новых потребителей или отключается из-за сбоя, группа потребителей восстанавливает баланс потребителей. процесс перехода права собственности на разделы от одного потребителя к другому называется перебалансировкой.

//Запустить поток потребителя для чтения сообщений Thread Thread(() -> { System.out.println(“Запуск потока потребителя Kafka.”); SimpleKafkaConsumer SimpleKafkaConsumer(theTechCheckTopicName, SimpleKafkaUtility.get Свойства потребителя(kafkaBootstrapServers, zookeeperGroupId)); simpleKafkaConsumer.runSingleWorker(); }); kafkaConsumerThread.start(); общедоступный класс SimpleKafkaConsumer { частный KafkaConsumer KafkaConsumer; общедоступный SimpleKafkaConsumer(Строка theTechCheckTopicName, свойства consumerProperties) { KafkaConsumer<>(Потребительские свойства); KafkaConsumer.subscribe(Collections.singletonList(theTechCheckTopicName)); } public void runSingleWorker() { while (true) { ConsumerRecords.poll(Продолжительность.ofMillis(100)); for (запись ConsumerRecord: записи) { String.value(); System.out.println(“Полученное сообщение: ” + сообщение);//Зафиксируйте хэш-карту карты смещения<>(); зафиксируйте Message.put(новая тематическая часть(record.topic(), record.partition()), новое смещение и данные(record.offset() + 1)); KafkaConsumer.commitSync(commitMessage); System.out.println(“Смещение зафиксировано к Кафке.”); } } } } Ссылка

Зафиксируйте хэш-карту карты смещения<>(); зафиксируйте Message.put(новая тематическая часть(record.topic(), record.partition()), новое смещение и данные(record.offset() + 1)); KafkaConsumer.commitSync(commitMessage); System.out.println(“Смещение зафиксировано к Кафке.”); } } } } Ссылка

Зафиксируйте хэш-карту карты смещения<>(); зафиксируйте Message.put(новая тематическая часть(record.topic(), record.partition()), новое смещение и данные(record.offset() + 1)); KafkaConsumer.commitSync(commitMessage); System.out.println(“Смещение зафиксировано к Кафке.”); } } } } Ссылка

Зафиксируйте хэш-карту карты смещения<>(); зафиксируйте Message.put(новая тематическая часть(record.topic(), record.partition()), новое смещение и данные(record.offset() + 1)); KafkaConsumer.commitSync(commitMessage); System.out.println(“Смещение зафиксировано к Кафке.”); } } } } Ссылка

Зафиксируйте хэш-карту карты смещения<>(); зафиксируйте Message.put(новая тематическая часть(record.topic(), record.partition()), новое смещение и данные(record.offset() + 1)); KafkaConsumer.commitSync(commitMessage); System.out.println(“Смещение зафиксировано к Кафке.”); } } } } Ссылка

Оригинал: “https://dev.to/careerdrill/installing-and-running-the-apache-kafka-and-zookeeper-on-windows-3mle”