Рубрики
Без рубрики

Шина сообщений Rxjava для OSGi

Я хотел сделать так, чтобы компоненты OSGi взаимодействовали богатым способом, управляемым событиями, где я мог бы публиковать многие из них… Помеченный osgi, java, rxjava.

Я хотел сделать так, чтобы компоненты OSGi взаимодействовали богатым способом, управляемым событиями, где я мог публиковать множество различных сообщений между компонентами без жесткой связи или прямой @Reference (ing). OSGI EventAdmin – хороший способ начать, но рано или поздно вы столкнетесь с ограничениями.

Цель состояла в том, чтобы создать систему, которая была бы более реактивной и простой для понимания, но в то же время не ограничивающей.

Я решил использовать RxJava в качестве базового движка для доставки сообщений, потому что:

  • Это быстро (асинхронно)
  • Это просто
  • Он мощен с помощью операций и планировщиков
  • в него встроены отличные инструменты модульного тестирования.

На этом пути у меня было несколько проблем, в основном создание очень чистого API, которым будет легко делиться и повторно использовать, а также не сложно понять. Мне также нужен был хороший способ модульного тестирования моих компонентов и во время интеграционных тестов OSGi.

Как мы увидим, некоторые тесты немного сложнее, чем другие.

Я черпал вдохновение из этого блога, который рекомендую прочитать, так как я не собираюсь повторять то, что описала Полина. Я решил перенести реализацию на OSGi и создать небольшую абстракцию конфигурации.

Я решил перенести реализацию на OSGi и создать небольшую абстракцию конфигурации.

Простая реализация шины

Очень простая шина сообщений с общим типом сообщения в качестве полезной нагрузки. Клиенты обязаны реализовывать сообщения такого типа:

import com.rds.demo.api.Message;

public class MyMessage implements Message {
}

Теперь нам нужно получить экземпляр шины сообщений, чтобы мы могли начать взаимодействовать с другими компонентами с помощью RxJava. Шина сообщений позволяет использовать два основных метода для получения и получения сообщений на шине.

public interface MessageBus {

    void publish(T message);

    void subscribe(Observer observer);

}

Затем поставщик шины сообщений по умолчанию реализует эту службу и развертывает ее как службу OSGi. Внутренне используя тему публикации, которая позволяет динамически отправлять сообщения и подписываться на них как на горячую наблюдаемую.

Обратите внимание на аннотацию SINGLETON, чтобы убедиться, что в регистрации OSGi есть только один экземпляр шины.

@Component(service = MessageBus.class, scope = ServiceScope.SINGLETON)
public class MessageBusProvider implements MessageBus {

    private final PublishSubject publishSubject = PublishSubject.create();

    @Override
    public void publish(T message) {
        publishSubject.onNext(message);
    }

    @Override
    public void subscribe(Observer observer) {
        publishSubject.subscribe(observer);
    }

}

Теперь, если мы хотим протестировать новый сервис напрямую (модульный тест, а не интеграционные тесты OSGi), мы можем использовать тестовые абстракции RxJava следующим образом:

@Test
public void publishMessageTest() {
    TestObserver test = TestObserver.create();

    MessageBus bus = new MessageBusProvider<>();

    bus.subscribe(test);
    bus.publish(new TestMessage());

    test.assertValueCount(1);
    test.assertNoErrors();
    test.assertNotComplete();
}

Шаг второй, Планировщики

Это было легко, верно? Ну, не так быстро… мы забыли о Планировщиках. Планировщики RxJava позволяют запускать каналы в разных потоках. Это реальная сила RxJava, когда дело доходит до параллельного выполнения задач.

Итак, давайте добавим в микс планировщик вычислений:

    @Override
    public void subscribe(Observer observer) {
        publishSubject
        .subscribeOn(schedulers.computation())
        .subscribe(observer);
    }

Но теперь у нас есть проблема, поскольку наши наблюдатели прослушивают пул потоков вычислений, наш тестовый наблюдатель больше не будет работать, так как он завершится без ожидания.

Чтобы преодолеть эту проблему, нам нужно использовать другой планировщик внутри наших тестов, но наша служба MessageBus не позволяет нам что-либо менять, поэтому у нас есть проблема.

Чтобы расширить шину сообщений, одним из вариантов является разрешение внедрения другой службы, которая будет управлять планировщиками. Как обрисовала Паулина, это может быть абстракцией планировщиков как сервиса. Поскольку мы не занимаемся пользовательским интерфейсом Android, я внес некоторые изменения в API.

public interface SchedulersFactory {

    Scheduler blocking();

    Scheduler pooled();

}

Внедряя эту службу в нашу службу MessageBus, теперь мы можем изменить код, чтобы он был:

@Component(service = MessageBus.class, scope = ServiceScope.SINGLETON)
public class MessageBusProvider implements MessageBus {

    private final PublishSubject publishSubject = PublishSubject.create();

    @Reference
    private SchedulersFactory schedulersFactory;

    public MessageBusProvider() {
    }

    public MessageBusProvider(SchedulersFactory schedulersFactory) {
        this.schedulersFactory = schedulersFactory;
    }

    @Override
    public void publish(T message) {
        publishSubject.onNext(message);
    }

    @Override
    public void subscribe(Observer observer) {
        publishSubject.subscribeOn(schedulersFactory.pooled()).subscribe(observer);
    }

}

Конструктор по умолчанию необходим для фреймворка OSGi для создания экземпляра нашего класса. Другой конструктор позволит нам внедрить другой SchedulerFactory в наши тесты.

public class TestSchedulersFactory implements SchedulersFactory {

    @Override
    public Scheduler pooled() {
        return Schedulers.trampoline();
    }

    @Override
    public Scheduler blocking() {
        return Schedulers.trampoline();
    }

}

Планировщик trampoline исправит вашу проблему, поскольку он выполняет операции последовательно, поэтому наш тестовый наблюдатель будет протестирован только после обработки всех сообщений.

Теперь мы можем изменить использование тестов, чтобы внедрить другой экземпляр фабрики планировщиков.

       MessageBus bus = new MessageBusProvider<>(new TestSchedulersFactory());

Пока все хорошо.

Интеграционные тесты

Тестирование OSGi считается интеграционными тестами, поскольку оно пытается создать полную среду выполнения и внедрить оболочку модульного тестирования во время выполнения, а затем выполнить утверждения. Это позволяет нам создавать небольшие части нашего приложения, адаптировать конфигурацию и проверять, все ли подключено правильно. Но разве мы что-то забываем? Да, тест все равно завершится неудачей, так как мы не планировали конфигурацию SchedulerFactory.

Давайте это исправим.

Сначала нам нужно разрешить настройку в нашем Schedulerfactory, теперь наш компонент будет иметь интерфейс конфигурации

public @interface SchedulersFactoryProviderConfiguration {

    @AttributeDefinition(
            name = ".blocking",
            type = AttributeType.STRING,
            description = "Blocking scheduler",
            required = false,
            options = {
                    @Option(label = "io", value = "IO"),
                    @Option(label = "computation", value = "COMPUTATION")
            }
    )
    String _blocking() default "IO";

    @AttributeDefinition(
            name = ".pooled",
            type = AttributeType.STRING,
            description = "Pooled scheduler",
            required = false,
            options = {
                    @Option(label = "io", value = "IO"),
                    @Option(label = "computation", value = "COMPUTATION")
            }
    )
    String _pooled() default "COMPUTATION";

}

Затем мы можем внедрить эту конфигурацию в наш компонент в методе activate

@Component(
        name = "com.rds.reactive.provider.schedulers.provider",
        service = SchedulersFactory.class,
        configurationPolicy = ConfigurationPolicy.OPTIONAL)
@Designate(
        ocd = SchedulersFactoryProviderConfiguration.class
)
public class SchedulersFactoryProvider implements SchedulersFactory {

    private Scheduler pooled;
    private Scheduler blocking;

    @Activate
    public void activate(SchedulersFactoryProviderConfiguration cfg) {
        System.out.println("ACTIVATE");
        blocking = SchedulerType.get(cfg._blocking()).value();
        pooled = SchedulerType.get(cfg._pooled()).value();
    }

    @Override
    public Scheduler blocking() {
        return blocking;
    }

    @Override
    public Scheduler pooled() {
        return pooled;
    }

}

И с помощью небольшого вспомогательного перечисления мы удостоверяемся, что у нас определены правильные планировщики.

public enum SchedulerType {

    IO(Schedulers.io()),
    COMPUTATION(Schedulers.computation()),
    TRAMPOLINE(Schedulers.trampoline());

    private final Scheduler scheduler;

    SchedulerType(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    Scheduler value() {
        return scheduler;
    }

    private static final Map ENUM_MAP;

    static {
        Map map = new HashMap<>();
        for (SchedulerType instance : SchedulerType.values()) {
            map.put(instance.name(), instance);
        }
        ENUM_MAP = Collections.unmodifiableMap(map);
    }

    public static SchedulerType get(String name) {
        return ENUM_MAP.getOrDefault(name, COMPUTATION);
    }
}

Наш SchedulerFactory вводится в шину сообщений с помощью @Reference (OSGi DS).

    @Reference
    private SchedulersFactory schedulersFactory;

И последнее, нам нужно рассказать OSGi framework, как настроить вашу фабрику планировщиков в тестовом режиме, поскольку нам все еще нужно использовать планировщик trampoline в наших тестах.

В вашей папке конфигурации (см. test.run и) мы связываем папку конфигурации с нашей средой выполнения:

-runproperties: \
    felix.fileinstall.dir=${.}/config

Добавьте файл свойств конфигурации в эту папку:

config/com.rds.reactive.provider.schedulers.provider.cfg

.blocking=TRAMPOILNE
.pooled=TRAMPOLINE

А теперь перейдем к нашим интеграционным тестам… нам нужно получить экземпляр шины сообщений из фреймворка, этот экземпляр будет использовать экземпляр SchedulersFactory с нашей конфигурацией переопределения TRAMPOLINE, чтобы мы могли проводить предсказуемые рабочие тесты.

public class MessageBusTest {

    private final BundleContext context = FrameworkUtil.getBundle(MessageBusTest.class).getBundleContext();

    private ServiceTracker messageBusMessageBusServiceTracker;

    private MessageBus bus;

    @Before
    public void before() throws InterruptedException {
        messageBusMessageBusServiceTracker = new ServiceTracker(context, MessageBus.class, null);
        messageBusMessageBusServiceTracker.open();
        bus = messageBusMessageBusServiceTracker.waitForService(500);
        Assert.assertNotNull(bus);
    }

    @After
    public void after() {
        messageBusMessageBusServiceTracker.close();
    }

    @Test
    public void canPublishToBusTest() {
        TestObserver test = TestObserver.create();
        bus.subscribe(test);
        bus.publish(new TestMessage());
        test.assertValueCount(1);
        test.assertNotComplete();
        test.assertNoErrors();
    }

    @Test
    public void multipleMessagesTest() {
        TestObserver test = TestObserver.create();
        bus.subscribe(test);

        // it is critical to set the scheduler here otherwise it is using the inmternal computation scheduler.
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.trampoline())
                .take(10)
                .map(t -> new TestMessage())
                .subscribe(testMessage -> bus.publish(testMessage));

        test.assertValueCount(10);
        test.assertNotComplete();
        test.assertNoErrors();
    }

}

Обратите внимание, что мы запрашиваем шину сообщений из контекста, и это все. Служба была настроена для нас с помощью файла cfg.

Также обратите внимание, что наблюдатель интервалов использует планировщик. Если вы не зададите интервалу планировщик, он по умолчанию будет вычисляться и нарушит наши тесты.

        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.trampoline())

Резюме

RxJava предоставляет нам мощные инструменты для создания и тестирования приложений, но мы должны быть осторожны при их тестировании.

Я надеюсь, что это было полезно, если вы работаете с OSGI или RxJava.

Код

Пожалуйста, не стесняйтесь использовать, переносить и вносить свой вклад.

Пожалуйста, не стесняйтесь использовать, переносить и вносить свой вклад.

Рекомендации

Запись в блоге Полины -> https://medium.com/@PaulinaSadowska/writing-unit-tests-on-asynchronous-events-with-rxjava-and-rxkotlin-1616a27f69aa

Эффективный OSGi -> Эффективный OSGi ->

Оригинал: “https://dev.to/gadeichhorn/rxjava-message-bus-for-osgi-29b6”