Рубрики
Без рубрики

WebSockets с игровым фреймворком и Akka

WebSockets обеспечивают непрерывную связь между клиентским приложением и его сервером. Это может быть построено с помощью реактивных потоков в Akka и Play Framework.

Автор оригинала: baeldung.

1. Обзор

Когда мы хотим, чтобы наши веб-клиенты поддерживали диалог с нашим сервером, тогда WebSockets может быть полезным решением. WebSockets поддерживают постоянное полнодуплексное соединение. Это дает нам возможность отправлять двунаправленные сообщения между нашим сервером и клиентом.

В этом уроке мы узнаем, как использовать WebSockets с Akka в Play Framework .

2. Настройка

Давайте создадим простое приложение для чата. Пользователь отправит сообщения на сервер, а сервер ответит сообщением от JSONPlaceholder .

2.1. Настройка приложения Play Framework

Мы создадим это приложение, используя платформу Play.

Давайте следовать инструкциям из Введения в игру на Java, чтобы настроить и запустить простое приложение Play Framework.

2.2. Добавление необходимых файлов JavaScript

Кроме того, нам нужно будет работать с JavaScript для сценариев на стороне клиента. Это позволит нам получать новые сообщения, отправленные с сервера. Для этого мы будем использовать библиотеку jQuery .

Давайте добавим jQuery в нижнюю часть app/views/i ndex.scala.html файл:

2.3. Настройка Акка

Наконец, мы будем использовать Akka для обработки подключений WebSocket на стороне сервера.

Давайте перейдем к файлу build.sbt и добавим зависимости.

Нам нужно добавить зависимости akka-actor и akka-testkit :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Нам нужно, чтобы они могли использовать и тестировать код фреймворка Akka.

Далее мы будем использовать потоки Akka. Итак, давайте добавим зависимость akka-stream :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Наконец, нам нужно вызвать конечную точку rest от актера Akka. Для этого нам понадобится зависимость akka-http . Когда мы это сделаем, конечная точка вернет данные JSON, которые нам придется десериализовать, поэтому нам также нужно добавить зависимость akka-http-jackson :

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

И теперь все готово. Давайте посмотрим, как заставить работать WebSockets!

3. Работа С Веб-Сайтами С Актерами Akka

Механизм обработки WebSocket Play построен на потоках Akka. WebSocket моделируется как поток. Таким образом, входящие сообщения WebSocket подаются в поток, а сообщения, созданные потоком, отправляются клиенту.

Чтобы обработать WebSocket с помощью актера, нам понадобится утилита Play Actor Flow , которая преобразует ActorRef в поток. Это в основном требует некоторого кода Java с небольшой конфигурацией.

3.1. Метод контроллера WebSocket

Во-первых, нам нужен Материализованный экземпляр. Материализатор является фабрикой для двигателей выполнения потока.

Нам нужно ввести ActorSystem и Материализатор в контроллер app/controllers/HomeController.java :

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(
  ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

Теперь давайте добавим метод контроллера сокета:

public WebSocket socket() {
    return WebSocket.Json
      .acceptOrResult(this::createActorFlow);
}

Здесь мы вызываем функцию accept Или Result , которая принимает заголовок запроса и возвращает будущее. Возвращаемое будущее – это поток для обработки сообщений WebSocket.

Вместо этого мы можем отклонить запрос и вернуть результат отклонения.

Теперь давайте создадим поток:

private CompletionStage>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

Класс F в Play Framework определяет набор помощников по функциональному стилю программирования. В этом случае мы используем F. Либо.Правильно принять соединение и вернуть поток.

Допустим, мы хотели отклонить соединение, когда клиент не аутентифицирован.

Для этого мы могли бы проверить, задано ли имя пользователя в сеансе. И если это не так, мы отклоняем соединение с HTTP 403 Forbidden:

private CompletionStage>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
      .getOptional("username")
      .map(username -> 
        F.Either.>Right(
          createFlowForActor()))
      .orElseGet(() -> F.Either.Left(forbidden())));
}

Мы используем F. Либо.Влево , чтобы отклонить соединение таким же образом, как мы предоставляем поток с F. Либо.Правильно .

Наконец, мы связываем поток с субъектом, который будет обрабатывать сообщения:

private Flow createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

Actor Flow.ActorRef создает поток, который обрабатывается Messenger actor .

3.2. Файл маршрутов

Теперь давайте добавим определения routes для методов контроллера в conf/routes :

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

Эти определения маршрутов сопоставляют входящие HTTP-запросы с методами действий контроллера, как описано в разделе Маршрутизация в игровых приложениях на Java .

3.3. Реализация Субъекта

Наиболее важной частью класса actor является метод createreceiver , который определяет, какие сообщения может обрабатывать актер:

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

Актер будет пересылать все сообщения, соответствующие классу JsonNode , в метод обработчика on Send Message :

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    //..
    processMessage(requestDTO);
}

Затем обработчик будет отвечать на каждое сообщение с помощью метода ProcessMessage :

private void processMessage(RequestDTO requestDTO) {
    CompletionStage responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. Использование Rest API с помощью Akka HTTP

Мы отправим HTTP-запросы в генератор фиктивных сообщений по адресу JSONPlaceholder Posts . Когда приходит ответ, мы отправляем ответ клиенту, написав его out .

Давайте создадим метод, который вызывает конечную точку со случайным идентификатором post:

private CompletionStage getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(0, 100);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

Мы также обрабатываем ответ Http , который мы получаем от вызова службы, чтобы получить ответ JSON:

private CompletionStage consumeHttpResponse(
  HttpResponse httpResponse) {
    Materializer materializer = 
      Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

Класс MessageConverter – это утилита для преобразования между JsonNode и DTOs:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

Далее нам нужно отбросить сущность . Метод discardEntityBytes convenience служит для того, чтобы легко отбросить объект, если он не имеет для нас никакой цели.

Давайте посмотрим, как отбросить байты:

private void discardEntity(
  HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = 
      httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> 
        log.info("Entity discarded completely!"));
}

Теперь, выполнив обработку WebSocket, давайте посмотрим, как мы можем настроить клиент для этого с помощью HTML5 WebSocket.

4. Настройка клиента WebSocket

Для нашего клиента давайте создадим простое веб-приложение для чата.

4.1. Действие Контроллера

Нам нужно определить действие контроллера, которое отображает страницу индекса. Мы поместим это в класс контроллера app.controllers.Домашний контролер :

public Result index(Http.Request request) {
    String url = routes.HomeController.socket()
      .webSocketURL(request);
    return ok(views.html.index.render(url));
}

4.2. Страница Шаблона

А теперь давайте перейдем к app/views/ndex.scala.html страница и добавьте контейнер для полученных сообщений и форму для захвата нового сообщения:

F

Нам также нужно будет передать URL-адрес для действия контроллера WebSocket, объявив этот параметр в верхней части app/views/index.scala.html страница:

@(url: String)

4.3. Обработчики событий WebSocket в JavaScript

А теперь мы можем добавить JavaScript для обработки событий WebSocket. Для простоты мы добавим функции JavaScript в нижней части app/views/index.scala.html страница.

Давайте объявим обработчики событий:

var webSocket;
var messageInput;

function init() {
    initWebSocket();
}

function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

Давайте добавим самих обработчиков:

function onOpen(evt) {
    writeToScreen("CONNECTED");
}

function onClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

Затем, чтобы представить вывод, мы будем использовать функции добавить сообщение Для просмотра и writeToScreen :

function appendMessageToView(title, message) {
    $("#messageContent").append("

" + title + ": " + message + "

"); } function writeToScreen(message) { console.log("New message: ", message); }

4.4. Запуск и тестирование Приложения

Мы готовы протестировать приложение, так что давайте запустим его:

cd websockets
sbt run

С запущенным приложением мы можем общаться с сервером, посетив http://localhost:9000 :

Каждый раз, когда мы вводим сообщение и нажимаем Отправить , сервер немедленно отвечает некоторым lorem ipsum из службы заполнителей JSON.

5. Обработка WebSockets непосредственно с потоками Akka

Если мы обрабатываем поток событий из источника и отправляем их клиенту, то мы можем смоделировать это вокруг потоков Akka.

Давайте посмотрим, как мы можем использовать потоки Akka в примере, где сервер отправляет сообщения каждые две секунды.

Мы начнем с действия WebSocket в контроллере Home :

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = 
          new MessageDTO("1", "1", "Title", "Test Body");
        Source out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

Метод Source# tick принимает три параметра. Первый-это начальная задержка перед обработкой первого тика, а второй-интервал между последовательными тиками. В приведенном выше фрагменте мы установили оба значения равными двум секундам. Третий параметр-это объект, который должен быть возвращен на каждом тике.

Чтобы увидеть это в действии, нам нужно изменить URL-адрес в действии index и указать на сокет akka Streams конечную точку:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

А теперь, обновляя страницу, мы будем видеть новую запись каждые две секунды:

6. Прекращение действия актера

В какой-то момент нам нужно будет закрыть чат, либо по запросу пользователя, либо через тайм-аут.

6.1. Обработка Прекращения действия Актера

Как мы определяем, когда веб-сайт был закрыт?

Play автоматически закроет WebSocket, когда актер, который обрабатывает WebSocket, завершит работу. Таким образом, мы можем справиться с этим сценарием, реализовав метод Actor#postS top :

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now()
      .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. Ручное завершение действия Актера

Кроме того, если мы должны остановить актера, мы можем отправить PoisonPill актеру. В нашем примере приложения мы должны быть в состоянии обработать запрос “стоп”.

Давайте посмотрим, как это сделать в методе on Send Message :

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if("stop".equals(message)) {
        MessageDTO messageDTO = 
          createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

Когда мы получаем сообщение, мы проверяем, является ли оно запросом на остановку. Если это так, мы отправляем Ядовитую таблетку . В противном случае мы обрабатываем запрос.

7. Параметры конфигурации

Мы можем настроить несколько параметров с точки зрения того, как должен обрабатываться WebSocket. Давайте рассмотрим некоторые из них.

7.1. Длина фрейма WebSocket

Коммуникация WebSocket включает в себя обмен фреймами данных.

Длина фрейма WebSocket настраивается. У нас есть возможность настроить длину рамки в соответствии с требованиями нашего приложения.

Настройка более короткой длины кадра может помочь уменьшить количество атак типа “отказ в обслуживании”, использующих длинные кадры данных. Мы можем изменить длину кадра для приложения, указав максимальную длину в application.conf :

play.server.websocket.frame.maxLength = 64k

Мы также можем задать этот параметр конфигурации, указав максимальную длину в качестве параметра командной строки:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Время ожидания соединения

По умолчанию актер, который мы используем для обработки WebSocket, завершается через одну минуту. Это связано с тем, что игровой сервер, на котором работает наше приложение, по умолчанию имеет тайм-аут простоя в 60 секунд. Это означает, что все соединения, которые не получают запрос в течение шестидесяти секунд, автоматически закрываются.

Мы можем изменить это с помощью параметров конфигурации. Давайте перейдем к нашему application.conf и изменим сервер, чтобы у него не было таймаута простоя:

play.server.http.idleTimeout = "infinite"

Или мы можем передать этот параметр в качестве аргументов командной строки:

sbt -Dhttp.idleTimeout=infinite run

Мы также можем настроить это, указав dev Settings в build.sbt .

Параметры конфигурации, указанные в Параметры конфигурации, указанные в используются только в разработке, в производстве они будут игнорироваться:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Если мы повторно запустим приложение, актер не завершит работу.

Мы можем изменить значение на секунды:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Мы можем узнать больше о доступных параметрах конфигурации в документации Play Framework .

8. Заключение

В этом уроке мы реализовали WebSockets в рамках Play с актерами Akka и потоками Akka.

Затем мы рассмотрели, как напрямую использовать актеров Akka, а затем увидели, как потоки Akka можно настроить для обработки подключения к WebSocket.

На стороне клиента мы использовали JavaScript для обработки событий WebSocket.

Наконец, мы рассмотрели некоторые параметры конфигурации, которые мы можем использовать.

Как обычно, исходный код этого учебника доступен на GitHub .