1. Обзор
Проще говоря, Ambassador является высокопроизводительной шиной событий, использующей семантику publish-subscribe .
Сообщения передаются одному или нескольким одноранговым узлам без предварительного знания о количестве подписчиков и о том, как они используют сообщение.
2. Зависимость Maven
Прежде чем мы сможем использовать библиотеку, нам нужно добавить зависимость ambassador :
net.engio mbassador 1.3.1
3. Базовая Обработка Событий
3.1. Простой Пример
Мы начнем с простого примера публикации сообщения:
private MBassador
В верхней части этого тестового класса мы видим создание Ambassador с его конструктором по умолчанию. Затем в методе @Before мы вызываем subscribe() и передаем ссылку на сам класс.
В subscribe ()/| диспетчер проверяет подписчика на наличие @обработчика аннотаций.
И в первом тесте мы вызываем dispatcher.post(…).now() для отправки сообщения, что приводит к вызову handleString () .
Этот первоначальный тест демонстрирует несколько важных концепций. Любой Объект может быть подписчиком, если у него есть один или несколько методов, аннотированных @Handler . У подписчика может быть любое количество обработчиков.
Мы используем тестовые объекты, которые подписываются на себя для простоты, но в большинстве производственных сценариев диспетчеры сообщений будут в разных классах от потребителей.
Методы обработчика имеют только один входной параметр – сообщение, и не могут выдавать никаких проверенных исключений.
Подобно методу subscribe () , метод post принимает любой Объект . Этот Объект доставляется подписчикам.
Когда сообщение публикуется, оно доставляется всем слушателям, подписавшимся на этот тип сообщения.
Давайте добавим еще один обработчик сообщений и отправим сообщение другого типа:
private Integer messageInteger; @Test public void whenIntegerDispatched_thenHandleInteger() { dispatcher.post(42).now(); assertNull(messageString); assertNotNull(messageInteger); assertTrue(42 == messageInteger); } @Handler public void handleInteger(Integer message) { messageInteger = message; }
Как и ожидалось, когда мы отправляем целое число , handle Integer() вызывается, а handle String () – нет. Один диспетчер может использоваться для отправки нескольких типов сообщений.
3.2. Мертвые сообщения
Так куда же отправляется сообщение, если для него нет обработчика? Давайте добавим новый обработчик событий, а затем отправим третий тип сообщения:
private Object deadEvent; @Test public void whenLongDispatched_thenDeadEvent() { dispatcher.post(42L).now(); assertNull(messageString); assertNull(messageInteger); assertNotNull(deadEvent); assertTrue(deadEvent instanceof Long); assertTrue(42L == (Long) deadEvent); } @Handler public void handleDeadEvent(DeadMessage message) { deadEvent = message.getMessage(); }
В этом тесте мы отправляем Long вместо Integer. Ни handle Integer() ни |/handle String() Не вызываются, но handle Dead Event() есть.
Когда для сообщения нет обработчиков, оно оборачивается в Мертвое сообщение объект. Поскольку мы добавили обработчик для Мертвого сообщения , мы его фиксируем.
Мертвое сообщение можно безопасно игнорировать; если приложению не нужно отслеживать мертвые сообщения, им можно разрешить никуда не отправляться.
4. Использование иерархии событий
Отправка String и Integer событий ограничена. Давайте создадим несколько классов сообщений:
public class Message {} public class AckMessage extends Message {} public class RejectMessage extends Message { int code; // setters and getters }
У нас есть простой базовый класс и два класса, которые его расширяют.
4.1. Отправка сообщения базового класса
Начнем с Сообщения события:
private MBassadordispatcher = new MBassador<>(); private Message message; private AckMessage ackMessage; private RejectMessage rejectMessage; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenMessageDispatched_thenMessageHandled() { dispatcher.post(new Message()).now(); assertNotNull(message); assertNull(ackMessage); assertNull(rejectMessage); } @Handler public void handleMessage(Message message) { this.message = message; } @Handler public void handleRejectMessage(RejectMessage message) { rejectMessage = message; } @Handler public void handleAckMessage(AckMessage message) { ackMessage = message; }
Откройте для себя MBassador – высокопроизводительную шину событий pub-sub. Это ограничивает нас в использовании Сообщений , но добавляет дополнительный уровень безопасности типов.
Когда мы отправляем Сообщение , handleMessage() получает его. Два других обработчика этого не делают.
4.2. Отправка сообщения подкласса
Давайте отправим Отклоненное сообщение :
@Test public void whenRejectDispatched_thenMessageAndRejectHandled() { dispatcher.post(new RejectMessage()).now(); assertNotNull(message); assertNotNull(rejectMessage); assertNull(ackMessage); }
Когда мы отправляем Отклоненное сообщение оба обрабатываем отклоненное сообщение() и handleMessage() получаем его.
Поскольку Отклонить сообщение расширяет Сообщение, Обработчик Сообщения получил его в дополнение к R извлечению сообщения обработчика.
Давайте проверим это поведение с помощью сообщения Ack :
@Test public void whenAckDispatched_thenMessageAndAckHandled() { dispatcher.post(new AckMessage()).now(); assertNotNull(message); assertNotNull(ackMessage); assertNull(rejectMessage); }
Как мы и ожидали, когда мы отправляем сообщение Ack , оба обрабатывают сообщение Ack() и handleMessage() получают его.
5. Фильтрация сообщений
Организация сообщений по типу уже является мощной функцией, но мы можем фильтровать их еще больше.
5.1. Фильтр по классам и подклассам
Когда мы отправили Сообщение об отклонении или Сообщение Ack , мы получили событие как в обработчике событий для конкретного типа, так и в базовом классе.
Мы можем решить эту проблему иерархии типов, сделав Message абстрактным и создав класс, такой как Generic Message . Но что, если у нас нет такой роскоши?
Мы можем использовать фильтры сообщений:
private Message baseMessage; private Message subMessage; @Test public void whenMessageDispatched_thenMessageFiltered() { dispatcher.post(new Message()).now(); assertNotNull(baseMessage); assertNull(subMessage); } @Test public void whenRejectDispatched_thenRejectFiltered() { dispatcher.post(new RejectMessage()).now(); assertNotNull(subMessage); assertNull(baseMessage); } @Handler(filters = { @Filter(Filters.RejectSubtypes.class) }) public void handleBaseMessage(Message message) { this.baseMessage = message; } @Handler(filters = { @Filter(Filters.SubtypesOnly.class) }) public void handleSubMessage(Message message) { this.subMessage = message; }
Параметр filters для аннотации @Handler принимает Класс , реализующий IMessageFilter . Библиотека предлагает два примера:
Filters.Reject Subtypes делает так, как подсказывает его название: он отфильтрует любые подтипы. В этом случае мы видим, что Отклонить сообщение не обрабатывается handle Base Message().
Фильтры .Только подтипы также делает то, что следует из его названия: он будет отфильтровывать любые базовые типы. В этом случае мы видим, что Message не обрабатывается handle Sub Message().
5.2. Фильтр IMessageFilter
Подтипы Filters.Reject и Filters.Только подтипы оба реализуют IMessageFilter .
RejectSubTypes сравнивает класс сообщения с определенными типами сообщений и разрешает только сообщения, равные одному из его типов, в отличие от любых подклассов.
5.3. Фильтр С Условиями
К счастью, существует более простой способ фильтрации сообщений. Ambassador поддерживает подмножество выражений Java EL в качестве условий для фильтрации сообщений.
Давайте отфильтруем сообщение String на основе его длины:
private String testString; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; }
Сообщение “foobar!” состоит из семи символов и фильтруется. Давайте отправим более короткую строку :
@Test public void whenShortStringDispatched_thenStringHandled() { dispatcher.post("foobar").now(); assertNotNull(testString); }
Теперь “foobar” имеет длину всего шесть символов и проходит через него.
Наше сообщение Отклонить содержит поле с аксессуаром. Давайте напишем фильтр для этого:
private RejectMessage rejectMessage; @Test public void whenWrongRejectDispatched_thenRejectFiltered() { RejectMessage testReject = new RejectMessage(); testReject.setCode(-1); dispatcher.post(testReject).now(); assertNull(rejectMessage); assertNotNull(subMessage); assertEquals(-1, ((RejectMessage) subMessage).getCode()); } @Handler(condition = "msg.getCode() != -1") public void handleRejectMessage(RejectMessage rejectMessage) { this.rejectMessage = rejectMessage; }
Здесь мы снова можем запросить метод для объекта и либо отфильтровать сообщение, либо нет.
5.4. Захват Отфильтрованных сообщений
Подобно Мертвым событиям, мы можем захотеть захватить и обработать отфильтрованные сообщения. Существует также специальный механизм для захвата отфильтрованных событий. Отфильтрованные события обрабатываются иначе, чем “мертвые” события.
Давайте напишем тест, который иллюстрирует это:
private String testString; private FilteredMessage filteredMessage; private DeadMessage deadMessage; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); assertNotNull(filteredMessage); assertTrue(filteredMessage.getMessage() instanceof String); assertNull(deadMessage); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; } @Handler public void handleFilterMessage(FilteredMessage message) { this.filteredMessage = message; } @Handler public void handleDeadMessage(DeadMessage deadMessage) { this.deadMessage = deadMessage; }
С добавлением обработчика Фильтрованного сообщения мы можем отслеживать Строки|/, которые фильтруются из-за их длины. Сообщение filter содержит слишком длинную Строку , в то время как мертвое сообщение остается нулевым.
6. Асинхронная отправка и обработка сообщений
До сих пор во всех наших примерах использовалась синхронная отправка сообщений; когда мы вызывали post.now() сообщения доставлялись каждому обработчику в том же потоке, из которого мы вызывали post () .
6.1. Асинхронная отправка
Ambassador.post() возвращает команду Sync Async Post . Этот класс предлагает несколько методов, в том числе:
- теперь() – отправка сообщений синхронно; вызов будет блокироваться до тех пор, пока все сообщения не будут доставлены
- асинхронно() – асинхронно выполняет публикацию сообщения
Давайте использовать асинхронную отправку в примере класса. Мы будем использовать Awaitility в этих тестах, чтобы упростить код:
private MBassadordispatcher = new MBassador<>(); private String testString; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenAsyncDispatched_thenMessageReceived() { dispatcher.post("foobar").asynchronously(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testString); } @Handler public void handleStringMessage(String message) { this.testString = message; ready.set(true); }
В этом тесте мы вызываем асинхронно() и используем AtomicBoolean в качестве флага с await () , чтобы дождаться, пока поток доставки доставит сообщение.
Если мы закомментируем вызов wait() , мы рискуем провалить тест, потому что мы проверяем тестовую строку до завершения потока доставки.
6.2. Вызов Асинхронного обработчика
Асинхронная отправка позволяет поставщику сообщений вернуться к обработке сообщений до того, как сообщения будут доставлены каждому обработчику, но он по-прежнему вызывает каждый обработчик по порядку, и каждый обработчик должен дождаться завершения предыдущего.
Это может привести к проблемам, если один обработчик выполняет дорогостоящую операцию.
Ambassador предоставляет механизм для асинхронного вызова обработчика. Обработчики, настроенные для этого, получают сообщения в своем потоке:
private Integer testInteger; private String invocationThreadName; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenHandlerAsync_thenHandled() { dispatcher.post(42).now(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testInteger); assertFalse(Thread.currentThread().getName().equals(invocationThreadName)); } @Handler(delivery = Invoke.Asynchronously) public void handleIntegerMessage(Integer message) { this.invocationThreadName = Thread.currentThread().getName(); this.testInteger = message; ready.set(true); }
Обработчики могут запрашивать асинхронный вызов с помощью delivery.Асинхронно свойство в обработчике аннотации. Мы проверяем это в нашем тесте, сравнивая имена Thread в методе диспетчеризации и обработчике.
7. Настройка посла
До сих пор мы использовали экземпляр Ambassador с его конфигурацией по умолчанию. Поведение диспетчера может быть изменено с помощью аннотаций, подобных тем, которые мы видели до сих пор; мы рассмотрим еще несколько, чтобы закончить этот учебник.
7.1. Обработка исключений
Обработчики не могут определить проверенные исключения. Вместо этого диспетчеру может быть предоставлен Я использую ErrorHandler в качестве аргумента для его конструктора:
public class MBassadorConfigurationTest implements IPublicationErrorHandler { private MBassador dispatcher; private String messageString; private Throwable errorCause; @Before public void prepareTests() { dispatcher = new MBassador(this); dispatcher.subscribe(this); } @Test public void whenErrorOccurs_thenErrorHandler() { dispatcher.post("Error").now(); assertNull(messageString); assertNotNull(errorCause); } @Test public void whenNoErrorOccurs_thenStringHandler() { dispatcher.post("Error").now(); assertNull(errorCause); assertNotNull(messageString); } @Handler public void handleString(String message) { if ("Error".equals(message)) { throw new Error("BOOM"); } messageString = message; } @Override public void handleError(PublicationError error) { errorCause = error.getCause().getCause(); } }
Когда handle String() выдает Ошибку, она сохраняется в Причина ошибки.
7.2. Приоритет обработчика
Обработчики вызываются в обратном порядке их добавления, но это не то поведение, на которое мы хотим полагаться. Даже имея возможность вызывать обработчики в своих потоках, нам все равно может потребоваться знать, в каком порядке они будут вызываться.
Мы можем явно установить приоритет обработчика:
private LinkedListlist = new LinkedList<>(); @Test public void whenRejectDispatched_thenPriorityHandled() { dispatcher.post(new RejectMessage()).now(); // Items should pop() off in reverse priority order assertTrue(1 == list.pop()); assertTrue(3 == list.pop()); assertTrue(5 == list.pop()); } @Handler(priority = 5) public void handleRejectMessage5(RejectMessage rejectMessage) { list.push(5); } @Handler(priority = 3) public void handleRejectMessage3(RejectMessage rejectMessage) { list.push(3); } @Handler(priority = 2, rejectSubtypes = true) public void handleMessage(Message rejectMessage) logger.error("Reject handler #3"); list.push(3); } @Handler(priority = 0) public void handleRejectMessage0(RejectMessage rejectMessage) { list.push(1); }
Обработчики вызываются от самого высокого приоритета до самого низкого. Обработчики с приоритетом по умолчанию, равным нулю, вызываются последними. Мы видим, что обработчик нумерует pop() off в обратном порядке.
7.3. Отклонить Подтипы, самый Простой Способ
Что случилось с handleMessage() в приведенном выше тесте? Нам не нужно использовать RejectSubTypes.class для фильтрации подтипов.
Отклонить подтипы – это логический флаг, который обеспечивает ту же фильтрацию, что и класс, но с лучшей производительностью, чем реализация IMessageFilter|/.
Нам все еще нужно использовать реализацию на основе фильтров для приема только подтипов, через.
8. Заключение
Ambassador-это простая и понятная библиотека для передачи сообщений между объектами. Сообщения могут быть организованы различными способами и могут отправляться синхронно или асинхронно.
И, как всегда, пример доступен в этом проекте GitHub .