Рубрики
Без рубрики

Знакомство с послом

Откройте для себя Ambassador – высокопроизводительный автобус событий pub-sub.

Автор оригинала: baeldung.

1. Обзор

Проще говоря, Ambassador является высокопроизводительной шиной событий, использующей семантику publish-subscribe .

Сообщения передаются одному или нескольким одноранговым узлам без предварительного знания о количестве подписчиков и о том, как они используют сообщение.

2. Зависимость Maven

Прежде чем мы сможем использовать библиотеку, нам нужно добавить зависимость ambassador :


    net.engio
    mbassador
    1.3.1

3. Базовая Обработка Событий

3.1. Простой Пример

Мы начнем с простого примера публикации сообщения:

private MBassador dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
    dispatcher.post("TestString").now();
 
    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

В верхней части этого тестового класса мы видим создание Ambassador с его конструктором по умолчанию. Затем в методе @Before мы вызываем subscribe() и передаем ссылку на сам класс.

В subscribe ()/| диспетчер проверяет подписчика на наличие @обработчика аннотаций.

И в первом тесте мы вызываем dispatcher.post(…).now() для отправки сообщения, что приводит к вызову handleString () .

Этот первоначальный тест демонстрирует несколько важных концепций. Любой Объект может быть подписчиком, если у него есть один или несколько методов, аннотированных @Handler . У подписчика может быть любое количество обработчиков.

Мы используем тестовые объекты, которые подписываются на себя для простоты, но в большинстве производственных сценариев диспетчеры сообщений будут в разных классах от потребителей.

Методы обработчика имеют только один входной параметр – сообщение, и не могут выдавать никаких проверенных исключений.

Подобно методу subscribe () , метод post принимает любой Объект . Этот Объект доставляется подписчикам.

Когда сообщение публикуется, оно доставляется всем слушателям, подписавшимся на этот тип сообщения.

Давайте добавим еще один обработчик сообщений и отправим сообщение другого типа:

private Integer messageInteger; 

@Test
public void whenIntegerDispatched_thenHandleInteger() {
    dispatcher.post(42).now();
 
    assertNull(messageString);
    assertNotNull(messageInteger);
    assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
    messageInteger = message;
}

Как и ожидалось, когда мы отправляем целое число , handle Integer() вызывается, а handle String () – нет. Один диспетчер может использоваться для отправки нескольких типов сообщений.

3.2. Мертвые сообщения

Так куда же отправляется сообщение, если для него нет обработчика? Давайте добавим новый обработчик событий, а затем отправим третий тип сообщения:

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
    dispatcher.post(42L).now();
 
    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
} 

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

В этом тесте мы отправляем Long вместо Integer. Ни handle Integer() ни |/handle String() Не вызываются, но handle Dead Event() есть.

Когда для сообщения нет обработчиков, оно оборачивается в Мертвое сообщение объект. Поскольку мы добавили обработчик для Мертвого сообщения , мы его фиксируем.

Мертвое сообщение можно безопасно игнорировать; если приложению не нужно отслеживать мертвые сообщения, им можно разрешить никуда не отправляться.

4. Использование иерархии событий

Отправка String и Integer событий ограничена. Давайте создадим несколько классов сообщений:

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

    // setters and getters
}

У нас есть простой базовый класс и два класса, которые его расширяют.

4.1. Отправка сообщения базового класса

Начнем с Сообщения события:

private MBassador dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

Откройте для себя MBassador – высокопроизводительную шину событий pub-sub. Это ограничивает нас в использовании Сообщений , но добавляет дополнительный уровень безопасности типов.

Когда мы отправляем Сообщение , handleMessage() получает его. Два других обработчика этого не делают.

4.2. Отправка сообщения подкласса

Давайте отправим Отклоненное сообщение :

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}

Когда мы отправляем Отклоненное сообщение оба обрабатываем отклоненное сообщение() и handleMessage() получаем его.

Поскольку Отклонить сообщение расширяет Сообщение, Обработчик Сообщения получил его в дополнение к R извлечению сообщения обработчика.

Давайте проверим это поведение с помощью сообщения Ack :

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();
 
    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

Как мы и ожидали, когда мы отправляем сообщение Ack , оба обрабатывают сообщение Ack() и handleMessage() получают его.

5. Фильтрация сообщений

Организация сообщений по типу уже является мощной функцией, но мы можем фильтровать их еще больше.

5.1. Фильтр по классам и подклассам

Когда мы отправили Сообщение об отклонении или Сообщение Ack , мы получили событие как в обработчике событий для конкретного типа, так и в базовом классе.

Мы можем решить эту проблему иерархии типов, сделав Message абстрактным и создав класс, такой как Generic Message . Но что, если у нас нет такой роскоши?

Мы можем использовать фильтры сообщений:

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
    dispatcher.post(new Message()).now();
 
    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}

Параметр filters для аннотации @Handler принимает Класс , реализующий IMessageFilter . Библиотека предлагает два примера:

Filters.Reject Subtypes делает так, как подсказывает его название: он отфильтрует любые подтипы. В этом случае мы видим, что Отклонить сообщение не обрабатывается handle Base Message().

Фильтры .Только подтипы также делает то, что следует из его названия: он будет отфильтровывать любые базовые типы. В этом случае мы видим, что Message не обрабатывается handle Sub Message().

5.2. Фильтр IMessageFilter

Подтипы Filters.Reject и Filters.Только подтипы оба реализуют IMessageFilter .

RejectSubTypes сравнивает класс сообщения с определенными типами сообщений и разрешает только сообщения, равные одному из его типов, в отличие от любых подклассов.

5.3. Фильтр С Условиями

К счастью, существует более простой способ фильтрации сообщений. Ambassador поддерживает подмножество выражений Java EL в качестве условий для фильтрации сообщений.

Давайте отфильтруем сообщение String на основе его длины:

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

Сообщение “foobar!” состоит из семи символов и фильтруется. Давайте отправим более короткую строку :

    
@Test
public void whenShortStringDispatched_thenStringHandled() {
    dispatcher.post("foobar").now();
 
    assertNotNull(testString);
}

Теперь “foobar” имеет длину всего шесть символов и проходит через него.

Наше сообщение Отклонить содержит поле с аксессуаром. Давайте напишем фильтр для этого:

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();
 
    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

Здесь мы снова можем запросить метод для объекта и либо отфильтровать сообщение, либо нет.

5.4. Захват Отфильтрованных сообщений

Подобно Мертвым событиям, мы можем захотеть захватить и обработать отфильтрованные сообщения. Существует также специальный механизм для захвата отфильтрованных событий. Отфильтрованные события обрабатываются иначе, чем “мертвые” события.

Давайте напишем тест, который иллюстрирует это:

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}

С добавлением обработчика Фильтрованного сообщения мы можем отслеживать Строки|/, которые фильтруются из-за их длины. Сообщение filter содержит слишком длинную Строку , в то время как мертвое сообщение остается нулевым.

6. Асинхронная отправка и обработка сообщений

До сих пор во всех наших примерах использовалась синхронная отправка сообщений; когда мы вызывали post.now() сообщения доставлялись каждому обработчику в том же потоке, из которого мы вызывали post () .

6.1. Асинхронная отправка

Ambassador.post() возвращает команду Sync Async Post . Этот класс предлагает несколько методов, в том числе:

  • теперь() – отправка сообщений синхронно; вызов будет блокироваться до тех пор, пока все сообщения не будут доставлены
  • асинхронно() – асинхронно выполняет публикацию сообщения

Давайте использовать асинхронную отправку в примере класса. Мы будем использовать Awaitility в этих тестах, чтобы упростить код:

private MBassador dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

В этом тесте мы вызываем асинхронно() и используем AtomicBoolean в качестве флага с await () , чтобы дождаться, пока поток доставки доставит сообщение.

Если мы закомментируем вызов wait() , мы рискуем провалить тест, потому что мы проверяем тестовую строку до завершения потока доставки.

6.2. Вызов Асинхронного обработчика

Асинхронная отправка позволяет поставщику сообщений вернуться к обработке сообщений до того, как сообщения будут доставлены каждому обработчику, но он по-прежнему вызывает каждый обработчик по порядку, и каждый обработчик должен дождаться завершения предыдущего.

Это может привести к проблемам, если один обработчик выполняет дорогостоящую операцию.

Ambassador предоставляет механизм для асинхронного вызова обработчика. Обработчики, настроенные для этого, получают сообщения в своем потоке:

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
    dispatcher.post(42).now();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
 
    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

Обработчики могут запрашивать асинхронный вызов с помощью delivery.Асинхронно свойство в обработчике аннотации. Мы проверяем это в нашем тесте, сравнивая имена Thread в методе диспетчеризации и обработчике.

7. Настройка посла

До сих пор мы использовали экземпляр Ambassador с его конфигурацией по умолчанию. Поведение диспетчера может быть изменено с помощью аннотаций, подобных тем, которые мы видели до сих пор; мы рассмотрим еще несколько, чтобы закончить этот учебник.

7.1. Обработка исключений

Обработчики не могут определить проверенные исключения. Вместо этого диспетчеру может быть предоставлен Я использую ErrorHandler в качестве аргумента для его конструктора:

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs_thenErrorHandler() {
        dispatcher.post("Error").now();
 
        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs_thenStringHandler() {
        dispatcher.post("Error").now();
 
        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}

Когда handle String() выдает Ошибку, она сохраняется в Причина ошибки.

7.2. Приоритет обработчика

Обработчики вызываются в обратном порядке их добавления, но это не то поведение, на которое мы хотим полагаться. Даже имея возможность вызывать обработчики в своих потоках, нам все равно может потребоваться знать, в каком порядке они будут вызываться.

Мы можем явно установить приоритет обработчика:

private LinkedList list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

    // Items should pop() off in reverse priority order
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage) 
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

Обработчики вызываются от самого высокого приоритета до самого низкого. Обработчики с приоритетом по умолчанию, равным нулю, вызываются последними. Мы видим, что обработчик нумерует pop() off в обратном порядке.

7.3. Отклонить Подтипы, самый Простой Способ

Что случилось с handleMessage() в приведенном выше тесте? Нам не нужно использовать RejectSubTypes.class для фильтрации подтипов.

Отклонить подтипы – это логический флаг, который обеспечивает ту же фильтрацию, что и класс, но с лучшей производительностью, чем реализация IMessageFilter|/.

Нам все еще нужно использовать реализацию на основе фильтров для приема только подтипов, через.

8. Заключение

Ambassador-это простая и понятная библиотека для передачи сообщений между объектами. Сообщения могут быть организованы различными способами и могут отправляться синхронно или асинхронно.

И, как всегда, пример доступен в этом проекте GitHub .