Рубрики
Без рубрики

Насыщение Событиями С Помощью Центров Событий Azure: Часть Вторая

В первой части этой серии я рассказал о создании центра событий в azure, подключив его к моей spring… Помечено как java, tutorial, spring, intermediate.

В первой части этой серии я рассказал о создании центра событий в azure, подключении его к моей веб-службе spring boot, а затем отправке на него сообщений. Теперь я собираюсь перейти к чтению этих сообщений из event hub и их обработке.

Как и в первой части, завершенный пример можно найти здесь.

Создание учетной записи хранилища в Azure

Зачем нам нужна учетная запись хранилища?

Нам нужна настройка учетной записи хранилища, чтобы мы могли хранить договоры аренды разделов и контрольные точки.

Вернитесь на портал azure и нажмите на учетные записи хранилища.

Затем нажмите кнопку Добавить (+), и вы попадете на экран создания учетной записи хранилища.

Заполните это поле и нажмите “Просмотреть + Создать” внизу. Мы должны получить одобрение от Microsoft, если что-то пошло не так, внести необходимые исправления и повторить попытку.

Как только будет получено согласие, мы можем нажать “Создать”, и azure запустит нашу учетную запись хранилища.

Получение Информации О Ключе Доступа

Как только учетная запись хранилища будет создана, перейдите к ней и выберите опцию “Ключ доступа” в боковом меню и скопируйте любую из строк подключения.

Создание узла обработки событий

В нашем классе конфигурации EventHub я создал компонент EventProcessorHost.

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Value("${eventHub.name}")
    private String eventHubName;

    @Value("${eventHub.storage.consumerGroupName}")
    private String consumerGroupName;

    @Value("${eventHub.storage.hostNamePrefix}")
    private String hostNamePrefix;

    @Value("${eventHub.storage.storageConnectionString}")
    private String storageConnectionString;

    @Value("${eventHub.storage.storageContainerName}")
    private String storageContainerName;

    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newScheduledThreadPool(1));
    }

    @Bean
    public EventProcessorHost createEventHubProcessorHost() {
        return EventProcessorHost.EventProcessorHostBuilder
                .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
                .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
                .useEventHubConnectionString(connectionString, eventHubName)
                .build();
    }
}

Для этого примера я использую EventProcessorHost builder и передаю для него необходимую конфигурацию. Я определил конфигурацию в своем приложении.yml ниже.

eventHub:
  connectionString: 'connections string here' #Event Hub connection string goes here
  name: 'event hub name here' # The name of the event hub
  storage:
    consumerGroupName: $Default #This is the default consumer group for event hub, but it can be customized
    storageConnectionString: 'storage connection string here'
    storageContainerName: 'storage container name here'
    hostNamePrefix: 'some unique prefix' # Identifies the instance of the EventProcessorHost

Теперь, когда у нас есть компонент EventProcessorHost, мы можем внедрить его в нашу службу EventProcessorHost.

import com.dublin.eventhub.demo.exception.ErrorNotificationHandler;
import com.dublin.eventhub.demo.processor.EventProcessor;
import com.microsoft.azure.eventprocessorhost.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;


@Component
public class EventProcessorHostService {

    private final EventProcessorHost eventProcessorHost;
    private Logger log = LoggerFactory.getLogger(EventProcessorHostService.class);

    @Autowired
    public EventProcessorHostService(EventProcessorHost eventProcessorHost) {
        this.eventProcessorHost = eventProcessorHost;
    }

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        log.info("Setting up event hub {}", eventProcessorHost.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());
        eventProcessorHost.registerEventProcessor(EventProcessor.class, options).get();
    }
}

Здесь нужно кое-что распаковать, так что давайте разберемся, что происходит.

  • Я использую внедрение конструктора для передачи компонента EventProcessorHost, который я определил ранее.
  • Я определил базовый класс обработчика уведомлений о событиях, который обрабатывает ошибки, не связанные с конкретными разделами (например, сбои инициализации). Я передаю это на EventProcessorHost, обернув его в объект параметров процессора событий.
  • Я регистрирую обработчик событий, он регистрирует хост в центре событий и получает права аренды на некоторые разделы, чтобы мы могли начать обработку сообщений. Для каждой аренды раздела для этого раздела создается экземпляр обработчика событий.
  • Наконец, я использую @PostConstruct для инициализации подключения к концентратору событий.

Ниже приведен Обработчик уведомлений об ошибках.

import java.util.function.Consumer;

import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ErrorNotificationHandler implements Consumer {

    @Override
    public void accept(ExceptionReceivedEventArgs t) {
        log.error("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
    }
}

Теперь наш EventProcessorHost настроен и готов к запуску, но для того, чтобы мы могли обрабатывать сообщения, нам также необходимо определить класс обработчика событий, который считывает данные с концентратора событий и обрабатывает контрольную точку.

Нам нужно расширить интерфейс Event processor , который дает нам доступ к контексту раздела для определения контрольных точек и итерации EventData. Здесь происходит пара вещей.

  • Я инициализирую класс обработчика полезной нагрузки события в методе onOpen .
  • В методе onevent я создал цикл for для перебора событий и try catch в теле цикла, поэтому, если событие завершается ошибкой во время обработки, я могу обработать оставшиеся события.
  • Если каждое событие обрабатывается успешно, я проверяю контрольную точку сразу после обработки.

ПРИМЕЧАНИЕ: Поскольку контрольная точка является асинхронной и не самой быстрой, я рекомендую устанавливать контрольную точку после некоторого количества обработанных событий. Поскольку этот пример невелик, я не утруждаю себя этим.

import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;

@Slf4j
@NoArgsConstructor
@Service
public class EventProcessor implements IEventProcessor {

    private EventPayloadProcessor eventPayloadProcessor;

    @Override
    public void onOpen(PartitionContext partitionContext) {
        eventPayloadProcessor = new EventPayloadProcessor();
    }

    @Override
    public void onClose(PartitionContext partitionContext, CloseReason closeReason) {

    }

    @Override
    public void onEvents(PartitionContext partitionContext, Iterable iterable) {
        for(EventData event: iterable) {
            try{
                EventPayload eventPayload = (EventPayload) SerializationUtils.deserialize(event.getBytes());
                eventPayloadProcessor.process(eventPayload);
                partitionContext.checkpoint(event);
            } catch (Exception e) {
                log.error("An Error occured when processing event data, exception: ", e);
            }

        }
    }

    @Override
    public void onError(PartitionContext partitionContext, Throwable throwable) {
    }
}

И, наконец, я определил процессор полезной нагрузки событий ниже.

import com.dublin.eventhub.demo.model.EventPayload;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@NoArgsConstructor
@Service
class EventPayloadProcessor {

    void process(EventPayload eventPayload) {
        log.info("Hello! My name is {} and my favorite food is {}", eventPayload.getFirstName(), eventPayload.getFavoriteFood());
    }
}

И это все. Все должно быть готово к броску. 🤠

Запуск приложения

Если вы следили за ходом событий с первой части. В event hub должно быть несколько сообщений, которые мы должны обработать. Таким образом, мы должны иметь возможность запустить приложение и обработать полезную нагрузку.

2019-10-06 08:12:18.242  INFO 42015 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:12:18.247  INFO 42015 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService       : Sending message to the event hub event-hub-test
2019-10-06 08:12:18.253  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender   : clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], path[event-hub-test], operationTimeout[PT1M], creating a send link
2019-10-06 08:12:18.258  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.258  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.259  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkLocalOpen senderName[cbs], linkName[cbs:sender], localTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.259  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ReceiveLinkHandler  : onLinkLocalOpen receiverName[cbs], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.327  INFO 42015 --- [pool-1-thread-2] c.m.azure.eventhubs.impl.SessionHandler  : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.328  INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkRemoteOpen senderName[cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.328  INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.ReceiveLinkHandler  : onLinkRemoteOpen receiverName[cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.329  INFO 42015 --- [pool-1-thread-2] c.m.a.e.impl.RequestResponseOpener       : requestResponseChannel.onOpen complete clientId[MF_41070c_1570367523209], session[cbs-session], link[cbs], endpoint[$cbs]
2019-10-06 08:12:18.398  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.398  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.464  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.464  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkLocalOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], localTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkRemoteOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], remoteTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender   : onOpenComplete - clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], sendPath[event-hub-test], linkName[LN_ddb0ee_1570367538464_5324_G9]
2019-10-06 08:12:18.678  INFO 42015 --- [b4f183ddd]-1-14] c.d.e.d.processor.EventPayloadProcessor  : Hello! My name is Dublin and my favorite food is Nanas

Но, черт возьми, в журналах слишком много шума, тебе не кажется? Давайте исправим это, установив для уровня журнала пакетов event hub значение ERROR.

logging:
  level:
    com.microsoft.azure.*: ERROR

И попробуй еще раз, гораздо чище, тебе не кажется?

2019-10-06 08:55:48.241  INFO 47531 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-06 08:55:48.246  INFO 47531 --- [           main] com.dublin.eventhub.demo.Application     : Started Application in 5.37 seconds (JVM running for 5.792)
2019-10-06 08:56:01.700  INFO 47531 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-10-06 08:56:01.700  INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2019-10-06 08:56:01.709  INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 9 ms
2019-10-06 08:56:01.817  INFO 47531 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:56:01.822  INFO 47531 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService       : Sending message to the event hub event-hub-test
2019-10-06 08:56:19.581  INFO 47531 --- [352fbae26]-1-14] c.d.e.d.processor.EventPayloadProcessor  : Hello! My name is Dublin and my favorite food is Nanas

И все, мы подключились к event hub и зачитали наше сообщение!

Заключительные мысли

В этом руководстве мы рассмотрели создание учетной записи хранилища Azure, настройку подключения к event hub в нашей службе spring boot и считывание сообщений, которые мы ранее отправляли в event hub. Я надеюсь, что это руководство было полезным, дайте мне знать, что вы думаете в комментариях.

Счастливого кодирования!

Оригинал: “https://dev.to/danondso/getting-eventful-with-azure-event-hubs-part-two-10ji”