Рубрики
Без рубрики

Насыщение Событиями С Помощью Центров Событий Azure: Часть первая

Event hub – это приложение для обработки больших данных от Microsoft, использующее AMQP, HTTPS и Apache Kafka… Помечено как java, spring, tutorial, intermediate.

Event hub – это предложение Microsoft для приема больших данных, в нем используются AMQP, HTTPS и Apache Kafka под капотом. Event hub предлагает такие преимущества, как разделение и контрольные точки в потоке данных, а также всю масштабируемость, с которой может справиться ваш кошелек.

В этой статье я расскажу о том, как настроить соединение и отправить данные в event hub с помощью веб-службы spring boot. Вы можете просмотреть завершенный пример здесь.

Настройка учетной записи Azure

Если вы еще этого не сделали, вам нужно будет зарегистрироваться в учетной записи azure dev. Они дают вам достаточно информации о том, что нам нужно сделать в этом руководстве.

Создание пространства имен Event Hub

Войдите на портал azure и найдите “event hub” в поле поиска вверху. Вам нужно будет выбрать опцию “Центры событий”.

Оттуда нажмите на “Центры событий” и нажмите на значок Добавления (+).

Заполните форму создания, для этого примера я использую максимально простые настройки. Вероятно, вам также потребуется создать группу ресурсов для этого центра событий.

Бум! У нас есть настройка пространства имен event hub. Пространство имен действует как организационный каталог для созданных вами концентраторов событий.

Создание центра событий

Перейдите к вашему пространству имен Event hub и нажмите на значок Добавления Event Hub (+).

Дайте вашему центру событий шикарное имя, масштабируйте количество разделов так, как вам нужно, но для этого руководства я оставлю свой на уровне 2. Нажмите кнопку создать в нижней части формы, и мы готовы к работе.

Вы должны увидеть сообщение о том, что ваш центр событий создается, как только это будет сделано, перейдите к нему из меню пространства имен. Вы наткнетесь на панель мониторинга, которая показывает всевозможные полезные показатели, такие как пропускная способность и количество сообщений.

В боковом меню перейдите к разделу “Политики общего доступа” и нажмите Добавить (+). Дайте ему имя и предоставьте ему доступ к управлению (который дает нам возможность чтения и записи).

Создание политики общего доступа дает нам ключи к замку event hub. Нажмите на политику, которую вы только что создали, и запишите строку подключения.

Добавление зависимостей в концентратор событий.

Перво-наперво добавьте эти зависимости в свой pom-файл. На момент написания этой статьи последними версиями зависимостей являются 3.0.0.

      
            com.microsoft.azure
            azure-eventhubs
            3.0.0
        
        
            com.microsoft.azure
            azure-eventhubs-eph
            3.0.0
        

Далее давайте создадим класс config и создадим EventHubClient.

Создание клиента и его подключение.

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString,
                              Executors.newSingleThreadScheduledExecutor());
    }
}

В этом классе я извлекаю строку подключения из файла конфигурации (в моем случае yml-файла приложения), вводю в класс config, затем передаю ее как есть в createFromConnectionStringSync метод, который принимает строку подключения и объект ScheduledExecutorService . Поскольку мы здесь не делаем ничего особенного, я использую однопоточный исполнитель.

Вот как я определил свой файл yml.

eventHub:
  connectionString: 'connections string here'

Теперь, когда у нас есть компонент клиента Event Hub, давайте продолжим и создадим сервисный компонент, который его использует.

import com.dublin.eventhub.demo.controller.Controller;
import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;

import java.util.Objects;

@Service
public class EventHubService {
    private final EventHubClient eventHubClient;
    private Logger log = LoggerFactory.getLogger(Controller.class);

    @Autowired
    public EventHubService(EventHubClient eventHubClient) {
        this.eventHubClient = eventHubClient;
    }

    public void sendEvent(EventPayload test) {

        byte[] bytes = SerializationUtils.serialize(test);

        log.info("Sending message to the event hub {}", eventHubClient.getEventHubName());
        eventHubClient.send(EventData.create(Objects.requireNonNull(bytes)), test.toString());
    }
}

Мы будем использовать внедрение конструктора для внедрения клиента в наш сервис, оттуда я определяю SendEvent метод, который будет использовать клиент для отправки данных.

Чтобы отправить данные в event hub, нам нужно сериализовать сообщение в массив байтов, обернуть его в объект EventData, а затем передать его методу клиента send .

Создание класса данных

Для этого примера я определил простой класс данных.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventPayload implements Serializable {
    private String firstName;
    private String lastName;
    private String favoriteFood;
}

Важно отметить, что данные, с которыми вы работаете для event hub, должны быть Сериализуемыми , поэтому обязательно реализуйте Сериализуемый в классе данных, с которым вы работаете, в противном случае вы получите java.lang. Исключение IllegalArgumentException: Не удалось десериализовать объект сообщение.

Далее мы определим конечную точку для отправки ваших данных.

Построение конечной точки

import com.dublin.eventhub.demo.model.EventPayload;
import com.dublin.eventhub.demo.service.EventHubService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
public class Controller {

    private final EventHubService eventHubService;
    private Logger log = LoggerFactory.getLogger(Controller.class);

    @Autowired
    public Controller(EventHubService eventHubService) {
        this.eventHubService = eventHubService;
    }

    @PostMapping(path = "/eventhub/send")
    public ResponseEntity sendEvent(@RequestBody EventPayload payload) {
        try {
            log.info("Eventhub send endpoint called, sending {} to event hub..", payload.toString());
            eventHubService.sendEvent(payload);
        } catch (Exception e) {
            log.error("An error arose sending a message to event hub: " + e);
            return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return new ResponseEntity(HttpStatus.OK);
    }
}

Я создал метод POST “/eventhub/send” и определил тело запроса как наш класс данных. Оттуда он просто отправляется в службу.

Как только мы все это подключим, мы сможем запустить наше приложение и увидеть, как оно успешно подключается к event hub.

2019-09-24 13:42:21.670  INFO 16113 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.24]
2019-09-24 13:42:21.734  INFO 16113 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2019-09-24 13:42:21.734  INFO 16113 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1063 ms
2019-09-24 13:42:21.983  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41d4fd_1569350541964], hostName[dublin-rest-demo.servicebus.windows.net], info[starting reactor instance.]
2019-09-24 13:42:21.999  INFO 16113 --- [pool-1-thread-1] c.m.azure.eventhubs.impl.ReactorHandler  : name[MF_41d4fd_1569350541964] reactor.onReactorInit
2019-09-24 13:42:22.002  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionInit hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:22.003  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionLocalOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], errorCondition[null], errorDescription[null]
2019-09-24 13:42:22.101  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionBound hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:23.157  INFO 16113 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionRemoteOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], remoteContainer[100db877ccad41b1a689c5a458bf1fbc_G6]
2019-09-24 13:42:23.268  INFO 16113 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-09-24 13:42:23.449  INFO 16113 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-09-24 13:42:23.491  INFO 16113 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-09-24 13:42:23.495  INFO 16113 --- [           main] com.dublin.eventhub.demo.Application     : Started Application in 3.126 seconds (JVM running for 3.493)

Теперь, когда мы все настроили, давайте отправим на него некоторые данные. Откройте свой любимый HTTP-клиент и отправьте в него некоторые данные. Для этого примера я использую Бессонница .

И в журналах я должен увидеть, что сообщение было успешно отправлено в event hub!

2019-09-24 13:43:25.806  INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Johnny, lastName=Carson, email=null, favoriteFood=Potatoes and Molasses) to event hub..
2019-09-24 13:43:25.808  INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Sending message to the event hub event-hub-test

Мы сделали это, мы сериализуем и отправляем данные в event hub. 😁

Резюмировать

В этом руководстве я рассказал о создании центра событий в Azure, настройке клиента и службы центра событий, а также конечной точки для отправки данных. В части 2 я пройдусь по событиям потребления.

Я надеюсь, что это руководство поможет, дайте мне знать, что вы думаете в комментариях.

Оригинал: “https://dev.to/danondso/getting-eventful-with-azure-event-hubs-part-one-3jn1”