В первой части этой серии я рассказал о создании центра событий в azure, подключении его к моей веб-службе spring boot, а затем отправке на него сообщений. Теперь я собираюсь перейти к чтению этих сообщений из event hub и их обработке.
Как и в первой части, завершенный пример можно найти здесь.
Создание учетной записи хранилища в Azure
Зачем нам нужна учетная запись хранилища?
Нам нужна настройка учетной записи хранилища, чтобы мы могли хранить договоры аренды разделов и контрольные точки.
Вернитесь на портал azure и нажмите на учетные записи хранилища.
Затем нажмите кнопку Добавить (+), и вы попадете на экран создания учетной записи хранилища.
Заполните это поле и нажмите “Просмотреть + Создать” внизу. Мы должны получить одобрение от Microsoft, если что-то пошло не так, внести необходимые исправления и повторить попытку.
Как только будет получено согласие, мы можем нажать “Создать”, и azure запустит нашу учетную запись хранилища.
Получение Информации О Ключе Доступа
Как только учетная запись хранилища будет создана, перейдите к ней и выберите опцию “Ключ доступа” в боковом меню и скопируйте любую из строк подключения.
Создание узла обработки событий
В нашем классе конфигурации EventHub я создал компонент EventProcessorHost.
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.Executors;
@Configuration
public class EventHubConfig {
@Value("${eventHub.connectionString}")
private String connectionString;
@Value("${eventHub.name}")
private String eventHubName;
@Value("${eventHub.storage.consumerGroupName}")
private String consumerGroupName;
@Value("${eventHub.storage.hostNamePrefix}")
private String hostNamePrefix;
@Value("${eventHub.storage.storageConnectionString}")
private String storageConnectionString;
@Value("${eventHub.storage.storageContainerName}")
private String storageContainerName;
@Bean
public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newScheduledThreadPool(1));
}
@Bean
public EventProcessorHost createEventHubProcessorHost() {
return EventProcessorHost.EventProcessorHostBuilder
.newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
.useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
.useEventHubConnectionString(connectionString, eventHubName)
.build();
}
}
Для этого примера я использую EventProcessorHost builder и передаю для него необходимую конфигурацию. Я определил конфигурацию в своем приложении.yml ниже.
eventHub:
connectionString: 'connections string here' #Event Hub connection string goes here
name: 'event hub name here' # The name of the event hub
storage:
consumerGroupName: $Default #This is the default consumer group for event hub, but it can be customized
storageConnectionString: 'storage connection string here'
storageContainerName: 'storage container name here'
hostNamePrefix: 'some unique prefix' # Identifies the instance of the EventProcessorHost
Теперь, когда у нас есть компонент EventProcessorHost, мы можем внедрить его в нашу службу EventProcessorHost.
import com.dublin.eventhub.demo.exception.ErrorNotificationHandler;
import com.dublin.eventhub.demo.processor.EventProcessor;
import com.microsoft.azure.eventprocessorhost.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;
@Component
public class EventProcessorHostService {
private final EventProcessorHost eventProcessorHost;
private Logger log = LoggerFactory.getLogger(EventProcessorHostService.class);
@Autowired
public EventProcessorHostService(EventProcessorHost eventProcessorHost) {
this.eventProcessorHost = eventProcessorHost;
}
@PostConstruct
public void run() throws ExecutionException, InterruptedException {
log.info("Setting up event hub {}", eventProcessorHost.getHostName());
EventProcessorOptions options = new EventProcessorOptions();
options.setExceptionNotification(new ErrorNotificationHandler());
eventProcessorHost.registerEventProcessor(EventProcessor.class, options).get();
}
}
Здесь нужно кое-что распаковать, так что давайте разберемся, что происходит.
- Я использую внедрение конструктора для передачи компонента EventProcessorHost, который я определил ранее.
- Я определил базовый класс обработчика уведомлений о событиях, который обрабатывает ошибки, не связанные с конкретными разделами (например, сбои инициализации). Я передаю это на EventProcessorHost, обернув его в объект параметров процессора событий.
- Я регистрирую обработчик событий, он регистрирует хост в центре событий и получает права аренды на некоторые разделы, чтобы мы могли начать обработку сообщений. Для каждой аренды раздела для этого раздела создается экземпляр обработчика событий.
- Наконец, я использую @PostConstruct для инициализации подключения к концентратору событий.
Ниже приведен Обработчик уведомлений об ошибках.
import java.util.function.Consumer; import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs; import lombok.extern.slf4j.Slf4j; @Slf4j public class ErrorNotificationHandler implements Consumer{ @Override public void accept(ExceptionReceivedEventArgs t) { log.error("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString()); } }
Теперь наш EventProcessorHost настроен и готов к запуску, но для того, чтобы мы могли обрабатывать сообщения, нам также необходимо определить класс обработчика событий, который считывает данные с концентратора событий и обрабатывает контрольную точку.
Нам нужно расширить интерфейс Event processor , который дает нам доступ к контексту раздела для определения контрольных точек и итерации EventData. Здесь происходит пара вещей.
- Я инициализирую класс обработчика полезной нагрузки события в методе onOpen .
- В методе onevent я создал цикл for для перебора событий и try catch в теле цикла, поэтому, если событие завершается ошибкой во время обработки, я могу обработать оставшиеся события.
- Если каждое событие обрабатывается успешно, я проверяю контрольную точку сразу после обработки.
ПРИМЕЧАНИЕ: Поскольку контрольная точка является асинхронной и не самой быстрой, я рекомендую устанавливать контрольную точку после некоторого количества обработанных событий. Поскольку этот пример невелик, я не утруждаю себя этим.
import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;
@Slf4j
@NoArgsConstructor
@Service
public class EventProcessor implements IEventProcessor {
private EventPayloadProcessor eventPayloadProcessor;
@Override
public void onOpen(PartitionContext partitionContext) {
eventPayloadProcessor = new EventPayloadProcessor();
}
@Override
public void onClose(PartitionContext partitionContext, CloseReason closeReason) {
}
@Override
public void onEvents(PartitionContext partitionContext, Iterable iterable) {
for(EventData event: iterable) {
try{
EventPayload eventPayload = (EventPayload) SerializationUtils.deserialize(event.getBytes());
eventPayloadProcessor.process(eventPayload);
partitionContext.checkpoint(event);
} catch (Exception e) {
log.error("An Error occured when processing event data, exception: ", e);
}
}
}
@Override
public void onError(PartitionContext partitionContext, Throwable throwable) {
}
}
И, наконец, я определил процессор полезной нагрузки событий ниже.
import com.dublin.eventhub.demo.model.EventPayload;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@NoArgsConstructor
@Service
class EventPayloadProcessor {
void process(EventPayload eventPayload) {
log.info("Hello! My name is {} and my favorite food is {}", eventPayload.getFirstName(), eventPayload.getFavoriteFood());
}
}
И это все. Все должно быть готово к броску. 🤠
Запуск приложения
Если вы следили за ходом событий с первой части. В event hub должно быть несколько сообщений, которые мы должны обработать. Таким образом, мы должны иметь возможность запустить приложение и обработать полезную нагрузку.
2019-10-06 08:12:18.242 INFO 42015 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:12:18.247 INFO 42015 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test
2019-10-06 08:12:18.253 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], path[event-hub-test], operationTimeout[PT1M], creating a send link
2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[cbs], linkName[cbs:sender], localTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkLocalOpen receiverName[cbs], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.327 INFO 42015 --- [pool-1-thread-2] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkRemoteOpen receiverName[cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.329 INFO 42015 --- [pool-1-thread-2] c.m.a.e.impl.RequestResponseOpener : requestResponseChannel.onOpen complete clientId[MF_41070c_1570367523209], session[cbs-session], link[cbs], endpoint[$cbs]
2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], localTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], remoteTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : onOpenComplete - clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], sendPath[event-hub-test], linkName[LN_ddb0ee_1570367538464_5324_G9]
2019-10-06 08:12:18.678 INFO 42015 --- [b4f183ddd]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
Но, черт возьми, в журналах слишком много шума, тебе не кажется? Давайте исправим это, установив для уровня журнала пакетов event hub значение ERROR.
logging:
level:
com.microsoft.azure.*: ERROR
И попробуй еще раз, гораздо чище, тебе не кажется?
2019-10-06 08:55:48.241 INFO 47531 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2019-10-06 08:55:48.246 INFO 47531 --- [ main] com.dublin.eventhub.demo.Application : Started Application in 5.37 seconds (JVM running for 5.792) 2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2019-10-06 08:56:01.709 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 9 ms 2019-10-06 08:56:01.817 INFO 47531 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub.. 2019-10-06 08:56:01.822 INFO 47531 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test 2019-10-06 08:56:19.581 INFO 47531 --- [352fbae26]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
И все, мы подключились к event hub и зачитали наше сообщение!
Заключительные мысли
В этом руководстве мы рассмотрели создание учетной записи хранилища Azure, настройку подключения к event hub в нашей службе spring boot и считывание сообщений, которые мы ранее отправляли в event hub. Я надеюсь, что это руководство было полезным, дайте мне знать, что вы думаете в комментариях.
Счастливого кодирования!
Оригинал: “https://dev.to/danondso/getting-eventful-with-azure-event-hubs-part-two-10ji”