Рубрики
Без рубрики

Руководство по фреймворку Fork/Join в Java

Введение в фреймворк fork/join, представленный в Java 7, и инструменты, помогающие ускорить параллельную обработку, пытаясь использовать все доступные процессорные ядра.

Автор оригинала: baeldung.

1. Обзор

Фреймворк fork/join был представлен в Java 7. Он предоставляет инструменты, помогающие ускорить параллельную обработку, пытаясь использовать все доступные процессорные ядра, что достигается с помощью подхода “разделяй и властвуй” .

На практике это означает, что фреймворк сначала “разветвляется” , рекурсивно разбивая задачу на более мелкие независимые подзадачи, пока они не станут достаточно простыми для асинхронного выполнения.

После этого начинается часть “соединение” , в которой результаты всех подзадач рекурсивно объединяются в один результат, или в случае задачи, которая возвращает void, программа просто ждет, пока не будет выполнена каждая подзадача.

Для обеспечения эффективного параллельного выполнения платформа fork/join использует пул потоков, называемый ForkJoinPool , который управляет рабочими потоками типа ForkJoinWorkerThread .

2. ForkJoinPool

ForkJoinPool является сердцем фреймворка. Это реализация ExecutorService , которая управляет рабочими потоками и предоставляет нам инструменты для получения информации о состоянии и производительности пула потоков.

Рабочие потоки могут выполнять только одну задачу за раз, но ForkJoinPool не создает отдельный поток для каждой отдельной подзадачи. Вместо этого каждый поток в пуле имеет свою собственную двустороннюю очередь (или deque , произносится deck ), в которой хранятся задачи.

Эта архитектура жизненно важна для балансировки рабочей нагрузки потока с помощью алгоритма воровства работы.

2.1. Алгоритм Кражи Работ

Проще говоря, свободные потоки пытаются “украсть” работу у деков занятых потоков.

По умолчанию рабочий поток получает задачи из головы своего собственного deque. Когда он пуст, поток берет задачу из хвоста deque другого занятого потока или из глобальной очереди ввода, так как именно здесь, вероятно, будут расположены самые большие части работы.

Такой подход сводит к минимуму вероятность того, что потоки будут конкурировать за задачи. Это также сокращает количество раз, когда потоку придется искать работу, так как сначала он работает с самыми большими доступными фрагментами работы.

2.2. Создание экземпляра ForkJoinPool

В Java 8 наиболее удобным способом получить доступ к экземпляру ForkJoinPool является использование его статического метода common Pool (). Как следует из названия, это обеспечит ссылку на общий пул, который является пулом потоков по умолчанию для каждой ForkJoinTask .

Согласно документации Oracle , использование предопределенного общего пула снижает потребление ресурсов, поскольку это препятствует созданию отдельного пула потоков для каждой задачи.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Такого же поведения можно добиться в Java 7, создав ForkJoinPool и назначив его общедоступному статическому полю служебного класса:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Теперь к нему можно легко получить доступ:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

С помощью конструкторов Forkjoinpool можно создать пользовательский пул потоков с определенным уровнем параллелизма, фабрикой потоков и обработчиком исключений. В приведенном выше примере пул имеет уровень параллелизма 2. Это означает, что пул будет использовать 2 ядра процессора.

3. ForkJoinTask

ForkJoinTask является базовым типом для задач, выполняемых внутри ForkJoinPool. На практике следует расширить один из двух его подклассов: RecursiveAction для void задач и RecursiveTask для задач, возвращающих значение. У них обоих есть абстрактный метод compute () , в котором определена логика задачи.

3.1. Рекурсивное действие – пример

В приведенном ниже примере единица работы, подлежащая обработке, представлена строкой , называемой рабочая нагрузка . Для демонстрационных целей эта задача бессмысленна: она просто вводит данные в верхний регистр и регистрирует их.

Чтобы продемонстрировать поведение разветвления фреймворка, пример разбивает задачу, если рабочая нагрузка .length() превышает заданный порог | с помощью метода createSubtask () .

Строка рекурсивно разделяется на подстроки, создавая Пользовательские экземпляры RecursiveTask , основанные на этих подстроках.

В результате метод возвращает List RecursiveAction>. RecursiveAction>.

Список передается в ForkJoinPool с помощью метода invokeAll() :

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger = 
      Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
           processing(workload);
        }
    }

    private List createSubtasks() {
        List subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by " 
          + Thread.currentThread().getName());
    }
}

Этот шаблон можно использовать для разработки собственных RecursiveAction классов . Для этого создайте объект, представляющий общий объем работы, выберите подходящий порог, определите метод разделения работы и определите метод выполнения работы.

3.2. Рекурсивная задача

Для задач, возвращающих значение, логика здесь аналогична, за исключением того, что результат для каждой подзадачи объединяется в один результат:

public class CustomRecursiveTask extends RecursiveTask {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection createSubtasks() {
        List dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

В этом примере работа представлена массивом, хранящимся в поле art класса Custom RecursiveTask|/. Метод create Subtasks() рекурсивно делит задачу на более мелкие части работы, пока каждая часть не станет меньше порогового значения . Затем метод invokeAll() отправляет подзадачи в общий пул и возвращает список Future .

Для запуска выполнения для каждой подзадачи вызывается метод join () .

В этом примере это достигается с помощью Java 8 Stream API ; метод sum() используется в качестве представления объединения вложенных результатов в конечный результат.

4. Отправка задач в ForkJoinPool

Для отправки задач в пул потоков можно использовать несколько подходов.

Метод submit() или execute () (их варианты использования одинаковы):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

Метод invoke() разветвляет задачу и ожидает результата, и не требует ручного соединения:

int result = forkJoinPool.invoke(customRecursiveTask);

Метод invokeAll() является наиболее удобным способом отправки последовательности ForkJoinTasks в ForkJoinPool. Он принимает задачи в качестве параметров (две задачи, varargs или коллекция), а затем возвращает коллекцию объектов Feature в том порядке, в котором они были созданы.

Кроме того, вы можете использовать отдельные методы fork() и join () . Метод fork() отправляет задачу в пул, но не запускает ее выполнение. Для этой цели необходимо использовать метод join () . В случае RecursiveAction функция join() возвращает только null ; для RecursiveTask возвращает результат выполнения задачи:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

В нашем примере RecursiveTask мы использовали метод invokeAll() для отправки последовательности подзадач в пул. Ту же работу можно выполнить с помощью fork() и join() , что имеет последствия для упорядочения результатов.

Чтобы избежать путаницы, обычно рекомендуется использовать метод invokeAll() для отправки более одной задачи в ForkJoinPool.

5. Выводы

Использование фреймворка fork/join может ускорить обработку больших задач, но для достижения этого результата необходимо следовать некоторым рекомендациям:

  • Используйте как можно меньше пулов потоков – в большинстве случаев лучшим решением является использование одного пула потоков для каждого приложения или системы
  • Используйте пул общих потоков по умолчанию, если конкретная настройка не требуется
  • Используйте разумный порог для разделения ForkJoinTask на подзадачи
  • Избегайте каких-либо блокировок в вашей Forkjointask

Примеры, используемые в этой статье, доступны в связанном репозитории GitHub .