В этом блоге вы продолжите с того места, на котором остановились после Часть 1 . Вы изучите модели связи сокетов “Запуск и забвение”, “Поток запросов” и “Канал”. Для всех этих моделей вы создадите сервер, клиент и модульный тест.
1. Вступление
- Запрос-ответ (поток из 1)
- Выстрелил и забыл (ответа нет)
- Поток запросов (поток из многих)
- Канал (двунаправленные потоки)
Вы рассмотрели Запрос-ответ в Части 1, остальные будут рассмотрены в Части 2.
Исходный код, используемый в этом посте, конечно же, доступен по адресу GitHub .
2. Модель “Выстрели и забудь”
Модель “Включи и забудь” очень похожа на модель “Запрос-ответ”. Единственная разница заключается в том, что вы не ожидаете ответа на свой запрос.
2.1 Серверная Часть
В контроллере сервера сокетов R
вы создаете метод/| уволь И Забудь . Поскольку запрос ничего не возвращает, возвращаемый тип метода -
void . Опять же, с аннотацией
@MessageMapping вы определяете имя маршрута. Как и в примере с запросом-ответом, сервер получает сообщение
Notification . Чтобы увидеть, что что-то происходит при получении сообщения, вы просто регистрируете сообщение
Notification .
@MessageMapping("my-fire-and-forget") public void fireAndForget(Notification notification) { logger.info("Received notification: " + notification); }
2.2 Клиентская Сторона
В контроллере R SocketClient
вы создаете метод уволь И Забудь
. Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете retrieveMono(Void.class )
вместо retrieveMono(Notification.class )
.
@GetMapping("/fire-and-forget") public MonofireAndForget() { Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"); logger.info("Send notification for my-fire-and-forget: " + notification); return rSocketRequester .route("my-fire-and-forget") .data(notification) .retrieveMono(Void.class); }
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ http://localhost:8080/fire-and-forget
Как вы можете видеть, никакого ответа не возвращается. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.
Клиент:
Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
Сервер:
Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
2.3 Испытательная Сторона
Тест снова очень похож на клиентский код и пример запроса-ответа. Чтобы проверить, не выдает ли Mono
никаких данных, достаточно вызвать verify Complete
. Вам не нужно звонить потреблять С Помощью Next
. Если Mono
действительно выдает данные, тест должен завершиться неудачей. Однако замена маршрута my-fire-and-forget
на my-request-response
например, не проваливает тест. Непонятно, почему это не провалит тест. Если у кого-то есть какие-либо предложения или решение, пожалуйста, добавьте их в комментарии к этому блогу.
@Test void testFireAndForget() { // Send a fire-and-forget message Monoresult = rSocketRequester .route("my-fire-and-forget") .data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model")) .retrieveMono(Void.class); // Assert that the result is a completed Mono. StepVerifier .create(result) .verifyComplete(); }
3. Модель потока запросов
С помощью модели потока запросов вы отправляете запрос на сервер и получаете поток сообщений с уведомлениями.
3.1 Серверная Часть
В контроллере сервера сокетов R
вы создаете метод поток запросов
. На этот раз сервер вернет
Поток из
Уведомлений сообщений. Опять же, с аннотацией
@MessageMapping вы определяете имя маршрута. В этом примере при получении
Уведомления сообщения/| Возвращается поток
, который выдает новое |/Уведомление каждые 3 секунды.
@MessageMapping("my-request-stream") FluxrequestStream(Notification notification) { logger.info("Received notification for my-request-stream: " + notification); return Flux .interval(Duration.ofSeconds(3)) .map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText())); }
3.2 На Стороне Клиента
В контроллере R SocketClient
вы создаете метод поток запросов
. Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете
retrieveFlux(Notification.class ) вместо
retrieveMono(Notification.class )
@GetMapping("/request-stream") public ResponseEntity> requestStream() { Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"); logger.info("Send notification for my-request-stream: " + notification); Flux notificationFlux = rSocketRequester .route("my-request-stream") .data(notification) .retrieveFlux(Notification.class); return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux); }
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ curl http://localhost:8080/request-stream data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"} data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"} data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"} data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"} ...
Как вы можете видеть, сервер каждые 3 секунды отправляет уведомление. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.
Клиент:
Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
Сервер:
Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
3.3 Тестовая Сторона
Тест снова похож на клиентский код. Во время проверки вы проверяете первое полученное сообщение, затем проверяете, получено ли 5 сообщений, и, наконец, проверяете последнее сообщение.
@Test void testRequestStream() { // Send a request message Fluxresult = rSocketRequester .route("my-request-stream") .data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model")) .retrieveFlux(Notification.class); // Verify that the response messages contain the expected data StepVerifier .create(result) .consumeNextWith(notification -> { assertThat(notification.getSource()).isEqualTo(SERVER); assertThat(notification.getDestination()) .isEqualTo(CLIENT); assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");}) .expectNextCount(5) .consumeNextWith(notification -> { assertThat(notification.getSource()).isEqualTo(SERVER); assertThat(notification.getDestination()) .isEqualTo(CLIENT); assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");}) .thenCancel() .verify(); }
4. Модель канала
Модель канала немного сложнее, чем другие модели, которые вы видели до сих пор. Здесь вы отправите Поток
и в качестве ответа Поток
будет возвращен. Это дает вам возможность отправлять сообщения туда и обратно, например, в чате.
4.1 Серверная Часть
В контроллере сервера сокетов R
вы создаете метод channel
. Вы будете при получении Уведомления
увеличивать счетчик и каждую секунду результат счетчика количество уведомлений
будет отправлено клиенту. Чтобы иметь возможность следить за происходящим, вы добавляете ведение журнала при получении уведомления и когда результат будет возвращен.
@MessageMapping("my-channel") public Fluxchannel(Flux notifications) { final AtomicLong notificationCount = new AtomicLong(0); return notifications .doOnNext(notification -> { logger.info("Received notification for channel: " + notification); notificationCount.incrementAndGet(); }) .switchMap(notification -> Flux.interval(Duration.ofSeconds(1)).map(new Object() { private Function numberOfMessages(AtomicLong notificationCount) { long count = notificationCount.get(); logger.info("Return flux with count: " + count); return i -> count; } }.numberOfMessages(notificationCount))).log(); }
4.2 Клиентская Сторона
В контроллере R SocketClient
вы создаете метод channel/| . Вам нужно создать
Поток . Чтобы достичь этого, вы создаете 3
Mono |/Элементы уведомления
, один с задержкой в 0 секунд ( уведомление 0
), один с задержкой в 2 секунды ( уведомление 2
) и один с задержкой в 5 секунд ( уведомление 5
). Вы создаете Поток
уведомления
с комбинацией Моно
‘s вы только что создали. Каждый раз, когда Flux
отправляет уведомление, вы регистрируете это, чтобы иметь возможность следить за происходящим. Наконец, вы отправляете Flux
на Socketchannel и получаете ответ в виде Поток
из Long
и верните его вызывающему абоненту.
@GetMapping("/channel") public ResponseEntity> channel() { Mono notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")); Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2)); Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5)); Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2) .doOnNext(d -> logger.info("Send notification for my-channel")); Flux numberOfNotifications = this.rSocketRequester .route("my-channel") .data(notifications) .retrieveFlux(Long.class); return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications); }
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ curl http://localhost:8080/channel data:1 data:1 data:1 data:1 data:3 data:3 data:4 data:4 data:5 data:5 data:6 data:6 ...
Результат такой, как и ожидалось. Сначала отправляется уведомление 0
, через 5 секунд ( уведомление 5/| ) следующее
Уведомление отправляется вместе с уведомлением 0, через 2 секунды появляется
уведомление 2 , через 2 секунды - новое и, наконец, через 2 секунды - последнее. После последнего результата
Flux будет продолжать передавать количество, равное 6. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений. На этот раз, включая временные метки, полное протоколирование содержит еще больше информации, которая опущена в целях краткости. Вам следует взглянуть на это более внимательно, когда вы сами будете запускать примеры. Важно отметить, что операторы
on Next log выполняются каждую секунду и соответствуют ответу, который отправляет сервер.
Клиент:
17:01:19.820 Send notification for my-channel 17:01:24.849 Send notification for my-channel 17:01:24.879 Send notification for my-channel 17:01:26.881 Send notification for my-channel 17:01:28.908 Send notification for my-channel 17:01:30.935 Send notification for my-channel
Сервер:
17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:19.947 Return flux with count: 1 17:01:20.949 onNext(1) 17:01:21.947 onNext(1) 17:01:22.947 onNext(1) 17:01:23.947 onNext(1) 17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:24.882 Return flux with count: 2 17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:24.885 Return flux with count: 3 17:01:25.886 onNext(3) 17:01:26.886 onNext(3) 17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:26.909 Return flux with count: 4 17:01:27.910 onNext(4) 17:01:28.910 onNext(4) 17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:28.937 Return flux with count: 5 17:01:29.937 onNext(5) 17:01:30.937 onNext(5) 17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'} 17:01:30.964 Return flux with count: 6 17:01:31.964 onNext(6) 17:01:32.964 onNext(6) ...
4.3 Испытательная Сторона
Тест аналогичен клиентскому коду. Вы отправляете сообщения на канал и проверяете результирующие подсчеты. Повторяющиеся элементы в тесте опущены для краткости, полный тест доступен на GitHub.
@Test void testChannel() { Mononotification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")); Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2)); Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5)); Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2); // Send a request message Flux result = rSocketRequester .route("my-channel") .data(notifications) .retrieveFlux(Long.class); // Verify that the response messages contain the expected data StepVerifier .create(result) .consumeNextWith(count -> { assertThat(count).isEqualTo(1); }) ... .consumeNextWith(count -> { assertThat(count).isEqualTo(6); }) .thenCancel() .verify(); }
5. Вывод
Вы узнали, как создать сервер, клиент и модульный тест для моделей связи сокетов “Запуск и забвение”, “Потоки запросов” и “Канал”. К настоящему времени у вас будут базовые знания для того, чтобы самостоятельно исследовать и экспериментировать.
Оригинал: “https://dev.to/mydeveloperplanet/getting-started-with-rsocket-part-2-45ke”