Рубрики
Без рубрики

Начало Работы С Ракетой Часть 2

В этом блоге вы продолжите с того места, на котором остановились после части 1. Вы будете исследовать Ракетное сообщество… Помеченный как java, socket, springboot, unittest.

В этом блоге вы продолжите с того места, на котором остановились после Часть 1 . Вы изучите модели связи сокетов “Запуск и забвение”, “Поток запросов” и “Канал”. Для всех этих моделей вы создадите сервер, клиент и модульный тест.

1. Вступление

В Часть 1 |/вы изучили основы протокола связи Rocket. Рекомендуется сначала прочитать Часть 1, прежде чем переходить к Части 2. Помните, что Rocket предоставляет 4 модели связи:

  • Запрос-ответ (поток из 1)
  • Выстрелил и забыл (ответа нет)
  • Поток запросов (поток из многих)
  • Канал (двунаправленные потоки)

Вы рассмотрели Запрос-ответ в Части 1, остальные будут рассмотрены в Части 2.

Исходный код, используемый в этом посте, конечно же, доступен по адресу GitHub .

2. Модель “Выстрели и забудь”

Модель “Включи и забудь” очень похожа на модель “Запрос-ответ”. Единственная разница заключается в том, что вы не ожидаете ответа на свой запрос.

2.1 Серверная Часть

В контроллере сервера сокетов R вы создаете метод/| уволь И Забудь . Поскольку запрос ничего не возвращает, возвращаемый тип метода - void . Опять же, с аннотацией @MessageMapping вы определяете имя маршрута. Как и в примере с запросом-ответом, сервер получает сообщение Notification . Чтобы увидеть, что что-то происходит при получении сообщения, вы просто регистрируете сообщение Notification .

@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
    logger.info("Received notification: " + notification);
}

2.2 Клиентская Сторона

В контроллере R SocketClient вы создаете метод уволь И Забудь . Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете retrieveMono(Void.class ) вместо retrieveMono(Notification.class ) .

@GetMapping("/fire-and-forget")
public Mono fireAndForget() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
    logger.info("Send notification for my-fire-and-forget: " + notification);
    return rSocketRequester
            .route("my-fire-and-forget")
            .data(notification)
            .retrieveMono(Void.class);
}

Запустите как сервер, так и клиент и вызовите URL-адрес:

$ http://localhost:8080/fire-and-forget

Как вы можете видеть, никакого ответа не возвращается. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.

Клиент:

Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}

Сервер:

Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}

2.3 Испытательная Сторона

Тест снова очень похож на клиентский код и пример запроса-ответа. Чтобы проверить, не выдает ли Mono никаких данных, достаточно вызвать verify Complete . Вам не нужно звонить потреблять С Помощью Next . Если Mono действительно выдает данные, тест должен завершиться неудачей. Однако замена маршрута my-fire-and-forget на my-request-response например, не проваливает тест. Непонятно, почему это не провалит тест. Если у кого-то есть какие-либо предложения или решение, пожалуйста, добавьте их в комментарии к этому блогу.

@Test
void testFireAndForget() {
    // Send a fire-and-forget message
    Mono result = rSocketRequester
            .route("my-fire-and-forget")
            .data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
            .retrieveMono(Void.class);

    // Assert that the result is a completed Mono.
    StepVerifier
            .create(result)
            .verifyComplete();
}

3. Модель потока запросов

С помощью модели потока запросов вы отправляете запрос на сервер и получаете поток сообщений с уведомлениями.

3.1 Серверная Часть

В контроллере сервера сокетов R вы создаете метод поток запросов . На этот раз сервер вернет Поток из Уведомлений сообщений. Опять же, с аннотацией @MessageMapping вы определяете имя маршрута. В этом примере при получении Уведомления сообщения/| Возвращается поток , который выдает новое |/Уведомление каждые 3 секунды.

@MessageMapping("my-request-stream")
Flux requestStream(Notification notification) {
    logger.info("Received notification for my-request-stream: " + notification);
    return Flux
            .interval(Duration.ofSeconds(3))
            .map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}

3.2 На Стороне Клиента

В контроллере R SocketClient вы создаете метод поток запросов . Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете retrieveFlux(Notification.class ) вместо retrieveMono(Notification.class )

@GetMapping("/request-stream")
public ResponseEntity> requestStream() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
    logger.info("Send notification for my-request-stream: " + notification);
    Flux notificationFlux = rSocketRequester
            .route("my-request-stream")
            .data(notification)
            .retrieveFlux(Notification.class);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}

Запустите как сервер, так и клиент и вызовите URL-адрес:

$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...

Как вы можете видеть, сервер каждые 3 секунды отправляет уведомление. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.

Клиент:

Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

Сервер:

Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

3.3 Тестовая Сторона

Тест снова похож на клиентский код. Во время проверки вы проверяете первое полученное сообщение, затем проверяете, получено ли 5 сообщений, и, наконец, проверяете последнее сообщение.

@Test
void testRequestStream() {
    // Send a request message
    Flux result = rSocketRequester
            .route("my-request-stream")
            .data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
            .retrieveFlux(Notification.class);

    // Verify that the response messages contain the expected data
    StepVerifier
      .create(result)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .expectNextCount(5)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .thenCancel()
      .verify();
}

4. Модель канала

Модель канала немного сложнее, чем другие модели, которые вы видели до сих пор. Здесь вы отправите Поток и в качестве ответа Поток будет возвращен. Это дает вам возможность отправлять сообщения туда и обратно, например, в чате.

4.1 Серверная Часть

В контроллере сервера сокетов R вы создаете метод channel . Вы будете при получении Уведомления увеличивать счетчик и каждую секунду результат счетчика количество уведомлений будет отправлено клиенту. Чтобы иметь возможность следить за происходящим, вы добавляете ведение журнала при получении уведомления и когда результат будет возвращен.

@MessageMapping("my-channel")
public Flux channel(Flux notifications) {
    final AtomicLong notificationCount = new AtomicLong(0);
    return notifications
        .doOnNext(notification -> {
            logger.info("Received notification for channel: " + notification);
            notificationCount.incrementAndGet();
         })
        .switchMap(notification -> 
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
                private Function numberOfMessages(AtomicLong notificationCount) {
                    long count = notificationCount.get();
                    logger.info("Return flux with count: " + count);
                    return i -> count;
                }
         }.numberOfMessages(notificationCount))).log();
}

4.2 Клиентская Сторона

В контроллере R SocketClient вы создаете метод channel/| . Вам нужно создать Поток . Чтобы достичь этого, вы создаете 3 Mono |/Элементы уведомления , один с задержкой в 0 секунд ( уведомление 0 ), один с задержкой в 2 секунды ( уведомление 2 ) и один с задержкой в 5 секунд ( уведомление 5 ). Вы создаете Поток уведомления с комбинацией Моно ‘s вы только что создали. Каждый раз, когда Flux отправляет уведомление, вы регистрируете это, чтобы иметь возможность следить за происходящим. Наконец, вы отправляете Flux на Socketchannel и получаете ответ в виде Поток из Long и верните его вызывающему абоненту.

@GetMapping("/channel")
public ResponseEntity> channel() {
    Mono notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));

    Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
            .doOnNext(d -> logger.info("Send notification for my-channel"));

    Flux numberOfNotifications = this.rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    return  ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}

Запустите как сервер, так и клиент и вызовите URL-адрес:

$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...

Результат такой, как и ожидалось. Сначала отправляется уведомление 0 , через 5 секунд ( уведомление 5/| ) следующее Уведомление отправляется вместе с уведомлением 0, через 2 секунды появляется уведомление 2 , через 2 секунды - новое и, наконец, через 2 секунды - последнее. После последнего результата Flux будет продолжать передавать количество, равное 6. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений. На этот раз, включая временные метки, полное протоколирование содержит еще больше информации, которая опущена в целях краткости. Вам следует взглянуть на это более внимательно, когда вы сами будете запускать примеры. Важно отметить, что операторы on Next log выполняются каждую секунду и соответствуют ответу, который отправляет сервер.

Клиент:

17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel

Сервер:

17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...

4.3 Испытательная Сторона

Тест аналогичен клиентскому коду. Вы отправляете сообщения на канал и проверяете результирующие подсчеты. Повторяющиеся элементы в тесте опущены для краткости, полный тест доступен на GitHub.

@Test
void testChannel() {
    Mono notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));

    Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
    // Send a request message
    Flux result = rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    // Verify that the response messages contain the expected data
    StepVerifier
            .create(result)
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(1);
                })
...
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(6);
                })
            .thenCancel()
            .verify();
}

5. Вывод

Вы узнали, как создать сервер, клиент и модульный тест для моделей связи сокетов “Запуск и забвение”, “Потоки запросов” и “Канал”. К настоящему времени у вас будут базовые знания для того, чтобы самостоятельно исследовать и экспериментировать.

Оригинал: “https://dev.to/mydeveloperplanet/getting-started-with-rsocket-part-2-45ke”