1. введение
Циклические барьеры – это конструкции синхронизации, которые были введены в Java 5 как часть пакета java.util.concurrent .
В этой статье мы рассмотрим эту реализацию в сценарии параллелизма.
2. Параллелизм Java – Синхронизаторы
Пакет java.util.concurrent содержит несколько классов, которые помогают управлять набором потоков, которые сотрудничают друг с другом. Некоторые из них включают в себя:
- Циклобарьер
- Фазер
- Обратный отсчет
- Обменник
- Семафор
- Синхронная очередь
Эти классы предлагают готовые функциональные возможности для общих шаблонов взаимодействия между потоками.
Если у нас есть набор потоков, которые взаимодействуют друг с другом и напоминают один из общих шаблонов, мы можем просто повторно использовать соответствующие классы библиотек (также называемые Синхронизаторами ) вместо того, чтобы пытаться придумать пользовательскую схему с использованием набора блокировок и объектов условий и ключевого слова synchronized .
Давайте сосредоточимся на CyclicBarrier в дальнейшем.
Давайте сосредоточимся на || CyclicBarrier || в дальнейшем.
A CyclicBarrier – это синхронизатор, который позволяет набору потоков ждать друг друга, чтобы достичь общей точки выполнения, также называемой барьером .
Циклические барьеры используются в программах, в которых у нас есть фиксированное количество потоков, которые должны ждать друг друга, чтобы достичь общей точки, прежде чем продолжить выполнение.
Барьер называется циклический потому что он может быть повторно использован после освобождения ожидающих потоков.
4. Использование
Конструктор для CyclicBarrier прост. Требуется одно целое число, которое обозначает количество потоков, которым необходимо вызвать метод await() на экземпляре барьера, чтобы обозначить достижение общей точки выполнения:
public CyclicBarrier(int parties)
Потоки, которые должны синхронизировать свое выполнение, также называются parties , и вызов метода await () – это то, как мы можем зарегистрировать, что определенный поток достиг точки барьера.
Этот вызов является синхронным, и поток, вызывающий этот метод, приостанавливает выполнение до тех пор, пока указанное количество потоков не вызовет один и тот же метод на барьере. Эта ситуация , когда необходимое количество потоков вызвало await () , называется срабатыванием барьера .
При необходимости мы можем передать второй аргумент конструктору, который является выполняемым экземпляром. Это имеет логику, которая будет выполняться последним потоком, который преодолевает барьер:
public CyclicBarrier(int parties, Runnable barrierAction)
5. Реализация
Чтобы увидеть CyclicBarrier в действии, рассмотрим следующий сценарий:
Существует операция, которую выполняет фиксированное количество потоков и сохраняет соответствующие результаты в списке. Когда все потоки завершают выполнение своих действий, один из них (обычно последний, который преодолевает барьер) начинает обработку данных, полученных каждым из них.
Давайте реализуем основной класс, в котором происходит все действие:
public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List> partialResults = Collections.synchronizedList(new ArrayList<>()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
Этот класс довольно прямолинейен – NUM_WORKERS – это количество потоков, которые будут выполняться, и NUM_PARTIAL_RESULTS – это количество результатов, которые будут получены каждым из рабочих потоков.
Наконец, у нас есть частичные результаты , которые представляют собой список, в котором будут храниться результаты каждого из этих рабочих потоков. Обратите внимание, что этот список является synchronizedList , потому что в него одновременно будут записываться несколько потоков, а метод add() не является потокобезопасным на простом ArrayList .
Теперь давайте реализуем логику каждого из рабочих потоков:
public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); ListpartialResult = new ArrayList<>(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }
Теперь мы реализуем логику, которая работает, когда барьер был отключен.
Чтобы все было просто, давайте просто добавим все числа в список частичных результатов:
public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (ListthreadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }
Последним шагом было бы построить CyclicBarrier и начать работу с помощью метода main() :
public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } }
В приведенном выше коде мы инициализировали cyclicbarrier с помощью 5 потоков, каждый из которых производит 3 целых числа в рамках своих вычислений и сохраняет их в результирующем списке.
Как только барьер отключен, последний поток, который отключил барьер, выполняет логику, указанную в AggregatorThread, а именно – добавляет все числа, созданные потоками.
6. Результаты
Вот результат одного выполнения приведенной выше программы – каждое выполнение может привести к различным результатам, поскольку потоки могут быть созданы в другом порядке:
Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46
Как видно из приведенного выше вывода, Поток 4 – это тот, который преодолевает барьер, а также выполняет логику окончательной агрегации. Также не обязательно, чтобы потоки действительно запускались в том порядке, в котором они запускаются, как показано в приведенном выше примере.
7. Заключение
В этой статье мы рассмотрели, что такое CyclicBarrier и в каких ситуациях он полезен.
Мы также реализовали сценарий, в котором нам требовалось фиксированное количество потоков, чтобы достичь фиксированной точки выполнения, прежде чем продолжить работу с другой логикой программы.
Как всегда, код для учебника можно найти на GitHub .