Рубрики
Без рубрики

Многопоточность в Java для чайников (часть 3)

Пулы потоков и барьеры. С тегами java, параллелизм, многопоточность.

Это будет последний пост из этой серии о многопоточности. Во втором посте мы оставили нашу игру в пинг-понг в хорошем положении, с потоками (игроками), заблокированными в ожидании своей очереди, и с возможностью увеличения количества игроков по желанию. Во втором посте мы оставили нашу игру в пинг-понг в хорошем положении, с потоками (игроками), заблокированными в ожидании своей очереди, и с возможностью увеличения количества игроков по желанию. Game отвечал за создание, запуск и завершение потоков, а также за синхронизацию с ними, чтобы основной поток не заканчивался раньше игроков. Давайте подведем итоги:

//...
Thread thread2 = new Thread(player2);
thread2.start();
Thread thread1 = new Thread(player1);
thread1.start();

//Let the players play!
try {
   Thread.sleep(2);
} catch (InterruptedException e) {
   e.printStackTrace();
}

//Tell the players to stop
thread1.interrupt();
thread2.interrupt();

//Wait until players finish
try {
   thread1.join();
   thread2.join();
} catch (InterruptedException e) {
   e.printStackTrace();
}
//...

Управление потоками/пулы потоков

Помимо того, что это утомительно, если мы не будем удобно инкапсулировать создание потоков, наш код будет не очень связным, потому что мы связываем логику самой игры с управлением параллелизмом. Кроме того, создание потоков обходится дорого, и в сложных приложениях это будет иметь значительные затраты с точки зрения производительности.

Java concurrency API экспортирует ряд классов и интерфейсов, позволяющих нам инкапсулировать управление созданием потоков с высокой гибкостью: Executor framework. Его тремя основными элементами являются:

  • Исполнитель : это интерфейс с одним методом, execute(Runnable) . Идея этой платформы заключается в том, что мы обрабатываем задачи вместо потоков, поэтому мы просим экземпляр Executor выполнить задачу (которая является экземпляром Работоспособный ) когда это возможно
  • ExecutorService : интерфейс, расширяющий Executor , который публикует ряд более продвинутых методов для управления всем жизненным циклом выполняемой работы ( shutdown , awaitTermination ) и для выполнения задач типа Callable , которые это всего лишь Runnable возвращает значение ( подробнее здесь ). В официальной документации вы можете прочитать больше обо всех возможностях этого интерфейса
  • Исполнители : два предыдущих компонента являются интерфейсами, и мы можем создавать наши собственные реализации, если захотим. Однако наиболее распространенные варианты использования уже реализованы в JDK. Чтобы использовать эти реализации, нам нужно запросить экземпляр, используя статические фабричные методы, предоставляемые Executors

В общем случае термин “Пул потоков” используется для обозначения реализаций Executor / ExecutorService , который мы используем для обработки наших потоков. Наиболее распространенные типы, которые можно получить, вызывая статические методы, предоставляемые Исполнителями являются:

  • Однопоточный исполнитель ( newSingleThreadExecutor ): он содержит один поток и широко не используется
  • Фиксированный пул потоков ( newFixedThreadPool ): он содержит постоянное количество “живых” потоков, которые ожидают получения задач для выполнения
  • Кэшированный пул потоков ( newcachedthreadpool ): он поддерживает пул потоков, который может увеличиваться или уменьшаться в зависимости от спроса
  • Запланированный пул потоков ( newScheduledThreadPool ): используется для планирования выполнения задач. Он возвращает экземпляр ScheduledExecutorService , учитывая , что ExecutorService не предоставляет соответствующие методы для планирования будущих задач, только для их выполнения как можно скорее

Пинг-Понг, версия 5: Пул потоков

Без изменения класса Игрок , мы можем адаптировать наш класс Game использовать пул потоков вместо того, чтобы самому брать на себя утомительные задачи создания, запуска и остановки потоков. Давайте посмотрим, как:

public class Game {

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();

        Player player1 = new Player("ping", lock);
        Player player2 = new Player("pong", lock);

        player1.setNextPlayer(player2);
        player2.setNextPlayer(player1);

        System.out.println("Game starting...!");

        player1.setPlay(true);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.execute(player1);
        executor.execute(player2);

        sleep(2);

        executor.shutdownNow();

        System.out.println("Game finished!");
    }

    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Мы используем пул, содержащий два потока (по одному на игрока), и мы отправляем ему задачи для выполнения. Мы отключаем основной поток на 2 мс, чтобы оба игрока могли играть некоторое время, и мы вызываем метод shutdownNow()//, что эквивалентно прерыванию потоков, как мы делали в предыдущих версиях, но инкапсулирует всю логику в пуле. shutdownNow()//, что эквивалентно прерыванию потоков, как мы делали в предыдущих версиях, но инкапсулирует всю логику в пуле. Выключение сейчас вместо выключения , потому что последний ожидает завершения выполняемых задач, возвращая список с ожидающими задачами. , потому что последний ожидает завершения выполняемых задач, возвращая список с ожидающими задачами. , приложение никогда не закончится!

Что ж, если мы запустим эту версию несколько раз, мы увидим, что иногда она работает так, как ожидалось, в то время как в других случаях она показывает такой результат:

Game starting...!
ping
pong
//...
Game finished!
pong

Что случилось? После запроса прерывания обоих потоков возможно, что основной поток завершится раньше остальных потоков. Вот почему текст “Игра закончена!” появляется перед последним ходом “pong”. Изучение ExecutorService API, мы можем видеть метод, называемый awaitTermination . Изучение

//...
ExecutorService executor = Executors.newFixedThreadPool(2);

executor.execute(player1);
executor.execute(player2);

sleep(2);

executor.shutdownNow();

try {
    executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    System.out.println("Main thread interrupted while waiting for players to finish");
}

System.out.println("Game finished!");
//...

Наконец-то мы получаем ожидаемый результат, и игра ведет себя так, как ожидалось, с более чистым и читаемым основным классом. Мы закончили? Еще нет!

Барьеры

Барьеры входа/выхода – это механизмы синхронизации, облегчающие одновременное выполнение группы потоков (барьер входа) или ожидание завершения работы всей группы потоков.

Мы уже видели идею барьера выхода ранее в этом посте, с помощью awaitTermination . Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута.

Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в Game :

//...
executor.execute(player1);
sleep(1000);
executor.execute(player2);
//...

Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду, прежде чем отправлять второго игрока играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это связано со временем приложения, происходит что-то вроде этого: игрок для игры.

Game starting...!
ping
// Waiting 1 second
pong

Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует” играет, но во время во-вторых, здесь не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует” играет, но во время во-вторых, игра “приостанавливается” на секунду, которая может составлять минуты (время, необходимое основному потоку для запуска второго потока/игрока). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует”, но во время во-вторых, игра “приостанавливается” на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы запускаем параллельный процесс, требующий присутствия нескольких потоков, прежде чем все они будут готовы. es (время, необходимое основному потоку для запуска второго потока/проигрывателя). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в || Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует” играет, но во время во-вторых, игра “приостанавливается” на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы начинаем параллель, чтобы избежать этого, нам нужен барьер входа. t процесс, требующий наличия нескольких потоков, прежде чем все они будут готовы. es (время, необходимое основному потоку для запуска второго потока/проигрывателя). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть.

Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок "пингует" играет, но во время во-вторых, игра "приостанавливается" на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы начинаем параллель, чтобы избежать этого, нам нужен барьер входа. процесс, требующий, чтобы в API параллелизма было несколько классов, которые можно использовать в качестве барьеров, но самый простой из них, который мы будем использовать в качестве барьера входа и выхода, - это Обратный отсчет при наличии нескольких потоков, прежде чем все они будут готовы. es (время, необходимое основному потоку для запуска второго потока/проигрывателя). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть.

  1. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует” играет, но во время во-вторых, игра “приостанавливается” на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы начинаем параллель, чтобы избежать этого, нам нужен барьер входа. t процесс, требующий, чтобы в API параллелизма было несколько классов, которые можно использовать для создания барьера со счетчиком, инициализированным в
  2. N Использование этого класса можно свести к трем пунктам: ed как барьеры, но самый простой и тот, который мы будем использовать в качестве барьера входа и выхода, – это CountDownLatch - наличие нескольких потоков до того, как все они будут готовы. es (время, необходимое основному потоку для запуска второго потока/проигрывателя). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок "пингует" играет, но во время во-вторых, игра "приостанавливается" на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы начинаем параллель, чтобы избежать этого, нам нужен барьер входа. процесс, требующий, чтобы в API параллелизма было несколько классов, которые могут быть использованы. Потоки, которые должны продолжаться в зависимости от барьера, будут вызывать await()
  3. и будут заблокированы до тех пор, пока счетчик не достигнет нуля. Мы создаем барьер со счетчиком, инициализированным в N Использование этого класса можно свести к трем пунктам: ed как барьеры, но самый простой, и тот, который мы будем использовать в качестве барьера входа и выхода, - это CountDownLatch e наличие нескольких потоков до того, как все они будут готовы. es (время, необходимое основному потоку для запуска второго потока/проигрывателя). не с кем играть! это связано со временем приложения, происходит что-то вроде этого: игрок должен играть. Этот метод позволяет нам создать барьер выхода, но он заставляет нас устанавливать тайм-аут (это могут быть часы, но это тайм-аут!). Мы хотели бы создать барьер выхода без тайм-аута. Чтобы понять, что такое барьер входа, мы собираемся добавить инструкцию в Game, мы отправляем основной поток в спящий режим на секунду перед отправкой второго, хотя результат здесь трудно воспроизвести, потому что это то, что игрок “пингует” играет, но во время во-вторых, игра “приостанавливается” на секунду, которая может быть минутной. Эта ситуация не идеальна, потому что мы начинаем параллель, чтобы избежать этого, нам нужен барьер входа. для процесса, требующего, чтобы в API параллелизма было несколько классов, которые могут быть использованы, есть метод await

Версия 6: Барьеры для въезда/выезда

В этой новой версии нам придется изменить оба Игра и Игрок . Давайте посмотрим, как они будут выглядеть:

public class Player implements Runnable {

    private final String text;

    private final Lock lock;
    private final Condition myTurn;

    private final CountDownLatch entryBarrier;
    private final CountDownLatch exitBarrier;

    private Condition nextTurn;

    private Player nextPlayer;

    private volatile boolean play = false;

    public Player(String text,
                  Lock lock,
                  CountDownLatch entryBarrier,
                  CountDownLatch exitBarrier) {
        this.text = text;
        this.lock = lock;
        this.myTurn = lock.newCondition();

        this.entryBarrier = entryBarrier;
        this.exitBarrier = exitBarrier;
    }

    @Override
    public void run() {
        if(entryBarrierOpen())
            play();
    }

    public boolean entryBarrierOpen() {
        try {
            entryBarrier.await();
            return true;
        } catch (InterruptedException e) {
            System.out.println("Player "+text+
                                " was interrupted before starting Game!");
            return false;
        }
    }

    private void play() {
        while (!Thread.interrupted()) {
            lock.lock();

            try {
                while (!play)
                    myTurn.awaitUninterruptibly();

                System.out.println(text);

                this.play = false;
                nextPlayer.play = true;

                nextTurn.signal();
            } finally {
                lock.unlock();
            }
        }

        exitBarrier.countDown();
    }

    public void setNextPlayer(Player nextPlayer) {
        this.nextPlayer = nextPlayer;
        this.nextTurn = nextPlayer.myTurn;
    }

    public void setPlay(boolean play) {
        this.play = play;
    }
}

Класс не запускается до тех пор, пока не будет открыт входной барьер, и когда он прерывается для завершения, он вызывает Обратный отсчет на барьере выхода, который будет, как Игра знает, что оба игрока закончили.

Подумайте на секунду о значениях, которые мы должны инициализировать счетчики в барьер входа и выйдите из барьера , прежде чем продолжить чтение…

public class Game {

    public static void main(String[] args) {
        CountDownLatch entryBarrier = new CountDownLatch(1);
        CountDownLatch exitBarrier = new CountDownLatch(2);

        Lock lock = new ReentrantLock();

        Player player1 = new Player("ping", lock, entryBarrier, exitBarrier);
        Player player2 = new Player("pong", lock, entryBarrier, exitBarrier);

        player1.setNextPlayer(player2);
        player2.setNextPlayer(player1);

        System.out.println("Game starting...!");

        player1.setPlay(true);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.execute(player1);

        sleep(1000);

        executor.execute(player2);

        entryBarrier.countDown();

        sleep(2);

        executor.shutdownNow();

        try {
            exitBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Game finished!");
    }

    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Действительно, входной барьер имеет счетчик, установленный на 1, потому что он будет открыт, как только основной поток передаст все задачи в пул потоков, в то время как выходной барьер, который здесь используется в качестве альтернативы awaitTermination , имеет счетчик, установленный на 2, это количество участников, которые должны завершить выполнение, прежде чем основной поток сможет продолжить работу.

Таким образом, время в нашем приложении является желаемым, хотя мы немного усложнили код. Дело в том, что сам по себе параллелизм довольно сложен, поэтому трудно идеально инкапсулировать все используемые механизмы.

Прежде чем закончить пост, я хотел бы упомянуть, что барьер выхода был добавлен в эту версию с дидактическими целями. Лучший механизм для ожидания завершения работы группы потоков – это awaitTermination , используя разумный тайм-аут, поэтому, если мы достигнем этого тайм-аута, это будет связано с тем, что произошел сбой в одной из задач, которые мы ожидаем его завершения. Я добавил version 7 в GitHub, который использует барьер входа и awaitTermination в качестве барьера выхода. Эту версию можно было бы считать оптимальной версией приложения.

Я надеюсь, что эта серия постов была полезна для разъяснения многих концепций, используемых в параллельных приложениях, реализованных с помощью Java. Если вы хотите узнать больше, лучшая книга на эту тему – Java Concurrency На практике Брайан Гетц.

(Весь код можно найти в этом репозитории GitHub )

Оригинал: “https://dev.to/raulavila/multithreading-in-java-for-dummies-part-3”