Рубрики
Без рубрики

Введение в селектор Java NIO

Краткое и практическое руководство по селектору NIO на Java.

Автор оригинала: baeldung.

1. Обзор

В этой статье мы рассмотрим вводные части компонента Java Nio Selector .

Селектор обеспечивает механизм для мониторинга одного или нескольких каналов NIO и распознавания, когда один или несколько каналов становятся доступными для передачи данных.

Таким образом, один поток может использоваться для управления несколькими каналами и , следовательно, несколькими сетевыми соединениями.

2. Зачем использовать селектор?

С помощью селектора мы можем использовать один поток вместо нескольких для управления несколькими каналами. Переключение контекста между потоками является дорогостоящим для операционной системы , и, кроме того, каждый поток занимает память.

Поэтому чем меньше потоков мы используем, тем лучше. Однако важно помнить, что современные операционные системы и процессоры все лучше справляются с многозадачностью , поэтому накладные расходы на многопоточность со временем уменьшаются.

Здесь мы рассмотрим, как мы можем обрабатывать несколько каналов одним потоком с помощью селектора.

Обратите также внимание, что селекторы не просто помогают вам читать данные; они также могут прослушивать входящие сетевые подключения и записывать данные по медленным каналам.

3. Настройка

Чтобы использовать селектор, нам не нужна какая-либо специальная настройка. Все классы, которые нам нужны, находятся в пакете core java.nio , и нам просто нужно импортировать то, что нам нужно.

После этого мы можем зарегистрировать несколько каналов с помощью объекта селектора. Когда действие ввода-вывода происходит на любом из каналов, селектор уведомляет нас об этом. Именно так мы можем читать из большого количества источников данных в одном потоке.

Любой канал, который мы регистрируем с помощью селектора, должен быть подклассом Выбираемого канала . Это особый тип каналов, которые можно перевести в неблокирующий режим.

4. Создание селектора

Селектор может быть создан путем вызова статического метода open класса Selector , который будет использовать системный поставщик селектора по умолчанию для создания нового селектора:

Selector selector = Selector.open();

5. Регистрация Выбираемых Каналов

Для того чтобы селектор мог отслеживать любые каналы, мы должны зарегистрировать эти каналы с помощью селектора. Мы делаем это, вызывая метод register выбираемого канала.

Но прежде чем канал будет зарегистрирован селектором, он должен находиться в неблокирующем режиме:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Это означает, что мы не можем использовать FileChannel s с селектором, поскольку они не могут быть переключены в неблокирующий режим, как мы это делаем с каналами сокетов.

Первый параметр-это объект Selector , который мы создали ранее, второй параметр определяет набор интересов , , означающий, какие события мы заинтересованы прослушивать в отслеживаемом канале через селектор.

Существует четыре различных события, которые мы можем прослушивать, каждое из которых представлено константой в классе SelectionKey :

  • Подключение при попытке клиента подключиться к серверу. Представлен ключом SelectionKey.OP_CONNECT
  • Принять , когда сервер принимает соединение от клиента. Представлен ключом SelectionKey.OP_ACCEPT
  • Читать , когда сервер готов к чтению с канала. Представлен ключом SelectionKey.OP_READ
  • Напишите , когда сервер будет готов к записи в канал. Представлен ключом SelectionKey.OP_WRITE

Возвращаемый объект SelectionKey представляет регистрацию выбираемого канала с помощью селектора. Мы рассмотрим это подробнее в следующем разделе.

6. Объект SelectionKey

Как мы видели в предыдущем разделе, когда мы регистрируем канал с помощью селектора, мы получаем объект SelectionKey . Этот объект содержит данные, представляющие регистрацию канала.

Он содержит некоторые важные свойства, которые мы должны хорошо понимать, чтобы иметь возможность использовать селектор на канале. Мы рассмотрим эти свойства в следующих подразделах.

6.1. Процентный Набор

Набор интересов определяет набор событий, которые мы хотим, чтобы селектор следил за этим каналом. Это целочисленное значение; мы можем получить эту информацию следующим образом.

Во-первых, у нас есть набор процентов, возвращаемый методом Selection Key ‘s interestOps . Тогда у нас есть константа события в SelectionKey , которую мы рассматривали ранее.

Когда мы И эти два значения, мы получаем логическое значение, которое говорит нам, отслеживается ли событие или нет:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

6.2. Готовый Набор

Готовый набор определяет набор событий, к которым канал готов. Это также целочисленное значение; мы можем получить эту информацию следующим образом.

У нас есть готовый набор, возвращаемый методом SelectionKey ‘s readyOps . Когда мы сопоставляем это значение с константами событий, как это было в случае набора интересов, мы получаем логическое значение, представляющее, готов ли канал к определенному значению или нет.

Другой альтернативный и более короткий способ сделать это-использовать для этой же цели удобные методы Selection Key’ s:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3. Канал

Доступ к просматриваемому каналу из объекта SelectionKey очень прост. Мы просто вызываем метод channel :

Channel channel = key.channel();

6.4. Селектор

Точно так же, как получить канал, очень легко получить объект Selector из объекта SelectionKey :

Selector selector = key.selector();

6.5. Прикрепление Предметов

Мы можем прикрепить объект к ключу выбора . Иногда мы можем захотеть дать каналу пользовательский идентификатор или прикрепить любой объект Java, который мы, возможно, захотим отслеживать.

Прикрепление объектов-удобный способ сделать это. Вот как вы прикрепляете и получаете объекты из ключа выбора |:

key.attach(Object);

Object object = key.attachment();

В качестве альтернативы мы можем прикрепить объект во время регистрации канала. Мы добавляем его в качестве третьего параметра в метод register канала, например:

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

7. Выбор ключа Канала

До сих пор мы рассматривали, как создать селектор, зарегистрировать в нем каналы и проверить свойства объекта SelectionKey , который представляет регистрацию канала в селекторе.

Это только половина процесса, теперь мы должны выполнить непрерывный процесс выбора готового набора, который мы рассматривали ранее. Мы делаем выбор с помощью метода селектора select , например:

int channels = selector.select();

Этот метод блокируется до тех пор, пока по крайней мере один канал не будет готов к работе. Возвращаемое целое число представляет количество ключей, каналы которых готовы к операции.

Затем мы обычно извлекаем набор выбранных ключей для обработки:

Set selectedKeys = selector.selectedKeys();

Набор, который мы получили, состоит из Ключа выбора объектов, каждый ключ представляет собой зарегистрированный канал, готовый к работе.

После этого мы обычно перебираем этот набор и для каждого ключа получаем канал и выполняем любые операции, которые появляются в нашем интересующем наборе на нем.

В течение срока службы канала он может быть выбран несколько раз, поскольку его ключ появляется в готовом наборе для различных событий. Вот почему мы должны иметь непрерывный цикл для захвата и обработки событий канала по мере их возникновения.

8. Полный Пример

Чтобы закрепить знания, полученные в предыдущих разделах, мы построим полный пример клиент-сервер.

Для удобства тестирования нашего кода мы создадим эхо-сервер и эхо-клиент. При такой настройке клиент подключается к серверу и начинает отправлять на него сообщения. Сервер повторяет сообщения, отправленные каждым клиентом.

Когда сервер обнаруживает определенное сообщение, например end , он интерпретирует его как конец связи и закрывает соединение с клиентом.

8.1. Сервер

Вот наш код для EchoServer.java :

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set selectedKeys = selector.selectedKeys();
            Iterator iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

Это то, что происходит; мы создаем объект Selector , вызывая статический метод open . Затем мы также создаем канал, вызывая его статический метод open , в частности экземпляр ServerSocketChannel|/.

Это связано с тем, что ServerSocketChannel выбирается и подходит для сокета прослушивания, ориентированного на поток .

Затем мы привязываем его к порту по нашему выбору. Помните, мы уже говорили ранее, что перед регистрацией выбираемого канала в селекторе мы должны сначала установить его в неблокирующий режим. Итак, далее мы делаем это, а затем регистрируем канал в селекторе.

На данном этапе нам не нужен ключ выбора экземпляра этого канала, поэтому мы не будем его запоминать.

Java NIO использует модель, ориентированную на буфер, отличную от модели, ориентированной на поток. Таким образом, связь сокетов обычно происходит путем записи и чтения из буфера.

Поэтому мы создаем новый ByteBuffer , в который сервер будет записывать и считывать данные. Мы инициализируем его до 256 байт, это просто произвольное значение, в зависимости от того, сколько данных мы планируем передавать туда и обратно.

Наконец, мы выполняем процесс отбора. Мы выбираем готовые каналы, извлекаем их ключи выбора, перебираем ключи и выполняем операции, для которых каждый канал готов.

Мы делаем это в бесконечном цикле, так как серверы обычно должны продолжать работать независимо от того, есть активность или нет.

Единственная операция, которую может выполнить ServerSocketChannel , – это операция ACCEPT . Когда мы принимаем соединение от клиента, мы получаем объект SocketChannel , на котором мы можем выполнять чтение и запись. Мы устанавливаем его в неблокирующий режим и регистрируем его для операции чтения в селекторе.

Во время одного из последующих выборов этот новый канал станет готовым для чтения. Мы извлекаем его и считываем содержимое в буфер. Верный тому, что он является эхо-сервером, мы должны записать этот контент обратно клиенту.

Когда мы хотим записать в буфер, из которого мы читали, мы должны вызвать метод flip () |/.

Наконец, мы настроили буфер на режим записи, вызвав метод flip и просто записав в него.

Метод start() определен таким образом, что сервер echo может быть запущен как отдельный процесс во время модульного тестирования.

8.2. Клиент

Вот наш код для EchoClient.java :

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

Клиент проще, чем сервер.

Мы используем одноэлементный шаблон для его создания в статическом методе start . Мы вызываем частный конструктор из этого метода.

В частном конструкторе мы открываем соединение на том же порту, на котором был привязан канал сервера, и все еще на том же хосте.

Затем мы создаем буфер, в который мы можем писать и из которого мы можем читать.

Наконец, у нас есть метод SendMessage , который считывает любую строку, которую мы передаем ему, в bytebuffer, который передается по каналу на сервер.

Затем мы считываем данные с клиентского канала, чтобы получить сообщение, отправленное сервером. Мы возвращаем это как эхо нашего послания.

8.3. Тестирование

Внутри класса под названием EchoTest.java , мы собираемся создать тестовый случай, который запускает сервер, отправляет сообщения на сервер и проходит только тогда, когда те же сообщения возвращаются с сервера. В качестве заключительного шага тестовый случай останавливает сервер до завершения.

Теперь мы можем провести тест:

public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

9. Селектор.пробуждение()

Как мы видели ранее, вызов selector.select() блокирует текущий поток до тех пор, пока один из наблюдаемых каналов не станет готовым к работе. Мы можем переопределить это, вызвав селектор.wakeup() из другого потока.

В результате блокирующий поток возвращается немедленно, а не продолжает ждать, независимо от того, готов канал или нет .

Мы можем продемонстрировать это с помощью CountDownLatch и отслеживания шагов выполнения кода:

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
    Pipe pipe = Pipe.open();
    Selector selector = Selector.open();
    SelectableChannel channel = pipe.source();
    channel.configureBlocking(false);
    channel.register(selector, OP_READ);

    List invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        invocationStepsTracker.add(">> Count down");
        latch.countDown();
        try {
            invocationStepsTracker.add(">> Start select");
            selector.select();
            invocationStepsTracker.add(">> End select");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();

    invocationStepsTracker.add(">> Start await");
    latch.await();
    invocationStepsTracker.add(">> End await");

    invocationStepsTracker.add(">> Wakeup thread");
    selector.wakeup();
    //clean up
    channel.close();

    assertThat(invocationStepsTracker)
      .containsExactly(
        ">> Start await",
        ">> Count down",
        ">> Start select",
        ">> End await",
        ">> Wakeup thread",
        ">> End select"
    );
}

В этом примере мы используем Java Nio Класс Pipe для открытия канала в целях тестирования. Мы отслеживаем шаги выполнения кода в потокобезопасном списке. Анализируя эти шаги, мы можем увидеть, как selector.wakeup() освобождает поток, заблокированный selector.select() .

10. Заключение

В этой статье мы рассмотрели основное использование компонента селектора Java NIO.

Полный исходный код и все фрагменты кода для этой статьи доступны в моем проекте GitHub .