Event hub – это предложение Microsoft для приема больших данных, в нем используются AMQP, HTTPS и Apache Kafka под капотом. Event hub предлагает такие преимущества, как разделение и контрольные точки в потоке данных, а также всю масштабируемость, с которой может справиться ваш кошелек.
В этой статье я расскажу о том, как настроить соединение и отправить данные в event hub с помощью веб-службы spring boot. Вы можете просмотреть завершенный пример здесь.
Настройка учетной записи Azure
Если вы еще этого не сделали, вам нужно будет зарегистрироваться в учетной записи azure dev. Они дают вам достаточно информации о том, что нам нужно сделать в этом руководстве.
Создание пространства имен Event Hub
Войдите на портал azure и найдите “event hub” в поле поиска вверху. Вам нужно будет выбрать опцию “Центры событий”.
Оттуда нажмите на “Центры событий” и нажмите на значок Добавления (+).
Заполните форму создания, для этого примера я использую максимально простые настройки. Вероятно, вам также потребуется создать группу ресурсов для этого центра событий.
Бум! У нас есть настройка пространства имен event hub. Пространство имен действует как организационный каталог для созданных вами концентраторов событий.
Создание центра событий
Перейдите к вашему пространству имен Event hub и нажмите на значок Добавления Event Hub (+).
Дайте вашему центру событий шикарное имя, масштабируйте количество разделов так, как вам нужно, но для этого руководства я оставлю свой на уровне 2. Нажмите кнопку создать в нижней части формы, и мы готовы к работе.
Вы должны увидеть сообщение о том, что ваш центр событий создается, как только это будет сделано, перейдите к нему из меню пространства имен. Вы наткнетесь на панель мониторинга, которая показывает всевозможные полезные показатели, такие как пропускная способность и количество сообщений.
В боковом меню перейдите к разделу “Политики общего доступа” и нажмите Добавить (+). Дайте ему имя и предоставьте ему доступ к управлению (который дает нам возможность чтения и записи).
Создание политики общего доступа дает нам ключи к замку event hub. Нажмите на политику, которую вы только что создали, и запишите строку подключения.
Добавление зависимостей в концентратор событий.
Перво-наперво добавьте эти зависимости в свой pom-файл. На момент написания этой статьи последними версиями зависимостей являются 3.0.0.
com.microsoft.azure azure-eventhubs 3.0.0 com.microsoft.azure azure-eventhubs-eph 3.0.0
Далее давайте создадим класс config и создадим EventHubClient.
Создание клиента и его подключение.
import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.EventHubException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.concurrent.Executors; @Configuration public class EventHubConfig { @Value("${eventHub.connectionString}") private String connectionString; @Bean public EventHubClient setupEventHubConnection() throws IOException, EventHubException { return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newSingleThreadScheduledExecutor()); } }
В этом классе я извлекаю строку подключения из файла конфигурации (в моем случае yml-файла приложения), вводю в класс config, затем передаю ее как есть в createFromConnectionStringSync метод, который принимает строку подключения и объект ScheduledExecutorService . Поскольку мы здесь не делаем ничего особенного, я использую однопоточный исполнитель.
Вот как я определил свой файл yml.
eventHub: connectionString: 'connections string here'
Теперь, когда у нас есть компонент клиента Event Hub, давайте продолжим и создадим сервисный компонент, который его использует.
import com.dublin.eventhub.demo.controller.Controller; import com.dublin.eventhub.demo.model.EventPayload; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; import java.util.Objects; @Service public class EventHubService { private final EventHubClient eventHubClient; private Logger log = LoggerFactory.getLogger(Controller.class); @Autowired public EventHubService(EventHubClient eventHubClient) { this.eventHubClient = eventHubClient; } public void sendEvent(EventPayload test) { byte[] bytes = SerializationUtils.serialize(test); log.info("Sending message to the event hub {}", eventHubClient.getEventHubName()); eventHubClient.send(EventData.create(Objects.requireNonNull(bytes)), test.toString()); } }
Мы будем использовать внедрение конструктора для внедрения клиента в наш сервис, оттуда я определяю SendEvent метод, который будет использовать клиент для отправки данных.
Чтобы отправить данные в event hub, нам нужно сериализовать сообщение в массив байтов, обернуть его в объект EventData, а затем передать его методу клиента send .
Создание класса данных
Для этого примера я определил простой класс данных.
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; @Data @Builder @AllArgsConstructor @NoArgsConstructor public class EventPayload implements Serializable { private String firstName; private String lastName; private String favoriteFood; }
Важно отметить, что данные, с которыми вы работаете для event hub, должны быть Сериализуемыми , поэтому обязательно реализуйте Сериализуемый в классе данных, с которым вы работаете, в противном случае вы получите java.lang. Исключение IllegalArgumentException: Не удалось десериализовать объект сообщение.
Далее мы определим конечную точку для отправки ваших данных.
Построение конечной точки
import com.dublin.eventhub.demo.model.EventPayload; import com.dublin.eventhub.demo.service.EventHubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @RestController public class Controller { private final EventHubService eventHubService; private Logger log = LoggerFactory.getLogger(Controller.class); @Autowired public Controller(EventHubService eventHubService) { this.eventHubService = eventHubService; } @PostMapping(path = "/eventhub/send") public ResponseEntity sendEvent(@RequestBody EventPayload payload) { try { log.info("Eventhub send endpoint called, sending {} to event hub..", payload.toString()); eventHubService.sendEvent(payload); } catch (Exception e) { log.error("An error arose sending a message to event hub: " + e); return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); } return new ResponseEntity(HttpStatus.OK); } }
Я создал метод POST “/eventhub/send” и определил тело запроса как наш класс данных. Оттуда он просто отправляется в службу.
Как только мы все это подключим, мы сможем запустить наше приложение и увидеть, как оно успешно подключается к event hub.
2019-09-24 13:42:21.670 INFO 16113 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.24] 2019-09-24 13:42:21.734 INFO 16113 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2019-09-24 13:42:21.734 INFO 16113 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1063 ms 2019-09-24 13:42:21.983 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41d4fd_1569350541964], hostName[dublin-rest-demo.servicebus.windows.net], info[starting reactor instance.] 2019-09-24 13:42:21.999 INFO 16113 --- [pool-1-thread-1] c.m.azure.eventhubs.impl.ReactorHandler : name[MF_41d4fd_1569350541964] reactor.onReactorInit 2019-09-24 13:42:22.002 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionInit hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964] 2019-09-24 13:42:22.003 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionLocalOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], errorCondition[null], errorDescription[null] 2019-09-24 13:42:22.101 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionBound hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964] 2019-09-24 13:42:23.157 INFO 16113 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionRemoteOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], remoteContainer[100db877ccad41b1a689c5a458bf1fbc_G6] 2019-09-24 13:42:23.268 INFO 16113 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2019-09-24 13:42:23.449 INFO 16113 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator' 2019-09-24 13:42:23.491 INFO 16113 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2019-09-24 13:42:23.495 INFO 16113 --- [ main] com.dublin.eventhub.demo.Application : Started Application in 3.126 seconds (JVM running for 3.493)
Теперь, когда мы все настроили, давайте отправим на него некоторые данные. Откройте свой любимый HTTP-клиент и отправьте в него некоторые данные. Для этого примера я использую Бессонница .
И в журналах я должен увидеть, что сообщение было успешно отправлено в event hub!
2019-09-24 13:43:25.806 INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Johnny, lastName=Carson, email=null, favoriteFood=Potatoes and Molasses) to event hub.. 2019-09-24 13:43:25.808 INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Sending message to the event hub event-hub-test
Мы сделали это, мы сериализуем и отправляем данные в event hub. 😁
Резюмировать
В этом руководстве я рассказал о создании центра событий в Azure, настройке клиента и службы центра событий, а также конечной точки для отправки данных. В части 2 я пройдусь по событиям потребления.
Я надеюсь, что это руководство поможет, дайте мне знать, что вы думаете в комментариях.
Оригинал: “https://dev.to/danondso/getting-eventful-with-azure-event-hubs-part-one-3jn1”