Рубрики
Без рубрики

Учебное пособие: Используйте функции Azure для обработки данных в реальном времени из центров событий Azure и сохранения в базе данных Azure Cosmos DB

В одном из предыдущих блогов были рассмотрены некоторые концепции, лежащие в основе того, как Azure Event Hubs поддерживает несколько… Помеченный как azure, kafka, бессерверный, java.

В одном из предыдущих блогов были рассмотрены некоторые концепции, лежащие в основе того, как Azure Event Hubs поддерживает несколько протоколов для обмена данными. В этом блоге мы увидим это в действии на примере. С помощью примера приложения вы увидите, как объединить компонент приема данных в реальном времени с уровнем обработки без сервера.

Пример приложения содержит следующие компоненты:

Чтобы продолжить и развернуть это решение в Azure, вам понадобится учетная запись Microsoft Azure. Вы можете взять один бесплатно если у вас его еще нет!

Компоненты приложения

Давайте рассмотрим отдельные компоненты приложений

Как всегда, код доступен на GitHub

Компонент производителя

Это довольно просто – это приложение Go, которое использует клиент Sarama Kafka/| для отправки (имитируемых) “заказов”/| в центры событий Azure (Тема Кафки). Он доступен в виде Изображения Docker для удобства использования (подробности в следующем разделе)

Вот соответствующий фрагмент кода:

order := Order{OrderID: "order-1234", CustomerID: "customer-1234", Product: "product-1234"}

b, err := json.Marshal(order)

msg := &sarama.ProducerMessage{Topic: eventHubsTopic, Key: sarama.StringEncoder(oid), Value: sarama.ByteEncoder(b)}
producer.SendMessage(msg)

Многие детали были опущены (из приведенного выше фрагмента) – вы можете просмотреть полный код здесь . Подводя итог, Order создается, преобразуется (маршалируется) в JSON ( байты ) и отправляется в конечную точку EventHubs Kafka.

Бессерверный компонент

Часть Безсерверная представляет собой функцию Java Azure . Он использует следующие возможности:

Параметр Trigger позволяет вызывать логику функций Azure всякий раз, когда событие order отправляется в центры событий Azure. Привязка Output берет на себя всю тяжелую работу, такую как установление соединения с базой данных, масштабирование, параллелизм и т.д., И все, что нам осталось построить, – это бизнес-логика, которая в данном случае была сохранена довольно простой – при получении заказа данные из центров событий Azure, функция обогащает их дополнительной информацией (в данном случае о клиенте и названии продукта) и сохраняет их в контейнере Azure Cosmos DB .

Вы можете проверить код Order Processor на Github , но вот суть:

@FunctionName("storeOrders")
public void storeOrders(

  @EventHubTrigger(name = "orders", eventHubName = "", connection = 
  "EventHubConnectionString", cardinality = Cardinality.ONE) 
  OrderEvent orderEvent,

  @CosmosDBOutput(name = "databaseOutput", databaseName = "AppStore", 
  collectionName = "orders", connectionStringSetting = 
  "CosmosDBConnectionString") 
  OutputBinding output,

  final ExecutionContext context) {
....

Order order = new Order(orderEvent.getOrderId(),Data.CUSTOMER_DATA.get(orderEvent.getCustomerId()), orderEvent.getCustomerId(),Data.PRODUCT_DATA.get(orderEvent.getProduct());
output.setValue(order);

....
}

Метод Заказы на хранение снабжен аннотацией @functionName и он получает данные из концентраторов событий в виде объекта Order Event . Благодаря аннотации @EventHubTrigger платформа, которая заботится о преобразовании полезной нагрузки EventHub в Java POJO (типа Событие заказа ) и правильно маршрутизировать его. Часть connection указывает, что строка подключения EventHubs доступна в конфигурации функции/настройках с именем EventHubConnectionString Строка события

Аннотация @CosmosDBOutput используется для сохранения данных в Azure Cosmos DB. Он содержит базу данных Cosmos DB и имя контейнера, а также строку подключения, которая будет получена из параметра конфигурации CosmosDBConnectionString в функции. POJO ( Order в данном случае) сохраняется в CosmosDB с помощью одного вызова метода setValue для объекта Output Binding – платформа делает это действительно простым, но за кулисами многое происходит!

Давайте переключимся и узнаем, как развернуть решение в Azure

Пред- реквизиты

Записи

  • В идеале все компоненты (EventHubs, Cosmos DB, хранилище и функция Azure) должны находиться в одном регионе
  • Рекомендуется создать новую группу ресурсов для группировки этих служб таким образом, чтобы их было легко найти и легко удалить.

Развернуть функцию обработки заказов

В этом примере используются функции Azure Плагин Maven для развертывания. Во-первых, обновите pom.xml чтобы добавить требуемую конфигурацию.

Замените раздел и замените значения для azurewebjobstorage , EventHubConnectionString и Cosmos DBConnectionString параметры

Используйте Azure CLI , чтобы легко получить необходимые сведения

Для раздела конфигурация обновите следующее:

  • группа ресурсов : группа ресурсов, в которой вы хотите развернуть функцию к
  • регион : регион Azure, в котором вы хотите развернуть функцию ( получить список местоположений )

Для развертывания вам понадобятся две команды:

  • mvn clean package – подготовить артефакт развертывания
  • mvn azure-функции:развертывание – развертывание в Azure

Вы можете подтвердить, используя Azure CLI az [список приложений --запрос]" или портал

Запустить производителя концентраторов событий

Установите переменные среды:

export EVENTHUBS_BROKER=.servicebus.windows.net:9093
export EVENTHUBS_TOPIC=
export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="

Запустите образ Docker

docker run -e EVENTHUBS_BROKER=$EVENTHUBS_BROKER -e EVENTHUBS_TOPIC=$EVENTHUBS_TOPIC -e EVENTHUBS_CONNECTION_STRING=$EVENTHUBS_CONNECTION_STRING abhirockzz/eventhubs-kafka-producer

нажмите ctrl+c чтобы прекратить создавать события

Подтвердите результаты в Azure Cosmos DB

Вы можете использовать Azure Cosmos DB data explorer (веб-интерфейс) для проверки элементов в контейнере. Вы должны увидеть результаты, подобные этому:

Убирать

Предполагая, что вы разместили все службы в одной группе ресурсов, вы можете удалить их с помощью одной команды :

export RESOURCE_GROUP_NAME=
az group delete --name $RESOURCE_GROUP_NAME --no-wait

Спасибо за чтение 🙂 Рад получить ваши отзывы через Твиттер или просто оставьте комментарий 🙏🏻 Следите за обновлениями, чтобы узнать больше!

Оригинал: “https://dev.to/azure/tutorial-use-azure-functions-to-process-real-time-data-from-azure-event-hubs-and-persist-to-azure-cosmos-db-2co8”