Рубрики
Без рубрики

Jetty ReactiveStreams HTTP Клиент

Узнайте, как использовать Jetty ReactiveStreams HTTP клиента.

Автор оригинала: Tino Mulanchira Thomas.

Jetty ReactiveStreams HTTP Клиент

1. Обзор

В этом учебнике, мы собираемся, чтобы узнать, как использовать Реактивный клиент HTTP от Jetty . Мы будем демонстрировать его использование с различными библиотеками Reactive, создавая небольшие тестовые случаи.

2. Что такое реактивный httpClient?

Джетти HttpClient позволяет выполнять блокировку запросов HTTP. Однако, когда мы имеем дело с реактивным API, мы не можем использовать стандартный клиент HTTP. Чтобы заполнить этот пробел, Jetty создал обертку вокруг HttpClient API, так что он также поддерживает Реактивные потоки API.

Реактивный HttpClient используется для потребления или получения потока данных по вызовам HTTP.

Пример, который мы собираемся продемонстрировать здесь, будет иметь реактивного клиента HTTP, который будет общаться с сервером Jetty с помощью различных библиотек Reactive. Мы также поговорим о событиях запроса и ответа, предоставляемых Reactive HttpClient .

Мы рекомендуем читать наши статьи о Project Reactor , RxJava и Spring WebFlux, чтобы лучше понять концепции реактивного программирования и его терминологии.

3. Maven зависимостей

Давайте начнем с примера, добавив зависимости для Реактивные потоки , Проект Реактор , RxJava , Весенний WebFlux , и Реактивное реагирование HTTPClient к нашему пом.xml. Наряду с этим, мы будем добавлять зависимость от Jetty Server а также для создания сервера:


    org.eclipse.jetty
    jetty-reactive-httpclient
    1.0.3


    org.eclipse.jetty
    jetty-server
    9.4.19.v20190610


    org.reactivestreams
    reactive-streams
    1.0.3


    io.projectreactor
    reactor-core
    3.2.12.RELEASE


    io.reactivex.rxjava2
    rxjava
    2.2.11


    org.springframework
    spring-webflux
    5.1.9.RELEASE

4. Создание сервера и клиента

Теперь давайте создадим сервер и добавим обработчик запросов, который просто записывает тело запроса в ответ:

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

И тогда мы сможем написать HttpClient :

HttpClient httpClient = new HttpClient();
httpClient.start();

Теперь, когда мы создали клиент и сервер, давайте посмотрим, как мы можем превратить этот блокирующий http Client в не-блокирующий и создать запрос:

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher publisher = reactiveRequest.response();

Итак, вот, РеактивныйРеквест обертка, предоставленная Jetty, сделала наш блокирующий http Client реактивным. Давайте продолжить и посмотреть его использование с различными реактивными библиотеками.

5. Использование реактивных потоков

Джетти HttpClient родной поддерживает Реактивные потоки , так что давайте начнем там.

Теперь, Реактивные потоки это всего лишь набор интерфейсов , итак, для нашего тестирования, давайте реализуем простой блокирующий абонент:

public class BlockingSubscriber implements Subscriber {
    BlockingQueue sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); 
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }   
}

Обратите внимание, что нам нужно было позвоните Подписка и запрос в соответствии с JavaDoc, в котором говорится, что “Никакие события не будут отправлены Издатель до тех пор, пока спрос не будет сигнален с помощью этого метода».

Кроме того, обратите внимание, что мы добавили механизм безопасности, так что наш тест может выручить, если он не видел значение в 5 секунд.

И теперь мы можем быстро протестировать наш запрос HTTP:

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. Использование проектного реактора

Давайте теперь посмотрим, как мы можем использовать реактивный HttpClient с проектом реактора. Создание издателя в значительной степени то же самое, что и в предыдущем разделе.

После создания издателя, давайте использовать Моно класса от Project Reactor, чтобы получить реактивный ответ:

ReactiveResponse response = Mono.from(publisher).block();

А затем мы можем проверить полученный ответ:

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1. Весеннее использование WebFlux

Преобразование блокирующего клиента HTTP в реактивный легко при использовании с Spring WebFlux. Весенний WebFlux поставляется с реактивным клиентом, WebClient , которые могут быть использованы с различными библиотеками http . Мы можем использовать это в качестве альтернативы использованию прямого кода реактора проекта.

Итак, во-первых, давайте обернуть Jetty в HTTP клиента с помощью JettyClientHttpConnector чтобы сблизить его с WebClient:

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

А затем передать этот разъем на WebClient для выполнения не блокируя запросов HTTP:

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

Далее, давайте сделаем фактический вызов HTTP с реактивным клиентом HTTP, который мы только что создали, и проверить результат:

String responseContent = client.post()
  .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
  .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
  .retrieve()
  .bodyToMono(String.class)
  .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. Использование RxJava2

Давайте теперь двигаться дальше и посмотреть, как реактивный клиент HTTP используется с RxJava2 .

Пока мы здесь, давайте мутировать наш пример только немного теперь включить тело в запросе:

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();
Publisher publisher = reactiveRequest
  .response(ReactiveResponse.Content.asString());

Код ReactiveResponse.Content.asString () преобразует тело отклика в строку. Также можно отказаться от ответа с помощью ReactiveResponse.Content.discard () метод, если нас интересует только статус запроса.

Теперь мы видим, что получение ответа с помощью RxJava2 на самом деле очень похоже на проект реактора. В принципе, мы просто используем Одноместный вместо Моно :

String responseContent = Single.fromPublisher(publisher)
  .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. События, связанные с запросами и ответами

Клиент Reactive HTTP испускает ряд событий во время выполнения. Они классифицируются как события запроса и события ответа. Эти события полезны для заглянуть в жизненный цикл реактивного клиента HTTP.

На этот раз, давайте сделать наш реактивный запрос немного по-другому, используя http клиента вместо запроса:

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
  .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
  .build();

А теперь давайте Издатель событий запроса HTTP:

Publisher requestEvents = request.requestEvents();

Теперь, давайте использовать RxJava еще раз. На этот раз мы создадим список, который содержит типы событий, и заселим его, подписавшись на события запроса по мере их создания:

List requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
  .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single response = Single.fromPublisher(request.response());

Затем, так как мы находимся в тесте, мы можем заблокировать наш ответ и проверить:

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

Аналогичным образом, мы можем подписаться на события реагирования, а также. Так как они похожи на подписку события запроса, мы добавили только последний здесь. Полную реализацию событий запроса и ответа можно найти в репозитории GitHub, связанной в конце этой статьи.

9. Заключение

В этом учебнике мы узнали о Реактивные потоки HttpClient предоставляемые Jetty, его использование с различными библиотеками Reactive и событиями жизненного цикла, связанными с запросом Reactive.

Все фрагменты кода, упомянутые в статье, можно найти в нашем Репозиторий GitHub .