Рубрики
Без рубрики

Быстрый Исполнитель Для Небольших Задач

Настройка Производительности Исполнителя. Помеченный как java, начинающий, параллельный.

Java-исполнители являются гибкими и удобными для подавляющего большинства случаев использования. Особенно хорошо они подходят для проектирования традиционной потоковой модели, где приложение выполняет довольно длительные (или даже блокирующие) задачи. Ситуация меняется, когда возникает необходимость справиться с огромным количеством мелких задач. Разработчики платформы Java также столкнулись с этой проблемой и разработали ForkJoinPool именно для этого варианта использования. И ForkJoinPool работает очень хорошо. Но иногда даже этого бывает недостаточно. Из-за характера варианта использования планировщик должен иметь как можно меньшие накладные расходы. ForkJoinPool реализует довольно сложный механизм “кражи работы”, и у него есть своя цена.

Анализ дизайна ForkJoinPool

Чтобы проанализировать, откуда могут возникнуть проблемы, давайте взглянем на ForkJoinPool внутренности (изображение взято из этой статьи ):

Итак, каждый рабочий поток имеет свою собственную очередь задач. Также существует общая очередь задач, разделяемая между потоками. Каждый работник обрабатывает задачи из своей собственной очереди. Когда задач нет, он ищет задачи в очередях других работников. И когда они тоже пусты, работник начинает выбирать задачи из общей очереди.

В ForkJoinPool дизайн довольно хорош для параллельной потоковой обработки и асинхронной обработки с CompletableFuture. К сожалению, у этого дизайна мало проблем для случая, когда существует огромное количество крошечных задач. Наличие общей входящей очереди начинает становиться точкой перегрузки. Чтобы использовать полную мощность процессора, ForkJoinPool обычно использует столько рабочих, сколько ядер процессора (минус одно, см. Ниже). По мере увеличения количества ядер (в случае современных процессоров) проблема усугубляется. Аналогичная вещь происходит, когда работники пытаются “украсть” работу из другого потока.

Стоит отметить, что аналогичная проблема существует со всеми реализациями Executors , присутствующими в стандартной библиотеке Java.

Поиск Правильного Решения

Самый простой способ избежать единой точки перегрузки – использовать отдельную очередь для каждого работника и отправлять задачи непосредственно в рабочую очередь. Простой механизм выбора очереди циклического перебора, хотя и не может полностью предотвратить конкуренцию, работает довольно хорошо и довольно хорошо распределяет входящие задачи.

Мой первоначальный подход выглядел так:

К сожалению, у этого подхода есть одно неотъемлемое ограничение: поток, который отправляет задачу, и рабочий поток должны иметь доступ к одной и той же очереди, поэтому очередь должна быть одной из Параллельных очередей. И хотя конкуренция намного ниже при использовании нескольких очередей, она все еще существует. Параллельные очереди сильно оптимизированы и очень быстры, но все еще имеют накладные расходы, связанные с параллельным доступом.

Чтобы свести к минимуму воздействие, вызванное одновременным доступом, я изменил подход. Вместо того, чтобы использовать одну очередь для каждого работника, я использовал две очереди. Один из них, называемый очередь ввода , принимал входящие задачи. Второй, вызванный рабочей очередью , был обработан рабочим потоком. Как только рабочая очередь очищается, рабочий атомарно меняет местами входную очередь и рабочая очередь и начинает обработку новой рабочей очереди .

Такой подход несколько повысил производительность, так как почти не бывает случаев, когда более одного потока обращаются к каждой очереди, а рабочая очередь доступна исключительно рабочему. Тем не менее, разница была довольно небольшой, так как обе очереди должны быть параллельными со всеми соответствующими накладными расходами.

Замена Очереди На … Стек!

Хотя последний подход не обеспечил значительного повышения производительности, он позволил взглянуть на обработку под другим углом: очереди больше не используются как очереди. Вместо этого входная очередь – это просто временное хранилище для входящих задач. Но такое хранилище не должно быть очередью, поэтому можно использовать более простую структуру данных с меньшими затратами на одновременный доступ. Оказалось, что такая структура уже существует (хотя и не является частью стандартной библиотеки Java). Структура называется Treiber Stack и имеет очень низкие издержки одновременного доступа, поскольку добавление элемента в стек – это всего лишь один Операция сравнения И Обмена . Но есть одно дополнительное преимущество: внутренние данные хранятся в виде простого связанного списка, к которому можно получить прямой доступ без каких-либо механизмов параллелизма. Конечно, только в том случае, если мы можем гарантировать, что доступ выполняется в пределах одного потока одновременно. Однако у структуры есть один недостаток – поскольку это стек, и доступ к данным осуществляется в порядке “Последний вход-Первый выход” (LIFO), задачи обрабатываются в обратном порядке по сравнению с отправкой. Это может не быть проблемой, но может привести к некоторой несправедливости в отношении задач, добавленных первыми. К счастью, это можно легко исправить, так как реверсирование связанного списка очень просто и может быть выполнено на месте. Окончательная структура выглядит так:

public class StackingCollector {
    private final AtomicReference> head = new AtomicReference<>();

    private StackingCollector() {}

    public static  StackingCollector stackingCollector() {
        return new StackingCollector<>();
    }

    public void push(final T action) {
        final var newHead = new Node<>(action);
        Node oldHead;

        do {
            oldHead = head.get();
            newHead.nextNode = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }

    public boolean swapAndApplyFIFO(final Consumer consumer) {
        //Note: this is very performance critical method, so all internals are inlined
        Node head;

        //Detach stored data from head
        do {
            head = this.head.get();
        } while (!this.head.compareAndSet(head, null));

        //Reverse list
        Node current = head;
        Node prev = null;
        Node next = null;

        while(current != null) {
            next = current.nextNode;
            current.nextNode = prev;
            prev = current;
            current = next;
        }

        final var hasElements = prev != null;

        //Process elements
        while (prev != null) {
            consumer.accept(prev.element);
            prev = prev.nextNode;
        }

        return hasElements;
    }

    static final class Node {
        public T element;
        public Node nextNode;

        public Node(final T element) {
            this.element = element;
        }
    }
}

Использование очень простое: отправляйте данные с помощью метода push и время от времени вызывайте swap и применяйте FIFO() , который обрабатывает уже собранные элементы. Обратите внимание, что push и поменять местами и Применить FIFO() не конфликтуют после отсоединения head, поэтому новые данные могут быть отправлены во время обработки ранее собранных данных.

Stacking Collector заменяет обе рабочие очереди в исполнителе, и вся архитектура теперь выглядит так:

Это в основном похоже на начальную диаграмму с очередью для каждого работника, за исключением того, что одновременный доступ к данным отсутствует. Как только swap и Apply FIFO() заменяют stack head, он получает эксклюзивный доступ к собранным данным, поэтому может обрабатывать элементы без каких-либо затрат на синхронизацию.

Вывод

Новая реализация Executor обеспечивала довольно низкие накладные расходы и легко справлялась с большим количеством небольших задач.

Код исполнителя можно найти здесь .

Оригинал: “https://dev.to/siy/fast-executor-for-small-tasks-gj6”