В первой части этой серии я рассказал о создании центра событий в azure, подключении его к моей веб-службе spring boot, а затем отправке на него сообщений. Теперь я собираюсь перейти к чтению этих сообщений из event hub и их обработке.
Как и в первой части, завершенный пример можно найти здесь.
Создание учетной записи хранилища в Azure
Зачем нам нужна учетная запись хранилища?
Нам нужна настройка учетной записи хранилища, чтобы мы могли хранить договоры аренды разделов и контрольные точки.
Вернитесь на портал azure и нажмите на учетные записи хранилища.
Затем нажмите кнопку Добавить (+), и вы попадете на экран создания учетной записи хранилища.
Заполните это поле и нажмите “Просмотреть + Создать” внизу. Мы должны получить одобрение от Microsoft, если что-то пошло не так, внести необходимые исправления и повторить попытку.
Как только будет получено согласие, мы можем нажать “Создать”, и azure запустит нашу учетную запись хранилища.
Получение Информации О Ключе Доступа
Как только учетная запись хранилища будет создана, перейдите к ней и выберите опцию “Ключ доступа” в боковом меню и скопируйте любую из строк подключения.
Создание узла обработки событий
В нашем классе конфигурации EventHub я создал компонент EventProcessorHost.
import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventprocessorhost.EventProcessorHost; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.concurrent.Executors; @Configuration public class EventHubConfig { @Value("${eventHub.connectionString}") private String connectionString; @Value("${eventHub.name}") private String eventHubName; @Value("${eventHub.storage.consumerGroupName}") private String consumerGroupName; @Value("${eventHub.storage.hostNamePrefix}") private String hostNamePrefix; @Value("${eventHub.storage.storageConnectionString}") private String storageConnectionString; @Value("${eventHub.storage.storageContainerName}") private String storageContainerName; @Bean public EventHubClient setupEventHubConnection() throws IOException, EventHubException { return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newScheduledThreadPool(1)); } @Bean public EventProcessorHost createEventHubProcessorHost() { return EventProcessorHost.EventProcessorHostBuilder .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName) .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null) .useEventHubConnectionString(connectionString, eventHubName) .build(); } }
Для этого примера я использую EventProcessorHost builder и передаю для него необходимую конфигурацию. Я определил конфигурацию в своем приложении.yml ниже.
eventHub: connectionString: 'connections string here' #Event Hub connection string goes here name: 'event hub name here' # The name of the event hub storage: consumerGroupName: $Default #This is the default consumer group for event hub, but it can be customized storageConnectionString: 'storage connection string here' storageContainerName: 'storage container name here' hostNamePrefix: 'some unique prefix' # Identifies the instance of the EventProcessorHost
Теперь, когда у нас есть компонент EventProcessorHost, мы можем внедрить его в нашу службу EventProcessorHost.
import com.dublin.eventhub.demo.exception.ErrorNotificationHandler; import com.dublin.eventhub.demo.processor.EventProcessor; import com.microsoft.azure.eventprocessorhost.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutionException; @Component public class EventProcessorHostService { private final EventProcessorHost eventProcessorHost; private Logger log = LoggerFactory.getLogger(EventProcessorHostService.class); @Autowired public EventProcessorHostService(EventProcessorHost eventProcessorHost) { this.eventProcessorHost = eventProcessorHost; } @PostConstruct public void run() throws ExecutionException, InterruptedException { log.info("Setting up event hub {}", eventProcessorHost.getHostName()); EventProcessorOptions options = new EventProcessorOptions(); options.setExceptionNotification(new ErrorNotificationHandler()); eventProcessorHost.registerEventProcessor(EventProcessor.class, options).get(); } }
Здесь нужно кое-что распаковать, так что давайте разберемся, что происходит.
- Я использую внедрение конструктора для передачи компонента EventProcessorHost, который я определил ранее.
- Я определил базовый класс обработчика уведомлений о событиях, который обрабатывает ошибки, не связанные с конкретными разделами (например, сбои инициализации). Я передаю это на EventProcessorHost, обернув его в объект параметров процессора событий.
- Я регистрирую обработчик событий, он регистрирует хост в центре событий и получает права аренды на некоторые разделы, чтобы мы могли начать обработку сообщений. Для каждой аренды раздела для этого раздела создается экземпляр обработчика событий.
- Наконец, я использую @PostConstruct для инициализации подключения к концентратору событий.
Ниже приведен Обработчик уведомлений об ошибках.
import java.util.function.Consumer; import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs; import lombok.extern.slf4j.Slf4j; @Slf4j public class ErrorNotificationHandler implements Consumer{ @Override public void accept(ExceptionReceivedEventArgs t) { log.error("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString()); } }
Теперь наш EventProcessorHost настроен и готов к запуску, но для того, чтобы мы могли обрабатывать сообщения, нам также необходимо определить класс обработчика событий, который считывает данные с концентратора событий и обрабатывает контрольную точку.
Нам нужно расширить интерфейс Event processor , который дает нам доступ к контексту раздела для определения контрольных точек и итерации EventData. Здесь происходит пара вещей.
- Я инициализирую класс обработчика полезной нагрузки события в методе onOpen .
- В методе onevent я создал цикл for для перебора событий и try catch в теле цикла, поэтому, если событие завершается ошибкой во время обработки, я могу обработать оставшиеся события.
- Если каждое событие обрабатывается успешно, я проверяю контрольную точку сразу после обработки.
ПРИМЕЧАНИЕ: Поскольку контрольная точка является асинхронной и не самой быстрой, я рекомендую устанавливать контрольную точку после некоторого количества обработанных событий. Поскольку этот пример невелик, я не утруждаю себя этим.
import com.dublin.eventhub.demo.model.EventPayload; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventprocessorhost.CloseReason; import com.microsoft.azure.eventprocessorhost.IEventProcessor; import com.microsoft.azure.eventprocessorhost.PartitionContext; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; @Slf4j @NoArgsConstructor @Service public class EventProcessor implements IEventProcessor { private EventPayloadProcessor eventPayloadProcessor; @Override public void onOpen(PartitionContext partitionContext) { eventPayloadProcessor = new EventPayloadProcessor(); } @Override public void onClose(PartitionContext partitionContext, CloseReason closeReason) { } @Override public void onEvents(PartitionContext partitionContext, Iterableiterable) { for(EventData event: iterable) { try{ EventPayload eventPayload = (EventPayload) SerializationUtils.deserialize(event.getBytes()); eventPayloadProcessor.process(eventPayload); partitionContext.checkpoint(event); } catch (Exception e) { log.error("An Error occured when processing event data, exception: ", e); } } } @Override public void onError(PartitionContext partitionContext, Throwable throwable) { } }
И, наконец, я определил процессор полезной нагрузки событий ниже.
import com.dublin.eventhub.demo.model.EventPayload; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Slf4j @NoArgsConstructor @Service class EventPayloadProcessor { void process(EventPayload eventPayload) { log.info("Hello! My name is {} and my favorite food is {}", eventPayload.getFirstName(), eventPayload.getFavoriteFood()); } }
И это все. Все должно быть готово к броску. 🤠
Запуск приложения
Если вы следили за ходом событий с первой части. В event hub должно быть несколько сообщений, которые мы должны обработать. Таким образом, мы должны иметь возможность запустить приложение и обработать полезную нагрузку.
2019-10-06 08:12:18.242 INFO 42015 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub.. 2019-10-06 08:12:18.247 INFO 42015 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test 2019-10-06 08:12:18.253 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], path[event-hub-test], operationTimeout[PT1M], creating a send link 2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session. 2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], condition[Error{condition=null, description='null', info=null}] 2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[cbs], linkName[cbs:sender], localTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}] 2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkLocalOpen receiverName[cbs], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}] 2019-10-06 08:12:18.327 INFO 42015 --- [pool-1-thread-2] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647] 2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}] 2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkRemoteOpen receiverName[cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}] 2019-10-06 08:12:18.329 INFO 42015 --- [pool-1-thread-2] c.m.a.e.impl.RequestResponseOpener : requestResponseChannel.onOpen complete clientId[MF_41070c_1570367523209], session[cbs-session], link[cbs], endpoint[$cbs] 2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session. 2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], condition[Error{condition=null, description='null', info=null}] 2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], sessionIncCapacity[0], sessionOutgoingWindow[2147483647] 2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], localTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}] 2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], remoteTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}] 2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : onOpenComplete - clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], sendPath[event-hub-test], linkName[LN_ddb0ee_1570367538464_5324_G9] 2019-10-06 08:12:18.678 INFO 42015 --- [b4f183ddd]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
Но, черт возьми, в журналах слишком много шума, тебе не кажется? Давайте исправим это, установив для уровня журнала пакетов event hub значение ERROR.
logging: level: com.microsoft.azure.*: ERROR
И попробуй еще раз, гораздо чище, тебе не кажется?
2019-10-06 08:55:48.241 INFO 47531 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2019-10-06 08:55:48.246 INFO 47531 --- [ main] com.dublin.eventhub.demo.Application : Started Application in 5.37 seconds (JVM running for 5.792) 2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2019-10-06 08:56:01.709 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 9 ms 2019-10-06 08:56:01.817 INFO 47531 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub.. 2019-10-06 08:56:01.822 INFO 47531 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test 2019-10-06 08:56:19.581 INFO 47531 --- [352fbae26]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
И все, мы подключились к event hub и зачитали наше сообщение!
Заключительные мысли
В этом руководстве мы рассмотрели создание учетной записи хранилища Azure, настройку подключения к event hub в нашей службе spring boot и считывание сообщений, которые мы ранее отправляли в event hub. Я надеюсь, что это руководство было полезным, дайте мне знать, что вы думаете в комментариях.
Счастливого кодирования!
Оригинал: “https://dev.to/danondso/getting-eventful-with-azure-event-hubs-part-two-10ji”