Рубрики
Без рубрики

Создание приложения для потоковой передачи данных в реальном времени с помощью Apache Kafka

Автор: Александр Ннаквуе ✏️ Введение Большинство крупных технологических компаний получают данные от своих… Помеченный узлом, javascript, java.

Автор: Александр Ннаквуэ ✏️

Вступление

Большинство крупных технологических компаний получают данные от своих пользователей различными способами, и в большинстве случаев эти данные поступают в необработанном виде. В понятном и удобном формате данные могут помочь удовлетворить потребности бизнеса. Задача состоит в том, чтобы обработать и, при необходимости, преобразовать или очистить данные, чтобы придать им смысл.

Приложения для потоковой передачи базовых данных перемещают данные из исходной корзины в целевую. Более сложные приложения, использующие потоки, выполняют некоторые волшебные действия на лету, например, изменяют структуру выходных данных или обогащают их новыми атрибутами или полями.

В этом посте мы узнаем, как создать минимальное приложение для потоковой передачи данных в реальном времени с использованием Apache Kafka. В этом посте также будут рассмотрены следующие вопросы:

  • Кафка и смотритель зоопарка как наши инструменты
  • Пакетная обработка и хранение данных
  • Установка и запуск Kafka в местном масштабе
  • Загрузка нашего приложения
  • Установка зависимостей
  • Создание темы Кафки
  • Переход к созданной теме
  • Потребление из темы

Согласно его веб-сайту , Kafka – это платформа потоковой передачи с открытым исходным кодом и высокой степенью распространения. Созданный инженерами LinkedIn (в настоящее время входит в состав Apache software foundation), он гордится тем, что является надежной, устойчивой и масштабируемой системой, поддерживающей потоковую передачу событий/приложений. Он горизонтально масштабируемый, отказоустойчивый по умолчанию и обеспечивает высокую скорость.

У Кафки есть множество вариантов использования , один из которых заключается в создании конвейеров данных или приложений, которые обрабатывают потоковые события и/или обработку пакетных данных в режиме реального времени.

Используя Apache Kafka, мы рассмотрим, как построить конвейер данных для перемещения пакетных данных. В качестве небольшой демонстрации мы смоделируем большое хранилище данных JSON, созданное в источнике.

После этого мы напишем сценарий производителя, который создает/записывает эти данные JSON из источника, скажем, в точке А в определенной теме в нашей локальной настройке брокера/кластера Kafka. Наконец, мы напишем сценарий-потребитель, который использует сохраненные данные из указанной темы Кафки.

Примечание: Преобразование и/или обогащение данных в основном обрабатывается по мере их использования из темы ввода для использования другим приложением или темой вывода.

Это очень распространенный сценарий в разработке данных, так как всегда возникает необходимость в очистке, преобразовании, агрегировании или даже повторной обработке обычно необработанных и временно сохраненных данных в теме Кафки, чтобы они соответствовали определенному стандарту или формату.

Предпосылки

Для того, чтобы следовать этому руководству, вам понадобится:

  • Последние версии Node.js и npm установлен на вашем компьютере
  • Последняя версия Java ( JVM ), установленная на вашем компьютере
  • Кафка, установленный на вашем локальном компьютере. В этом уроке мы рассмотрим локальную установку Kafka на наших компьютерах
  • Базовое понимание письма Node.js приложения

Однако, прежде чем мы продолжим, давайте рассмотрим некоторые основные понятия и термины о Кафке, чтобы мы могли легко следовать этому руководству.

Смотритель зоопарка

Кафка сильно зависит от ZooKeeper, который является сервисом, который он использует для отслеживания состояния своего кластера. ZooKeeper помогает контролировать синхронизацию и настройку брокеров или серверов Kafka, что включает в себя выбор соответствующих лидеров. Для получения более подробной информации о ZooKeeper вы можете ознакомиться с его потрясающей документацией .

Тема

Темы Кафки – это группа разделов или групп в нескольких брокерах Кафки. Для более четкого понимания этот раздел действует как механизм прерывистого хранения потоковых данных в кластере. Для каждой темы Кафки мы можем установить коэффициент репликации и другие параметры, такие как количество разделов и т.д.

Производители, потребители и кластеры

Производители – это клиенты, которые производят или записывают данные брокерам Кафки или, точнее, темам Кафки. Потребители, с другой стороны, читают данные или — как следует из названия — потребляют данные из тем Кафки или брокеров Кафки. Кластер – это просто группа брокеров или серверов, которые управляют текущим экземпляром Kafka.

Рисунок 1: Показаны отношения между производителем, кластером и потребителем в Кафке.

Для получения более подробной информации обо всех этих жизненно важных концепциях вы можете ознакомиться с этим разделом документации Apache Kafka .

Установка Кафки

Чтобы установить Kafka, все, что нам нужно сделать, это загрузить двоичные файлы здесь и извлечь архив. Мы делаем это, выполнив следующую команду в нашем терминале или командной строке:

cd 
tar -xzf 
cd 

Команда tar извлекает загруженный двоичный файл Kafka. После этого мы переходим в каталог, в котором установлена Kafka. Мы увидим все файлы, показанные ниже:

Рисунок 2: Снимок экрана установленной структуры папок Kafka с файлами.

Примечание: Двоичные файлы Кафки можно загрузить по любому пути, который мы пожелаем на наших машинах. Кроме того, на момент написания этой статьи последняя версия Кафки – 2.3.0.

Кроме того, если мы поднимемся на уровень выше ( cd.. ), мы найдем папку config внутри загруженного двоичного каталога Kafka. Здесь мы можем настроить наш сервер Kafka и включить любые изменения или конфигурации, которые мы можем пожелать. А теперь давайте подыграем:

cd ..
ls
cd config
ls
nano server.properties

Рисунок 3: Как настроить сервер Kafka.

Теперь, когда мы знаем, где настроить наш сервер Kafka, пришло время научиться использовать Kafka. Позже мы узнаем о полях, которые мы можем перенастроить или обновить в файле server.properties .

В этом уроке мы будем использовать клиентскую библиотеку kafka-node для Node.js . Обратите внимание, что у Kafka есть другие клиенты и для других языков программирования, поэтому не стесняйтесь использовать Kafka для любого другого языка по вашему выбору.

Кафка спешит на помощь

Так как мы используем Node.js в этом упражнении мы начнем с начальной загрузки базового приложения с минимальной структурой. Для начала мы создадим новый каталог для размещения нашего проекта и перейдем в него, как показано ниже:

mkdir kafka-sample-app
cd kafka-sample-app

Затем мы можем продолжить и создать файл package.json , выполнив команду npm init .

Теперь мы можем следовать инструкциям, чтобы настроить наш проект как обычно. Наш файл package.json должен выглядеть так, когда мы закончим:

{
  "name": "kafka-producer_consumer_tutorial",
  "version": "1.0.0",
  "description": "Building a real-time data streaming application pipeline with Apache Kafka",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node producer.js"
  },
  "author": "Alexander Nnakwue",
  "license": "MIT",
  "dependencies": {
    "dotenv": "^8.2.0",
    "kafka-node": "^4.1.3"
  }
}

Здесь мы установили две зависимости, которые нам понадобятся позже. Чтобы установить наш клиент kafka-node, мы запускаем npm install kafka-node на терминале. Документация для kafka-узла доступна в npm. Пакет dotenv используется для настройки переменных среды для нашего приложения. Чтобы установить пакет, мы можем запустить npm install dotenv .

Теперь, когда мы закончили установку зависимостей, мы можем продолжить и создать все необходимые файлы, как показано на рисунке ниже:

Рисунок 4: Наша файловая иерархия.

На рисунке выше показаны все необходимые файлы, необходимые нашему приложению. Давайте посмотрим на каждый файл и поймем, что происходит.

Прежде всего, чтобы создать новую тему вручную из терминала, мы можем использовать команду ниже:

./kafka-topics.sh --create --zookeeper  --replication-factor  --partitions  --topic 

Обратите внимание, что мы не должны забывать обновлять , , и <ИМЯ ТЕМЫ> реальными значениями.

Однако в этом уроке у нас есть сценарий, который обрабатывает это за нас. Код для создания новой темы можно найти в createTopic.js файл. Код также показан ниже:

const kafka = require('kafka-node');
const config  = require('./config');

const client = new kafka.KafkaClient({kafkaHost: config.KafkaHost});



const topicToCreate = [{
  topic: config.KafkaTopic,
  partitions: 1,
  replicationFactor: 1
}
];

client.createTopics(topicToCreate, (error, result) => {
  // result is an array of any errors if a given topic could not be created
  console.log(result, 'topic created successfully');
});

Здесь мы импортируем клиент Kafka и подключаемся к нашей настройке Kafka. Вы можете заметить, что мы никогда не настраивали коэффициент репликации в нашем случае использования. Однако это не отражает реальный сценарий.

В случаях производственного использования мы можем настроить несколько брокеров Kafka в зависимости от объема данных или сообщений, которые мы намерены обработать. Давайте посмотрим, как мы можем добиться этого в нашей локальной настройке.

  • Перейдите в каталог конфигурации в нашем загруженном двоичном файле конфигурация компакт-диска
  • Откройте файл Kafka server.properties . Этот файл содержит всю конфигурацию для настройки нашего сервера Kafka. Мы можем открыть файл с помощью команды nano server.свойства
  • Теперь мы можем создать несколько копий этого файла и просто изменить несколько конфигураций в других скопированных файлах. Здесь мы имеем в виду, что в дублированных файлах мы можем пойти дальше и изменить некоторые уникальные поля, такие как broker.id , журнал.dirs и порт брокера или хоста. Для получения дополнительной информации о настройке установки Kafka вы можете ознакомиться с документацией

После создания темы мы теперь можем создавать или записывать в нее данные. Код для записи в тему находится в producer.js файл. Код показан ниже:

const Kafka = require('kafka-node');
const config  = require('./config');

const Producer = Kafka.Producer;
const client = new Kafka.KafkaClient({kafkaHost: config.KafkaHost});
const producer = new Producer(client,  {requireAcks: 0, partitionerType: 2});



const pushDataToKafka =(dataToPush) => {

  try {
  let payloadToKafkaTopic = [{topic: config.KafkaTopic, messages: JSON.stringify(dataToPush) }];
  console.log(payloadToKafkaTopic);
  producer.on('ready', async function() {
    producer.send(payloadToKafkaTopic, (err, data) => {
          console.log('data: ', data);
  });

  producer.on('error', function(err) {
    //  handle error cases here
  })
  })
  }
catch(error) {
  console.log(error);
}

};


const jsonData = require('./app_json.js');

pushDataToKafka(jsonData);

Здесь мы импортировали библиотеку узлов кафки и настроили нашего клиента на получение соединения от нашего брокера Кафки. Как только это соединение установлено, мы отправляем наши данные в указанную тему Кафки. Обратите внимание, что в реальных приложениях мы должны закрыть соединение клиента после завершения, вызвав метод client.close() .

Теперь, когда мы запускаем наш сценарий запуска с помощью ./start.sh команда, мы получаем данные, записанные в нашу тему Кафки.

npm start

Чтобы прочитать данные из раздела, мы можем использовать наш потребительский скрипт в consumer.js файл, запустив узел./consumer.js . Мы получаем следующий результат:

Рисунок 5: Запуск потребительского сценария для чтения из темы Кафки.

Код для consumer.js файл также показан ниже:

const kafka = require('kafka-node');
const config = require('./config');

try {
 const Consumer = kafka.Consumer;
 const client = new kafka.KafkaClient({idleConnection: 24 * 60 * 60 * 1000,  kafkaHost: config.KafkaHost});

 let consumer = new Consumer(
    client,
    [{ topic: config.KafkaTopic, partition: 0 }],
    {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      // fromOffset: false
    }
  );
  consumer.on('message', async function(message) {
    console.log(
      'kafka ',
      JSON.parse(message.value)
    );
  })
  consumer.on('error', function(error) {
    //  handle error 
    console.log('error', error);
  });
}
catch(error) {
  // catch error trace
  console.log(error);
}

Здесь мы подключаемся к клиенту Kafka и используем предварительно заданную тему Kafka.

Примечание: Как только мы закончим настройку и захотим запустить наше приложение, нам нужно сначала запустить сервер ZooKeeper. После этого мы можем запустить наш сервер Кафки. Это потому, что Кафка зависит от того, как работает смотритель зоопарка.

Чтобы запустить сервер ZooKeeper, мы можем выполнить следующую команду с нашего терминала:

bin/zookeeper-server-start.sh config/zookeeper.properties

Чтобы запустить наш сервер Kafka, мы можем запустить:

bin/Kafka-server-start.sh config/server.properties

Кроме того, мы можем проверить количество доступных тем Кафки в брокере, выполнив эту команду:

bin/Kafka-topics.sh --list --zookeeper localhost:2181

Наконец, мы также можем использовать данные из раздела Кафки, выполнив команду consumer console на терминале, как показано ниже:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-example-topic  --from-beginning

Кроме того, Kafka предоставляет сценарий, позволяющий разработчикам вручную создавать темы в своем кластере. Сценарий показан ниже:

./kafka-topics.sh --create --zookeeper  --replication-factor  --partitions  --topic 

Примечание: Нам необходимо в обязательном порядке запустить Zookeeper и сервер Кафки соответственно в отдельных окнах терминала, прежде чем мы сможем продолжить и создать тему Кафки.

Наконец, код для этого учебника доступен по этой ссылке Репозиторий на GitHub . Чтобы получить представление о философии дизайна, используемой для Кафки, вы можете проверить этот раздел документации. В следующем уроке мы рассмотрим другие инструменты, доступные через API Kafka, такие как потоки Kafka и Kafka connect. Для ознакомления вы можете проверить этот раздел документации.

Резюме

В общем, Кафка может выступать в качестве системы типа издатель/подписчик, используемой для создания потока чтения и записи для пакетных данных, как и RabbitMQ. Он также может быть использован для создания высокоустойчивых, масштабируемых приложений для потоковой передачи и обработки данных в реальном времени. Обратите внимание, что этот вид потоковой обработки может быть выполнен на лету на основе некоторых предопределенных событий.

Кроме того, как и системы обмена сообщениями, Kafka имеет механизм хранения, состоящий из высокотолерантных кластеров, которые реплицируются и широко распределяются. Под репликацией мы подразумеваем, что данные могут быть распределены по нескольким различным кластерам, сводя потери данных во всей цепочке к минимуму.

В целом, Kafka может быть встроен в другие системы в качестве отдельного плагина. В этом случае он может самостоятельно масштабироваться в зависимости от необходимости. Это означает, что мы можем масштабировать производителей и потребителей независимо, не вызывая никаких побочных эффектов для всего приложения.

Наконец, мы смогли увидеть, что построение конвейера данных включает перемещение данных из исходной точки, где они генерируются (обратите внимание, что это также может означать вывод данных из другого приложения), в точку назначения, где они необходимы или потребляются другим приложением. Теперь мы можем пойти дальше и изучить другие, более сложные варианты использования.

В случае, если у вас могут возникнуть какие-либо вопросы, не стесняйтесь обращаться ко мне в разделе комментариев ниже или связаться со мной по Твиттер .

Примечание редактора: Видите что-то неправильное в этом посте? Вы можете найти правильную версию здесь .

Плагин: Blog rocket, видеорегистратор для веб-приложений

Log Rocket – это инструмент ведения журнала на интерфейсе, который позволяет воспроизводить проблемы, как если бы они произошли в вашем собственном браузере. Вместо того, чтобы гадать, почему возникают ошибки, или запрашивать у пользователей скриншоты и дампы журналов, Log Rocket позволяет вам воспроизвести сеанс, чтобы быстро понять, что пошло не так. Он отлично работает с любым приложением, независимо от платформы, и имеет плагины для регистрации дополнительного контекста из Redux, Vuex и @ngrx/store. В дополнение к регистрации действий и состояний Redux, Log Rocket записывает журналы консоли, ошибки JavaScript, трассировки стеков, сетевые запросы/ответы с заголовками+телами, метаданными браузера и пользовательскими журналами. Он также использует DOM для записи HTML и CSS на странице, воссоздавая пиксельные видео даже самых сложных одностраничных приложений. Попробуйте это бесплатно .

Сообщение Создание приложения для потоковой передачи данных в реальном времени с помощью Apache Kafka появилось впервые в Блоге Rocket Blog .

Оригинал: “https://dev.to/bnevilleoneill/building-a-real-time-data-streaming-app-with-apache-kafka-1n5p”