Рубрики
Без рубрики

Руководство по Java-фазеру

Узнайте, как API Phaser Java может помочь вам справиться с расширенными задачами координации потоков.

Автор оригинала: baeldung.

1. Обзор

В этой статье мы рассмотрим конструкцию Phaser из пакета java.util.concurrent . Это очень похожая конструкция на CountDownLatch , которая позволяет нам координировать выполнение потоков. По сравнению с CountDownLatch , он обладает некоторыми дополнительными функциями.

Фазер – это барьер, на котором динамическое количество потоков должно подождать, прежде чем продолжить выполнение. В CountDownLatch этот номер не может быть настроен динамически и должен быть указан при создании экземпляра.

2. API фазера

Фазер позволяет нам построить логику, в которой потокам необходимо подождать на барьере, прежде чем перейти к следующему шагу выполнения .

Мы можем координировать несколько этапов выполнения, повторно используя экземпляр Phaser для каждой фазы программы. На каждой фазе может быть разное количество потоков, ожидающих перехода на другую фазу. Позже мы рассмотрим пример использования фраз.

Чтобы участвовать в координации, потоку необходимо зарегистрироваться() в экземпляре Фазера . Обратите внимание, что это только увеличивает количество зарегистрированных участников, и мы не можем проверить, зарегистрирован ли текущий поток – для этого нам пришлось бы подклассировать реализацию.

Поток сигнализирует о том , что он достиг барьера, вызывая arriveAndAwaitAdvance () , который является методом блокировки. Когда количество прибывших сторон будет равно количеству зарегистрированных сторон, выполнение программы продолжится , а количество этапов увеличится. Мы можем получить номер текущей фазы, вызвав метод get Phase () .

Когда поток завершит свою работу, мы должны вызвать метод arriveAndDeregister () , чтобы сообщить, что текущий поток больше не должен учитываться на этой конкретной фазе.

3. Реализация Логики С Использованием API Phaser

Допустим, мы хотим скоординировать несколько этапов действий. Три потока будут обрабатывать первую фазу, а два потока будут обрабатывать вторую фазу.

Мы создадим Продолжительное действие класс, реализующий Управляемый интерфейс:

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ph.arriveAndDeregister();
    }
}

Когда наш класс действий создан, мы регистрируемся в экземпляре Phaser с помощью метода register () . Это увеличит количество потоков, использующих этот конкретный Фазер.

Вызов arriveAndAwaitAdvance() приведет к тому, что текущий поток будет ждать на барьере. Как уже упоминалось, когда количество прибывших сторон станет таким же, как количество зарегистрированных сторон, исполнение будет продолжено.

После завершения обработки текущий поток отменяет регистрацию, вызывая метод arriveAndDeregister () .

Давайте создадим тестовый случай, в котором мы запустим три Длительных действия потока и заблокируем барьер. Затем, после завершения действия, мы создадим два дополнительных потока LongRunningAction , которые будут выполнять обработку следующего этапа.

При создании экземпляра Phaser из основного потока мы передаем 1 в качестве аргумента. Это эквивалентно вызову метода register() из текущего потока. Мы делаем это, потому что, когда мы создаем три рабочих потока, основной поток является координатором, и поэтому Фазер должен иметь четыре потока, зарегистрированных в нем:

ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
 
assertEquals(0, ph.getPhase());

Фаза после инициализации равна нулю.

Класс Phaser имеет конструктор, в котором мы можем передать ему родительский экземпляр. Это полезно в тех случаях, когда у нас есть большое количество сторон, которые столкнутся с огромными затратами на синхронизацию. В таких ситуациях экземпляры Фраз могут быть настроены таким образом, чтобы группы подфазеров имели общего родителя.

Далее, давайте запустим три Длительных потока действий действий, которые будут ждать на барьере, пока мы не вызовем метод arriveAndAwaitAdvance() из основного потока.

Имейте в виду, что мы инициализировали наш Фазер с помощью 1 и вызвал register() еще три раза. Теперь три потока действий объявили, что они достигли барьера, поэтому необходим еще один вызов arriveAndAwaitAdvance () – из основного потока:

executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();
 
assertEquals(1, ph.getPhase());

После завершения этой фазы метод getPhase() вернет один, потому что программа завершила обработку первого шага выполнения.

Предположим, что два потока должны провести следующий этап обработки. Мы можем использовать Фазер для достижения этой цели, потому что это позволяет нам динамически настраивать количество потоков, которые должны ждать на барьере. Мы запускаем два новых потока, но они не будут выполняться до тех пор, пока не будет выполнен вызов arriveAndAwaitAdvance() из основного потока (как и в предыдущем случае):

executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();
 
assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

После этого метод getPhase() вернет номер фазы, равный двум. Когда мы хотим завершить нашу программу, нам нужно вызвать метод arriveAndDeregister () , поскольку основной поток все еще зарегистрирован в Фазере. Когда снятие с регистрации приводит к тому, что число зарегистрированных сторон становится равным нулю, Фаза завершается|/. Все вызовы методов синхронизации больше не будут блокироваться и будут немедленно возвращены.

Запуск программы приведет к следующему результату (полный исходный код с инструкциями строки печати можно найти в репозитории кода):

This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

Мы видим, что все потоки ожидают выполнения до тех пор, пока не откроется барьер. Следующая фаза выполнения выполняется только после успешного завершения предыдущей.

4. Заключение

В этом уроке мы рассмотрели конструкцию Phaser из java.util.concurrent и реализовали логику координации с несколькими фазами, используя класс Phaser .

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub – это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.