Рубрики
Без рубрики

Руководство по асинхронному каналу NIO2

Краткое и практическое руководство по API асинхронного сокета Java NIO2

Автор оригинала: baeldung.

1. Обзор

В этой статье мы продемонстрируем, как построить простой сервер и его клиент с помощью API-интерфейсов Java 7 NIO.2.

Мы рассмотрим классы AsynchronousServerSocketChannel и AsynchronousSocketChannel , которые являются ключевыми классами, используемыми при реализации сервера и клиента соответственно.

Если вы новичок в API-интерфейсах каналов NIO.2, у нас есть вступительная статья на этом сайте. Вы можете прочитать его, перейдя по этой ссылке .

Все классы, необходимые для использования API-интерфейсов каналов NIO.2, объединены в java.nio.channels package:

import java.nio.channels.*;

2. Сервер С Будущим

Экземпляр AsynchronousServerSocketChannel создается путем вызова статического открытого API в его классе:

AsynchronousServerSocketChannel server
  = AsynchronousServerSocketChannel.open();

Недавно созданный канал сокета асинхронного сервера открыт, но еще не привязан, поэтому мы должны привязать его к локальному адресу и при необходимости выбрать порт:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

С таким же успехом мы могли бы передать значение null, чтобы он использовал локальный адрес и привязывался к произвольному порту:

server.bind(null);

После привязки API accept используется для инициирования приема подключений к сокету канала:

Future acceptFuture = server.accept();

Как и в случае с асинхронными операциями канала, приведенный выше вызов возвращается сразу же, и выполнение продолжается.

Затем мы можем использовать API get для запроса ответа от объекта Future :

AsynchronousSocketChannel worker = future.get();

Этот вызов будет заблокирован, если необходимо дождаться запроса на соединение от клиента. При необходимости мы можем указать тайм-аут, если не хотим ждать вечно:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

После того, как вышеупомянутый вызов возвращается и операция прошла успешно, мы можем создать цикл, в котором мы прослушиваем входящие сообщения и передаем их клиенту.

Давайте создадим метод с именем runServer , в котором мы будем выполнять ожидание и обрабатывать любые входящие сообщения:

public void runServer() {
    clientChannel = acceptResult.get();
    if ((clientChannel != null) && (clientChannel.isOpen())) {
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            Future readResult  = clientChannel.read(buffer);
            
            // perform other computations
            
            readResult.get();
            
            buffer.flip();
            Future writeResult = clientChannel.write(buffer);
 
            // perform other computations
 
            writeResult.get();
            buffer.clear();
        } 
        clientChannel.close();
        serverChannel.close();
    }
}

Внутри цикла все, что мы делаем, – это создаем буфер для чтения и записи в него в зависимости от операции.

Затем, каждый раз, когда мы выполняем чтение или запись, мы можем продолжить выполнение любого другого кода, и когда мы будем готовы обработать результат, мы вызовем get() API для объекта Future .

Чтобы запустить сервер, мы вызываем его конструктор, а затем метод runServer внутри main :

public static void main(String[] args) {
    AsyncEchoServer server = new AsyncEchoServer();
    server.runServer();
}

3. Сервер С completionHandler

В этом разделе мы рассмотрим, как реализовать один и тот же сервер, используя подход completionHandler , а не подход Future .

Внутри конструктора мы создаем AsynchronousServerSocketChannel и привязываем его к локальному адресу так же, как и раньше:

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

Затем, все еще находясь внутри конструктора, мы создаем цикл while, в котором принимаем любое входящее соединение от клиента. Этот цикл while используется строго для предотвращения выхода сервера перед установлением соединения с клиентом .

Чтобы предотвратить бесконечное выполнение цикла , мы вызываем System.in.read() в его конце, чтобы заблокировать выполнение до тех пор, пока входящее соединение не будет считано из стандартного входного потока:

while (true) {
    serverChannel.accept(
      null, new CompletionHandler() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
             }
         }
         @Override
         public void failed(Throwable exc, Object attachment) {
             // process error
         }
    });
    System.in.read();
}

При установлении соединения вызывается метод completed обратного вызова в completionHandler операции accept.

Его возвращаемым типом является экземпляр AsynchronousSocketChannel . Если канал сокета сервера все еще открыт, мы снова вызываем API accept , чтобы подготовиться к другому входящему соединению, повторно используя тот же обработчик.

Затем мы назначаем возвращенный канал сокета глобальному экземпляру. Затем мы проверяем, что он не является нулевым и что он открыт, прежде чем выполнять с ним операции.

Точка, в которой мы можем начать операции чтения и записи, находится внутри API completed обратного вызова обработчика операции accept . Этот шаг заменяет предыдущий подход, при котором мы опрашивали канал с помощью API get .

Обратите внимание, что сервер больше не будет существовать после установления соединения , если мы явно не закроем его.

Обратите также внимание, что мы создали отдельный внутренний класс для обработки операций чтения и записи; ReadWriteHandler . Мы увидим, как объект вложения пригодится на этом этапе.

Во-первых, давайте посмотрим на класс ReadWriteHandler :

class ReadWriteHandler implements 
  CompletionHandler> {
    
    @Override
    public void completed(
      Integer result, Map attachment) {
        Map actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            clientChannel.write(buffer, actionInfo, this);
            buffer.clear();

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map attachment) {
        // 
    }
}

Общий тип нашего вложения в классе ReadWriteHandler – это карта. Нам конкретно нужно передать через него два важных параметра – тип операции(действия) и буфер.

Далее мы рассмотрим, как используются эти параметры.

Первая операция, которую мы выполняем, – это read , так как это эхо-сервер, который реагирует только на сообщения клиента. Внутри метода ReadWriteHandler ‘s completed callback мы извлекаем вложенные данные и решаем, что делать соответственно.

Если операция read завершена, мы извлекаем буфер, изменяем параметр действия вложения и сразу же выполняем операцию write , чтобы передать сообщение клиенту.

Если это только что завершенная операция write , мы снова вызываем API read , чтобы подготовить сервер к приему другого входящего сообщения.

4. Клиент

После настройки сервера мы теперь можем настроить клиент, вызвав open API в классе Asynchronoussocketchannel . Этот вызов создает новый экземпляр канала сокета клиента, который мы затем используем для подключения к серверу:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future future = client.connect(hostAddress);

Операция connect ничего не возвращает при успешном выполнении. Однако мы все еще можем использовать объект Future для мониторинга состояния асинхронной операции.

Давайте вызовем get API для ожидания соединения:

future.get()

После этого шага мы можем начать отправлять сообщения на сервер и получать эхо-сигналы для того же самого. Метод SendMessage выглядит следующим образом:

public String sendMessage(String message) {
    byte[] byteMsg = new String(message).getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future writeResult = client.write(buffer);

    // do some computation

    writeResult.get();
    buffer.flip();
    Future readResult = client.read(buffer);
    
    // do some computation

    readResult.get();
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}

5. Испытание

Чтобы подтвердить, что наши серверные и клиентские приложения работают в соответствии с ожиданиями, мы можем использовать тест:

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    String resp1 = client.sendMessage("hello");
    String resp2 = client.sendMessage("world");

    assertEquals("hello", resp1);
    assertEquals("world", resp2);
}

6. Заключение

В этой статье мы рассмотрели API-интерфейсы асинхронных сокетов Java NIO.2. Мы смогли пройти через процесс создания сервера и клиента с помощью этих новых API.

Вы можете получить доступ к полному исходному коду этой статьи в проекте Github .