1. Обзор
Когда мы хотим, чтобы наши веб-клиенты поддерживали диалог с нашим сервером, тогда WebSockets может быть полезным решением. WebSockets поддерживают постоянное полнодуплексное соединение. Это дает нам возможность отправлять двунаправленные сообщения между нашим сервером и клиентом.
В этом уроке мы узнаем, как использовать WebSockets с Akka в Play Framework .
2. Настройка
Давайте создадим простое приложение для чата. Пользователь отправит сообщения на сервер, а сервер ответит сообщением от JSONPlaceholder .
2.1. Настройка приложения Play Framework
Мы создадим это приложение, используя платформу Play.
Давайте следовать инструкциям из Введения в игру на Java, чтобы настроить и запустить простое приложение Play Framework.
2.2. Добавление необходимых файлов JavaScript
Кроме того, нам нужно будет работать с JavaScript для сценариев на стороне клиента. Это позволит нам получать новые сообщения, отправленные с сервера. Для этого мы будем использовать библиотеку jQuery .
Давайте добавим jQuery в нижнюю часть app/views/i ndex.scala.html файл:
2.3. Настройка Акка
Наконец, мы будем использовать Akka для обработки подключений WebSocket на стороне сервера.
Давайте перейдем к файлу build.sbt и добавим зависимости.
Нам нужно добавить зависимости akka-actor и akka-testkit :
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion
Нам нужно, чтобы они могли использовать и тестировать код фреймворка Akka.
Далее мы будем использовать потоки Akka. Итак, давайте добавим зависимость akka-stream :
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion
Наконец, нам нужно вызвать конечную точку rest от актера Akka. Для этого нам понадобится зависимость akka-http . Когда мы это сделаем, конечная точка вернет данные JSON, которые нам придется десериализовать, поэтому нам также нужно добавить зависимость akka-http-jackson :
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
И теперь все готово. Давайте посмотрим, как заставить работать WebSockets!
3. Работа С Веб-Сайтами С Актерами Akka
Механизм обработки WebSocket Play построен на потоках Akka. WebSocket моделируется как поток. Таким образом, входящие сообщения WebSocket подаются в поток, а сообщения, созданные потоком, отправляются клиенту.
Чтобы обработать WebSocket с помощью актера, нам понадобится утилита Play Actor Flow , которая преобразует ActorRef в поток. Это в основном требует некоторого кода Java с небольшой конфигурацией.
3.1. Метод контроллера WebSocket
Во-первых, нам нужен Материализованный экземпляр. Материализатор является фабрикой для двигателей выполнения потока.
Нам нужно ввести ActorSystem и Материализатор в контроллер app/controllers/HomeController.java :
private ActorSystem actorSystem; private Materializer materializer; @Inject public HomeController( ActorSystem actorSystem, Materializer materializer) { this.actorSystem = actorSystem; this.materializer = materializer; }
Теперь давайте добавим метод контроллера сокета:
public WebSocket socket() { return WebSocket.Json .acceptOrResult(this::createActorFlow); }
Здесь мы вызываем функцию accept Или Result , которая принимает заголовок запроса и возвращает будущее. Возвращаемое будущее – это поток для обработки сообщений WebSocket.
Вместо этого мы можем отклонить запрос и вернуть результат отклонения.
Теперь давайте создадим поток:
private CompletionStage>> createActorFlow(Http.RequestHeader request) { return CompletableFuture.completedFuture( F.Either.Right(createFlowForActor())); }
Класс F в Play Framework определяет набор помощников по функциональному стилю программирования. В этом случае мы используем F. Либо.Правильно принять соединение и вернуть поток.
Допустим, мы хотели отклонить соединение, когда клиент не аутентифицирован.
Для этого мы могли бы проверить, задано ли имя пользователя в сеансе. И если это не так, мы отклоняем соединение с HTTP 403 Forbidden:
private CompletionStage>> createActorFlow2(Http.RequestHeader request) { return CompletableFuture.completedFuture( request.session() .getOptional("username") .map(username -> F.Either. >Right( createFlowForActor())) .orElseGet(() -> F.Either.Left(forbidden()))); }
Мы используем F. Либо.Влево , чтобы отклонить соединение таким же образом, как мы предоставляем поток с F. Либо.Правильно .
Наконец, мы связываем поток с субъектом, который будет обрабатывать сообщения:
private FlowcreateFlowForActor() { return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer); }
Actor Flow.ActorRef создает поток, который обрабатывается Messenger actor .
3.2. Файл маршрутов
Теперь давайте добавим определения routes для методов контроллера в conf/routes :
GET / controllers.HomeController.index(request: Request) GET /chat controllers.HomeController.socket GET /chat/with/streams controllers.HomeController.akkaStreamsSocket GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
Эти определения маршрутов сопоставляют входящие HTTP-запросы с методами действий контроллера, как описано в разделе Маршрутизация в игровых приложениях на Java .
3.3. Реализация Субъекта
Наиболее важной частью класса actor является метод createreceiver , который определяет, какие сообщения может обрабатывать актер:
@Override public Receive createReceive() { return receiveBuilder() .match(JsonNode.class, this::onSendMessage) .matchAny(o -> log.error("Received unknown message: {}", o.getClass())) .build(); }
Актер будет пересылать все сообщения, соответствующие классу JsonNode , в метод обработчика on Send Message :
private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); //.. processMessage(requestDTO); }
Затем обработчик будет отвечать на каждое сообщение с помощью метода ProcessMessage :
private void processMessage(RequestDTO requestDTO) { CompletionStageresponseFuture = getRandomMessage(); responseFuture.thenCompose(this::consumeHttpResponse) .thenAccept(messageDTO -> out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf())); }
3.4. Использование Rest API с помощью Akka HTTP
Мы отправим HTTP-запросы в генератор фиктивных сообщений по адресу JSONPlaceholder Posts . Когда приходит ответ, мы отправляем ответ клиенту, написав его out .
Давайте создадим метод, который вызывает конечную точку со случайным идентификатором post:
private CompletionStagegetRandomMessage() { int postId = ThreadLocalRandom.current().nextInt(0, 100); return Http.get(getContext().getSystem()) .singleRequest(HttpRequest.create( "https://jsonplaceholder.typicode.com/posts/" + postId)); }
Мы также обрабатываем ответ Http , который мы получаем от вызова службы, чтобы получить ответ JSON:
private CompletionStageconsumeHttpResponse( HttpResponse httpResponse) { Materializer materializer = Materializer.matFromSystem(getContext().getSystem()); return Jackson.unmarshaller(MessageDTO.class) .unmarshal(httpResponse.entity(), materializer) .thenApply(messageDTO -> { log.info("Received message: {}", messageDTO); discardEntity(httpResponse, materializer); return messageDTO; }); }
Класс MessageConverter – это утилита для преобразования между JsonNode и DTOs:
public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) { ObjectMapper mapper = new ObjectMapper(); return mapper.convertValue(jsonNode, MessageDTO.class); }
Далее нам нужно отбросить сущность . Метод discardEntityBytes convenience служит для того, чтобы легко отбросить объект, если он не имеет для нас никакой цели.
Давайте посмотрим, как отбросить байты:
private void discardEntity( HttpResponse httpResponse, Materializer materializer) { HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer); discarded.completionStage() .whenComplete((done, ex) -> log.info("Entity discarded completely!")); }
Теперь, выполнив обработку WebSocket, давайте посмотрим, как мы можем настроить клиент для этого с помощью HTML5 WebSocket.
4. Настройка клиента WebSocket
Для нашего клиента давайте создадим простое веб-приложение для чата.
4.1. Действие Контроллера
Нам нужно определить действие контроллера, которое отображает страницу индекса. Мы поместим это в класс контроллера app.controllers.Домашний контролер :
public Result index(Http.Request request) { String url = routes.HomeController.socket() .webSocketURL(request); return ok(views.html.index.render(url)); }
4.2. Страница Шаблона
А теперь давайте перейдем к app/views/ndex.scala.html страница и добавьте контейнер для полученных сообщений и форму для захвата нового сообщения:
F
Нам также нужно будет передать URL-адрес для действия контроллера WebSocket, объявив этот параметр в верхней части app/views/index.scala.html страница:
@(url: String)
4.3. Обработчики событий WebSocket в JavaScript
А теперь мы можем добавить JavaScript для обработки событий WebSocket. Для простоты мы добавим функции JavaScript в нижней части app/views/index.scala.html страница.
Давайте объявим обработчики событий:
var webSocket; var messageInput; function init() { initWebSocket(); } function initWebSocket() { webSocket = new WebSocket("@url"); webSocket.onopen = onOpen; webSocket.onclose = onClose; webSocket.onmessage = onMessage; webSocket.onerror = onError; }
Давайте добавим самих обработчиков:
function onOpen(evt) { writeToScreen("CONNECTED"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onError(evt) { writeToScreen("ERROR: " + JSON.stringify(evt)); } function onMessage(evt) { var receivedData = JSON.parse(evt.data); appendMessageToView("Server", receivedData.body); }
Затем, чтобы представить вывод, мы будем использовать функции добавить сообщение Для просмотра и writeToScreen :
function appendMessageToView(title, message) { $("#messageContent").append("" + title + ": " + message + "
"); } function writeToScreen(message) { console.log("New message: ", message); }
4.4. Запуск и тестирование Приложения
Мы готовы протестировать приложение, так что давайте запустим его:
cd websockets sbt run
С запущенным приложением мы можем общаться с сервером, посетив http://localhost:9000 :
Каждый раз, когда мы вводим сообщение и нажимаем Отправить , сервер немедленно отвечает некоторым lorem ipsum из службы заполнителей JSON.
5. Обработка WebSockets непосредственно с потоками Akka
Если мы обрабатываем поток событий из источника и отправляем их клиенту, то мы можем смоделировать это вокруг потоков Akka.
Давайте посмотрим, как мы можем использовать потоки Akka в примере, где сервер отправляет сообщения каждые две секунды.
Мы начнем с действия WebSocket в контроллере Home :
public WebSocket akkaStreamsSocket() { return WebSocket.Json.accept(request -> { Sinkin = Sink.foreach(System.out::println); MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body"); Source out = Source.tick( Duration.ofSeconds(2), Duration.ofSeconds(2), MessageConverter.messageToJsonNode(messageDTO) ); return Flow.fromSinkAndSource(in, out); }); }
Метод Source# tick принимает три параметра. Первый-это начальная задержка перед обработкой первого тика, а второй-интервал между последовательными тиками. В приведенном выше фрагменте мы установили оба значения равными двум секундам. Третий параметр-это объект, который должен быть возвращен на каждом тике.
Чтобы увидеть это в действии, нам нужно изменить URL-адрес в действии index и указать на сокет akka Streams конечную точку:
String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);
А теперь, обновляя страницу, мы будем видеть новую запись каждые две секунды:
6. Прекращение действия актера
В какой-то момент нам нужно будет закрыть чат, либо по запросу пользователя, либо через тайм-аут.
6.1. Обработка Прекращения действия Актера
Как мы определяем, когда веб-сайт был закрыт?
Play автоматически закроет WebSocket, когда актер, который обрабатывает WebSocket, завершит работу. Таким образом, мы можем справиться с этим сценарием, реализовав метод Actor#postS top :
@Override public void postStop() throws Exception { log.info("Messenger actor stopped at {}", OffsetDateTime.now() .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); }
6.2. Ручное завершение действия Актера
Кроме того, если мы должны остановить актера, мы можем отправить PoisonPill актеру. В нашем примере приложения мы должны быть в состоянии обработать запрос “стоп”.
Давайте посмотрим, как это сделать в методе on Send Message :
private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); if("stop".equals(message)) { MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor"); out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()); self().tell(PoisonPill.getInstance(), getSelf()); } else { log.info("Actor received. {}", requestDTO); processMessage(requestDTO); } }
Когда мы получаем сообщение, мы проверяем, является ли оно запросом на остановку. Если это так, мы отправляем Ядовитую таблетку . В противном случае мы обрабатываем запрос.
7. Параметры конфигурации
Мы можем настроить несколько параметров с точки зрения того, как должен обрабатываться WebSocket. Давайте рассмотрим некоторые из них.
7.1. Длина фрейма WebSocket
Коммуникация WebSocket включает в себя обмен фреймами данных.
Длина фрейма WebSocket настраивается. У нас есть возможность настроить длину рамки в соответствии с требованиями нашего приложения.
Настройка более короткой длины кадра может помочь уменьшить количество атак типа “отказ в обслуживании”, использующих длинные кадры данных. Мы можем изменить длину кадра для приложения, указав максимальную длину в application.conf :
play.server.websocket.frame.maxLength = 64k
Мы также можем задать этот параметр конфигурации, указав максимальную длину в качестве параметра командной строки:
sbt -Dwebsocket.frame.maxLength=64k run
7.2. Время ожидания соединения
По умолчанию актер, который мы используем для обработки WebSocket, завершается через одну минуту. Это связано с тем, что игровой сервер, на котором работает наше приложение, по умолчанию имеет тайм-аут простоя в 60 секунд. Это означает, что все соединения, которые не получают запрос в течение шестидесяти секунд, автоматически закрываются.
Мы можем изменить это с помощью параметров конфигурации. Давайте перейдем к нашему application.conf и изменим сервер, чтобы у него не было таймаута простоя:
play.server.http.idleTimeout = "infinite"
Или мы можем передать этот параметр в качестве аргументов командной строки:
sbt -Dhttp.idleTimeout=infinite run
Мы также можем настроить это, указав dev Settings в build.sbt .
Параметры конфигурации, указанные в Параметры конфигурации, указанные в используются только в разработке, в производстве они будут игнорироваться:
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"
Если мы повторно запустим приложение, актер не завершит работу.
Мы можем изменить значение на секунды:
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"
Мы можем узнать больше о доступных параметрах конфигурации в документации Play Framework .
8. Заключение
В этом уроке мы реализовали WebSockets в рамках Play с актерами Akka и потоками Akka.
Затем мы рассмотрели, как напрямую использовать актеров Akka, а затем увидели, как потоки Akka можно настроить для обработки подключения к WebSocket.
На стороне клиента мы использовали JavaScript для обработки событий WebSocket.
Наконец, мы рассмотрели некоторые параметры конфигурации, которые мы можем использовать.
Как обычно, исходный код этого учебника доступен на GitHub .