Рубрики
Без рубрики

Реактивный API клиента JAX-RS

Изучите различные способы создания реактивных клиентов с помощью JAX-RS

Автор оригинала: baeldung.

1. введение

В этом уроке мы рассмотрим поддержку JAX-RS для реактивного программирования (Rx) с использованием API Jersey. В этой статье предполагается, что читатель знаком с API клиента Jersey REST.

Некоторое знакомство с концепциями реактивного программирования будет полезно, но не обязательно.

2. Зависимости

Во-первых, нам нужны стандартные зависимости клиентской библиотеки Джерси:


    org.glassfish.jersey.core
    jersey-client
    2.27


    org.glassfish.jersey.inject
    jersey-hk2
    2.27

Эти зависимости дают нам поддержку реактивного программирования JAX-RS. Текущие версии jersey-client и jersey-hk2 доступны на Maven Central.

Для поддержки сторонних реактивных фреймворков мы будем использовать эти расширения:


    org.glassfish.jersey.ext.rx
    jersey-rx-client-rxjava
    2.27

Приведенная выше зависимость обеспечивает поддержку RxJava Observable ; для более новых RxJava2/| Flowable , мы используем следующее расширение:


    org.glassfish.jersey.ext.rx
    jersey-rx-client-rxjava2
    2.27

Зависимости от rxjava и rxjava 2 также доступны в Maven Central.

3. Почему Нам Нужны Реактивные Клиенты JAX-RS

Допустим, у нас есть три API REST для использования:

  • id-сервис предоставляет список длинных идентификаторов пользователей
  • name-service предоставляет имя пользователя для данного идентификатора пользователя
  • hash-сервис вернет хэш как идентификатора пользователя, так и имени пользователя

Мы создаем клиента для каждой из услуг:

Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService 
  = client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");

Это надуманный пример, но он подходит для нашей иллюстрации. Спецификация JAX-RS поддерживает по крайней мере три подхода для совместного использования этих услуг:

  • Синхронный (блокирующий)
  • Асинхронный (неблокирующий)
  • Реактивный (функциональный, неблокирующий)

3.1. Проблема С Синхронным Вызовом Клиента Джерси

Ванильный подход к использованию этих служб приведет к тому, что мы будем использовать id-сервис для получения идентификаторов пользователей, а затем вызывать name-сервис и hash-сервис API последовательно для каждого возвращаемого идентификатора.

При таком подходе каждый вызов блокирует запущенный поток до тех пор, пока запрос не будет выполнен , затрачивая в общей сложности много времени на выполнение объединенного запроса. Это явно менее чем удовлетворительно в любом нетривиальном случае использования.

3.2. Проблема С Асинхронным Вызовом Клиента Jersey

Более сложный подход заключается в использовании механизма InvocationCallback , поддерживаемого JAX-RS. В самой простой форме мы передаем обратный вызов методу get , чтобы определить, что произойдет, когда данный вызов API завершится.

Хотя теперь мы получаем истинное асинхронное выполнение ( с некоторыми ограничениями на эффективность потока ), легко увидеть, как этот стиль кода может стать нечитаемым и громоздким в любых, кроме тривиальных сценариях. Спецификация JAX-RS специально выделяет этот сценарий как Пирамиду гибели :

// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); 

userIdService.request()
  .accept(MediaType.APPLICATION_JSON)
  .async()
  .get(new InvocationCallback>() {
    @Override
    public void completed(List employeeIds) {
        employeeIds.forEach((id) -> {
        // for each employee ID, get the name
        nameService.resolveTemplate("userId", id).request()
          .async()
          .get(new InvocationCallback() {
              @Override
              public void completed(String response) {
                     hashService.resolveTemplate("rawValue", response + id).request()
                    .async()
                    .get(new InvocationCallback() {
                        @Override
                        public void completed(String response) {
                            //complete the business logic
                        }
                        // ommitted implementation of the failed() method
                    });
              }
              // omitted implementation of the failed() method
          });
        });
    }
    // omitted implementation of the failed() method
});

// wait for inner requests to complete in 10 seconds
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
    logger.warn("Some requests didn't complete within the timeout");
}

Таким образом, мы достигли асинхронного, экономичного по времени кода, но:

  • это трудно читать
  • каждый вызов порождает новый поток

Обратите внимание, что мы используем CountDownLatch во всех примерах кода, чтобы дождаться, пока все ожидаемые значения будут доставлены хэш-службой . Мы делаем это для того, чтобы утверждать, что код работает в модульном тесте, проверяя, что все ожидаемые значения действительно были доставлены.

Обычный клиент не будет ждать, а скорее сделает все, что должно быть сделано с результатом в рамках обратного вызова, чтобы не блокировать поток.

3.3. Функциональное, Реактивное Решение

Функциональный и реактивный подход даст нам:

  • Отличная читаемость кода
  • Свободный стиль кодирования
  • Эффективное управление потоками

JAX-RS поддерживает эти цели в следующих компонентах:

  • Этап завершения RxInvoker поддерживает интерфейс Этап завершения в качестве реактивного компонента по умолчанию
  • RxObservableInvokerProvider поддерживает RxJava Observable
  • RxFlowableInvokerProvider поддержка RxJava Flowable

Существует также API для добавления поддержки других реактивных библиотек.

4. Поддержка реактивных компонентов JAX-RS

4.1. Этап завершения в JAX-RS

Используя CompletionStage и его конкретную реализацию – CompletableFuture мы можем написать элегантную, неблокирующую и плавную оркестровку вызовов служб.

Давайте начнем с получения идентификаторов пользователей:

CompletionStage> userIdStage = userIdService.request()
  .accept(MediaType.APPLICATION_JSON)
  .rx()
  .get(new GenericType>() {
}).exceptionally((throwable) -> {
    logger.warn("An error has occurred");
    return null;
});

Вызов метода rx () – это точка, с которой начинается реактивная обработка. Мы используем функцию исключительно , чтобы свободно определить наш сценарий обработки исключений.

Отсюда мы можем четко организовать вызовы для получения имени пользователя из службы имен, а затем хэшировать комбинацию имени и идентификатора пользователя:

List expectedHashValues = ...;
List receivedHashValues = new ArrayList<>(); 

// used to keep track of the progress of the subsequent calls 
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); 

userIdStage.thenAcceptAsync(employeeIds -> {
  logger.info("id-service result: {}", employeeIds);
  employeeIds.forEach((Long id) -> {
    CompletableFuture completable = nameService.resolveTemplate("userId", id).request()
      .rx()
      .get(String.class)
      .toCompletableFuture();

    completable.thenAccept((String userName) -> {
        logger.info("name-service result: {}", userName);
        hashService.resolveTemplate("rawValue", userName + id).request()
          .rx()
          .get(String.class)
          .toCompletableFuture()
          .thenAcceptAsync(hashValue -> {
              logger.info("hash-service result: {}", hashValue);
              receivedHashValues.add(hashValue);
              completionTracker.countDown();
          }).exceptionally((throwable) -> {
              logger.warn("Hash computation failed for {}", id);
              return null;
         });
    });
  });
});

if (!completionTracker.await(10, TimeUnit.SECONDS)) {
    logger.warn("Some requests didn't complete within the timeout");
}

assertThat(receivedHashValues).containsAll(expectedHashValues);

В приведенном выше примере мы составляем наше выполнение 3 служб с беглым и читаемым кодом.

Метод thenAcceptAsync выполнит предоставленную функцию после того, как данный Этап завершения завершит выполнение (или выдаст исключение).

Каждый последующий вызов не блокируется, что позволяет разумно использовать системные ресурсы.

Интерфейс Этап завершения предоставляет широкий спектр методов постановки и оркестровки, которые позволяют нам составлять, упорядочивать и асинхронно выполнять любое количество шагов в многоступенчатой оркестровке (или одном вызове службы).

4.2. Наблюдаемый в JAX-RS

Чтобы использовать компонент Observable RxJava, мы должны сначала зарегистрировать Rx Observable InvokerProvider provider (а не ” ObservableRxInvokerProvider” , как указано в документе спецификации Jersey) на клиенте:

Client client = client.register(RxObservableInvokerProvider.class);

Затем мы переопределяем вызыватель по умолчанию:

Observable> userIdObservable = userIdService
  .request()
  .rx(RxObservableInvoker.class)
  .get(new GenericType>(){});

С этого момента мы можем использовать стандартную Наблюдаемую семантику для организации потока обработки :

userIdObservable.subscribe((List listOfIds)-> { 
  /** define processing flow for each ID */
});

4.3. Текучесть в JAX-RS

Семантика использования RxJava Flowable аналогична семантике Observable. Мы регистрируем соответствующего поставщика:

client.register(RxFlowableInvokerProvider.class);

Затем мы поставляем RxFlowableInvoker :

Flowable> userIdFlowable = userIdService
  .request()
  .rx(RxFlowableInvoker.class)
  .get(new GenericType>(){});

После этого мы можем использовать обычный Flowable API.

5. Заключение

Спецификация JAX-RS предоставляет большое количество опций, которые обеспечивают чистое, неблокирующее выполнение вызовов REST.

Интерфейс Этап завершения , в частности, предоставляет надежный набор методов

Вы можете проверить код для этой статьи на Github .