В этом блоге вы продолжите с того места, на котором остановились после Часть 1 . Вы изучите модели связи сокетов “Запуск и забвение”, “Поток запросов” и “Канал”. Для всех этих моделей вы создадите сервер, клиент и модульный тест.
1. Вступление
- Запрос-ответ (поток из 1)
- Выстрелил и забыл (ответа нет)
- Поток запросов (поток из многих)
- Канал (двунаправленные потоки)
Вы рассмотрели Запрос-ответ в Части 1, остальные будут рассмотрены в Части 2.
Исходный код, используемый в этом посте, конечно же, доступен по адресу GitHub .
2. Модель “Выстрели и забудь”
Модель “Включи и забудь” очень похожа на модель “Запрос-ответ”. Единственная разница заключается в том, что вы не ожидаете ответа на свой запрос.
2.1 Серверная Часть
В контроллере сервера сокетов R вы создаете метод/| уволь И Забудь . Поскольку запрос ничего не возвращает, возвращаемый тип метода - void . Опять же, с аннотацией @MessageMapping вы определяете имя маршрута. Как и в примере с запросом-ответом, сервер получает сообщение Notification . Чтобы увидеть, что что-то происходит при получении сообщения, вы просто регистрируете сообщение Notification .
@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
logger.info("Received notification: " + notification);
}
2.2 Клиентская Сторона
В контроллере R SocketClient вы создаете метод уволь И Забудь . Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете retrieveMono(Void.class ) вместо retrieveMono(Notification.class ) .
@GetMapping("/fire-and-forget")
public Mono fireAndForget() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
logger.info("Send notification for my-fire-and-forget: " + notification);
return rSocketRequester
.route("my-fire-and-forget")
.data(notification)
.retrieveMono(Void.class);
}
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ http://localhost:8080/fire-and-forget
Как вы можете видеть, никакого ответа не возвращается. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.
Клиент:
Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
Сервер:
Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
2.3 Испытательная Сторона
Тест снова очень похож на клиентский код и пример запроса-ответа. Чтобы проверить, не выдает ли Mono никаких данных, достаточно вызвать verify Complete . Вам не нужно звонить потреблять С Помощью Next . Если Mono действительно выдает данные, тест должен завершиться неудачей. Однако замена маршрута my-fire-and-forget на my-request-response например, не проваливает тест. Непонятно, почему это не провалит тест. Если у кого-то есть какие-либо предложения или решение, пожалуйста, добавьте их в комментарии к этому блогу.
@Test
void testFireAndForget() {
// Send a fire-and-forget message
Mono result = rSocketRequester
.route("my-fire-and-forget")
.data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
.retrieveMono(Void.class);
// Assert that the result is a completed Mono.
StepVerifier
.create(result)
.verifyComplete();
}
3. Модель потока запросов
С помощью модели потока запросов вы отправляете запрос на сервер и получаете поток сообщений с уведомлениями.
3.1 Серверная Часть
В контроллере сервера сокетов R вы создаете метод поток запросов . На этот раз сервер вернет Поток из Уведомлений сообщений. Опять же, с аннотацией @MessageMapping вы определяете имя маршрута. В этом примере при получении Уведомления сообщения/| Возвращается поток , который выдает новое |/Уведомление каждые 3 секунды.
@MessageMapping("my-request-stream")
Flux requestStream(Notification notification) {
logger.info("Received notification for my-request-stream: " + notification);
return Flux
.interval(Duration.ofSeconds(3))
.map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}
3.2 На Стороне Клиента
В контроллере R SocketClient вы создаете метод поток запросов . Реализация идентична примеру запроса-ответа, за исключением ожидаемого возвращаемого типа. Здесь вы используете retrieveFlux(Notification.class ) вместо retrieveMono(Notification.class )
@GetMapping("/request-stream")
public ResponseEntity> requestStream() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
logger.info("Send notification for my-request-stream: " + notification);
Flux notificationFlux = rSocketRequester
.route("my-request-stream")
.data(notification)
.retrieveFlux(Notification.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...
Как вы можете видеть, сервер каждые 3 секунды отправляет уведомление. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений.
Клиент:
Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
Сервер:
Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
3.3 Тестовая Сторона
Тест снова похож на клиентский код. Во время проверки вы проверяете первое полученное сообщение, затем проверяете, получено ли 5 сообщений, и, наконец, проверяете последнее сообщение.
@Test
void testRequestStream() {
// Send a request message
Flux result = rSocketRequester
.route("my-request-stream")
.data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
.retrieveFlux(Notification.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.expectNextCount(5)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.thenCancel()
.verify();
}
4. Модель канала
Модель канала немного сложнее, чем другие модели, которые вы видели до сих пор. Здесь вы отправите Поток и в качестве ответа Поток будет возвращен. Это дает вам возможность отправлять сообщения туда и обратно, например, в чате.
4.1 Серверная Часть
В контроллере сервера сокетов R вы создаете метод channel . Вы будете при получении Уведомления увеличивать счетчик и каждую секунду результат счетчика количество уведомлений будет отправлено клиенту. Чтобы иметь возможность следить за происходящим, вы добавляете ведение журнала при получении уведомления и когда результат будет возвращен.
@MessageMapping("my-channel")
public Flux channel(Flux notifications) {
final AtomicLong notificationCount = new AtomicLong(0);
return notifications
.doOnNext(notification -> {
logger.info("Received notification for channel: " + notification);
notificationCount.incrementAndGet();
})
.switchMap(notification ->
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
private Function numberOfMessages(AtomicLong notificationCount) {
long count = notificationCount.get();
logger.info("Return flux with count: " + count);
return i -> count;
}
}.numberOfMessages(notificationCount))).log();
}
4.2 Клиентская Сторона
В контроллере R SocketClient вы создаете метод channel/| . Вам нужно создать Поток . Чтобы достичь этого, вы создаете 3 Mono |/Элементы уведомления , один с задержкой в 0 секунд ( уведомление 0 ), один с задержкой в 2 секунды ( уведомление 2 ) и один с задержкой в 5 секунд ( уведомление 5 ). Вы создаете Поток уведомления с комбинацией Моно ‘s вы только что создали. Каждый раз, когда Flux отправляет уведомление, вы регистрируете это, чтобы иметь возможность следить за происходящим. Наконец, вы отправляете Flux на Socketchannel и получаете ответ в виде Поток из Long и верните его вызывающему абоненту.
@GetMapping("/channel")
public ResponseEntity> channel() {
Mono notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
.doOnNext(d -> logger.info("Send notification for my-channel"));
Flux numberOfNotifications = this.rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}
Запустите как сервер, так и клиент и вызовите URL-адрес:
$ curl http://localhost:8080/channel data:1 data:1 data:1 data:1 data:3 data:3 data:4 data:4 data:5 data:5 data:6 data:6 ...
Результат такой, как и ожидалось. Сначала отправляется уведомление 0 , через 5 секунд ( уведомление 5/| ) следующее Уведомление отправляется вместе с уведомлением 0, через 2 секунды появляется уведомление 2 , через 2 секунды - новое и, наконец, через 2 секунды - последнее. После последнего результата Flux будет продолжать передавать количество, равное 6. В журнале регистрации клиента и сервера вы можете проверить отправку и получение сообщений. На этот раз, включая временные метки, полное протоколирование содержит еще больше информации, которая опущена в целях краткости. Вам следует взглянуть на это более внимательно, когда вы сами будете запускать примеры. Важно отметить, что операторы on Next log выполняются каждую секунду и соответствуют ответу, который отправляет сервер.
Клиент:
17:01:19.820 Send notification for my-channel 17:01:24.849 Send notification for my-channel 17:01:24.879 Send notification for my-channel 17:01:26.881 Send notification for my-channel 17:01:28.908 Send notification for my-channel 17:01:30.935 Send notification for my-channel
Сервер:
17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...
4.3 Испытательная Сторона
Тест аналогичен клиентскому коду. Вы отправляете сообщения на канал и проверяете результирующие подсчеты. Повторяющиеся элементы в тесте опущены для краткости, полный тест доступен на GitHub.
@Test
void testChannel() {
Mono notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
// Send a request message
Flux result = rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(count -> {
assertThat(count).isEqualTo(1);
})
...
.consumeNextWith(count -> {
assertThat(count).isEqualTo(6);
})
.thenCancel()
.verify();
}
5. Вывод
Вы узнали, как создать сервер, клиент и модульный тест для моделей связи сокетов “Запуск и забвение”, “Потоки запросов” и “Канал”. К настоящему времени у вас будут базовые знания для того, чтобы самостоятельно исследовать и экспериментировать.
Оригинал: “https://dev.to/mydeveloperplanet/getting-started-with-rsocket-part-2-45ke”