В одном из предыдущих блогов были рассмотрены некоторые концепции, лежащие в основе того, как Azure Event Hubs поддерживает несколько протоколов для обмена данными. В этом блоге мы увидим это в действии на примере. С помощью примера приложения вы увидите, как объединить компонент приема данных в реальном времени с уровнем обработки без сервера.
Пример приложения содержит следующие компоненты:
- Центры событий Azure/| с конечной точкой Kafka Приложение-производитель, которое отправляет данные в раздел Event Hub.
- Бессерверное приложение, созданное с использованием
- Функций Azure , которое использует EventHubs, обогащает его и, наконец, сохраняет в База ДАННЫХ Azure Cosmos DB
Чтобы продолжить и развернуть это решение в Azure, вам понадобится учетная запись Microsoft Azure. Вы можете взять один бесплатно если у вас его еще нет!
Компоненты приложения
Давайте рассмотрим отдельные компоненты приложений
Как всегда, код доступен на GitHub
Компонент производителя
Это довольно просто – это приложение Go, которое использует клиент Sarama Kafka/| для отправки (имитируемых) “заказов”/| в центры событий Azure (Тема Кафки). Он доступен в виде Изображения Docker
для удобства использования (подробности в следующем разделе)
Вот соответствующий фрагмент кода:
order := Order{OrderID: "order-1234", CustomerID: "customer-1234", Product: "product-1234"} b, err := json.Marshal(order) msg := &sarama.ProducerMessage{Topic: eventHubsTopic, Key: sarama.StringEncoder(oid), Value: sarama.ByteEncoder(b)} producer.SendMessage(msg)
Многие детали были опущены (из приведенного выше фрагмента) – вы можете просмотреть полный код здесь . Подводя итог, Order
создается, преобразуется (маршалируется) в JSON ( байты
) и отправляется в конечную точку EventHubs Kafka.
Бессерверный компонент
Часть Безсерверная
представляет собой функцию Java Azure . Он использует следующие возможности:
Параметр Trigger позволяет вызывать логику функций Azure всякий раз, когда событие order
отправляется в центры событий Azure. Привязка Output берет на себя всю тяжелую работу, такую как установление соединения с базой данных, масштабирование, параллелизм и т.д., И все, что нам осталось построить, – это бизнес-логика, которая в данном случае была сохранена довольно простой – при получении заказа данные из центров событий Azure, функция обогащает их дополнительной информацией (в данном случае о клиенте и названии продукта) и сохраняет их в контейнере Azure Cosmos DB .
Вы можете проверить код Order Processor
на Github , но вот суть:
@FunctionName("storeOrders") public void storeOrders( @EventHubTrigger(name = "orders", eventHubName = "", connection = "EventHubConnectionString", cardinality = Cardinality.ONE) OrderEvent orderEvent, @CosmosDBOutput(name = "databaseOutput", databaseName = "AppStore", collectionName = "orders", connectionStringSetting = "CosmosDBConnectionString") OutputBindingoutput, final ExecutionContext context) { .... Order order = new Order(orderEvent.getOrderId(),Data.CUSTOMER_DATA.get(orderEvent.getCustomerId()), orderEvent.getCustomerId(),Data.PRODUCT_DATA.get(orderEvent.getProduct()); output.setValue(order); .... }
Метод Заказы на хранение
снабжен аннотацией @functionName
и он получает данные из концентраторов событий в виде объекта Order Event
. Благодаря аннотации @EventHubTrigger
платформа, которая заботится о преобразовании полезной нагрузки EventHub в Java POJO
(типа Событие заказа
) и правильно маршрутизировать его. Часть connection
указывает, что строка подключения EventHubs доступна в конфигурации функции/настройках с именем EventHubConnectionString Строка события
Аннотация @CosmosDBOutput
используется для сохранения данных в Azure Cosmos DB. Он содержит базу данных Cosmos DB и имя контейнера, а также строку подключения, которая будет получена из параметра конфигурации CosmosDBConnectionString
в функции. POJO ( Order
в данном случае) сохраняется в CosmosDB с помощью одного вызова метода setValue
для объекта Output Binding
– платформа делает это действительно простым, но за кулисами многое происходит!
Давайте переключимся и узнаем, как развернуть решение в Azure
Пред- реквизиты
Записи
- В идеале все компоненты (EventHubs, Cosmos DB, хранилище и функция Azure) должны находиться в одном регионе
- Рекомендуется создать новую группу ресурсов для группировки этих служб таким образом, чтобы их было легко найти и легко удалить.
- Учетная запись Microsoft Azure (как упоминалось в начале)
- Создайте пространство имен концентраторов событий с поддержкой Kafka
- Создайте компоненты Azure Cosmos DB : учетную запись, базу данных и контейнер (пожалуйста, убедитесь, что имя базы данных Cosmos DB равно
AppStore
и контейнер называетсяorders
, поскольку именно это использует логика функций Azure) - Создайте учетную запись хранилища Azure – это будет использоваться функциями Azure
Развернуть функцию обработки заказов
В этом примере используются функции Azure Плагин Maven для развертывания. Во-первых, обновите pom.xml
чтобы добавить требуемую конфигурацию.
Замените раздел и замените значения для
azurewebjobstorage
, EventHubConnectionString
и Cosmos DBConnectionString
параметры
Используйте Azure CLI , чтобы легко получить необходимые сведения
- Для
Azurewebjobstorage
: Получить строку подключения к хранилищу Azure - Для
EventHubConnectionString
: Получить Строку подключения концентраторов событий - Для
Cosmos DBConnectionString
: Получить Строка подключения Cosmos DB
Для раздела конфигурация
обновите следующее:
группа ресурсов
: группа ресурсов, в которой вы хотите развернуть функцию крегион
: регион Azure, в котором вы хотите развернуть функцию ( получить список местоположений )
Для развертывания вам понадобятся две команды:
mvn clean package
– подготовить артефакт развертыванияmvn azure-функции:развертывание
– развертывание в Azure
Вы можете подтвердить, используя Azure CLI az [список приложений --запрос]"
или портал
Запустить производителя концентраторов событий
Установите переменные среды:
export EVENTHUBS_BROKER=.servicebus.windows.net:9093 export EVENTHUBS_TOPIC= export EVENTHUBS_CONNECTION_STRING="Endpoint=sb:// .servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey= "
Запустите образ Docker
docker run -e EVENTHUBS_BROKER=$EVENTHUBS_BROKER -e EVENTHUBS_TOPIC=$EVENTHUBS_TOPIC -e EVENTHUBS_CONNECTION_STRING=$EVENTHUBS_CONNECTION_STRING abhirockzz/eventhubs-kafka-producer
нажмите ctrl+c
чтобы прекратить создавать события
Подтвердите результаты в Azure Cosmos DB
Вы можете использовать Azure Cosmos DB data explorer (веб-интерфейс) для проверки элементов в контейнере. Вы должны увидеть результаты, подобные этому:
Убирать
Предполагая, что вы разместили все службы в одной группе ресурсов, вы можете удалить их с помощью одной команды :
export RESOURCE_GROUP_NAME=az group delete --name $RESOURCE_GROUP_NAME --no-wait
Спасибо за чтение 🙂 Рад получить ваши отзывы через Твиттер или просто оставьте комментарий 🙏🏻 Следите за обновлениями, чтобы узнать больше!
Оригинал: “https://dev.to/azure/tutorial-use-azure-functions-to-process-real-time-data-from-azure-event-hubs-and-persist-to-azure-cosmos-db-2co8”