Рубрики
Без рубрики

Введение в пулы потоков на Java

Быстрое и практическое руководство по различным реализациям ThreadPool на Java и Guava.

Автор оригинала: Eugen Paraschiv.

Введение в пулы потоков на Java

1. Введение

В этой статье будут посмотреть пулы потоков на Java – начиная с различных реализаций в стандартной библиотеке Java, а затем глядя на библиотеку Гуавы от Google.

Дальнейшее чтение:

Разница между потоком и виртуальной нитью в Java

ИсполнительСервис – Ожидание темы, чтобы закончить

Пользовательские пулы потоков в Java 8 Параллельные потоки

2. Бассейн резьбы

В Java потоки отображаются на потоках системного уровня, которые являются ресурсами операционной системы. Если вы создаете потоки бесконтрольно, вы можете быстро выбежать из этих ресурсов.

Переключение контекста между потоками также делается операционной системой , чтобы подражать параллелизму. Упрощенное представление заключается в том, что чем больше потоков вы порождаете, тем меньше времени каждый поток тратит на работу.

Шаблон Thread Pool помогает экономить ресурсы в многопрочитанном приложении, а также содержать параллелизм в определенных предопределенных пределах.

При использовании пула потоков напишите параллельный код в виде параллельных задач и отправьте их для выполнения в экземпляр пула потоков . Этот экземпляр управляет несколькими повторно используемыми потоками для выполнения этих задач.

2016-08-10-10-16-52-1024x572

Шаблон позволяет контролировать количество потоков, которые создает приложение , их жизненный цикл, а также планировать выполнение задач и держать входящие задачи в очереди.

3. Бассейны потоков в Java

3.1. Исполнители, Исполнитель и ИсполнительСервис

Исполнители Класс помощников содержит несколько методов создания предварительно настроенных экземпляров пула потоков для вас. Эти классы являются хорошим местом для начала – использовать его, если вам не нужно применять какие-либо пользовательские тонкой настройки.

Исполнитель и ИсполнительСервис интерфейсы используются для работы с различными реализациями пула потоков в Java. Как правило, вы должны сохранить код отделен от фактической реализации пула потоков и использовать эти интерфейсы на протяжении всего приложения.

тем душеприказчик интерфейс имеет единый исполнять метод для отправки Выируемый экземпляров для исполнения.

Вот быстрый пример о том, как вы можете использовать Исполнители API для приобретения Исполнитель при поддержке одного пула потоков и несыряемой очереди для последовательного выполнения задач. Здесь мы выполняем одну задачу, которая просто печатает ” Здравствуйте, ” на экране. Задача представлена в качестве лямбды (функция Java 8), которая должна быть Бег .

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

ИсполнительСервис интерфейс содержит большое количество методов для контроль выполнения задач и управление прекращением обслуживания . Используя этот интерфейс, вы можете отправить задачи для выполнения, а также контролировать их выполнение с помощью возвращенной Будущие пример.

В следующем примере , мы создаем ИсполнительСервис , представить задачу, а затем использовать возвращенный Будущие ‘ы получить метод ожидания, пока представленная задача не будет закончена и значение возвращается:

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();

Конечно, в реальной жизни сценарий, вы обычно не хотите называть future.get () сразу же, но отложить вызов его, пока вы на самом деле нужно значение вычислений.

представить метод перегружен, чтобы принять любой Бег или Вызов оба из которых являются функциональными интерфейсами и могут быть переданы в качестве лямбды (начиная с Java 8).

Бег Один метод не делает исключение и не возвращает значение. Вызов интерфейс может быть более удобным, так как позволяет сделать исключение и вернуть значение.

Наконец – позволить компилятору сделать вывод о Вызов типа, просто вернуть значение от lambda.

Дополнительные примеры использования ИсполнительСервис интерфейс и фьючерсы, посмотрите на ” Руководство по работе с исполнителями Java “.

3.2. ThreadPoolExecutor

ThreadPoolExecutor является разгибаемой реализацией пула потоков с большим количеством параметров и крючков для тонкой настройки.

Основные параметры конфигурации, которые мы обсудим здесь: corePoolSize , maximumPoolSize , и keepAliveTime .

Пул состоит из фиксированного количества основных потоков, которые хранятся внутри все время, и некоторых чрезмерных потоков, которые могут быть порождены, а затем прекращены, когда они больше не нужны. corePoolSize параметром является количество основных потоков, которые будут мгновенно и храниться в пуле. Когда приходит новая задача, если все основные потоки заняты и внутренняя очередь заполнена, то пул может вырасти до maximumPoolSize .

keepAliveTime параметром является интервал времени, в течение которого чрезмерные потоки (мгновенно превышающие corePoolSize ) могут существовать в состоянии простоя. По умолчанию ThreadPoolExecutor рассматривает неосновные потоки для удаления. Для того, чтобы применить ту же политику удаления к основным потокам, мы можем использовать allowCoreThreadTimeOut (правда) метод.

Эти параметры охватывают широкий спектр случаев использования, но наиболее типичные конфигурации предопределены в Исполнители статические методы .

Например, , newFixedThreadPool метод создает ThreadPoolExecutor с равными corePoolSize и maximumPoolSize значения параметров и нулевой keepAliveTime. Это означает, что количество потоков в этом пуле потоков всегда одинаково:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

В приведеном выше примере мы мгновенно ThreadPoolExecutor с фиксированным подсчетом потока 2. Это означает, что если количество одновременно выполняемых задач меньше или равно двум во все времена, то они выполняются сразу. В противном случае, некоторые из этих задач могут быть поставлены в очередь, чтобы дождаться своей .

Мы создали три Вызов задачи, которые имитируют тяжелую работу, спя в течение 1000 миллисекунд. Первые две задачи будут выполнены сразу, а третью придется ждать в очереди. Мы можем проверить это, позвонив в getPoolSize () и получить Куэ ().размер() методы сразу после отправки задач.

Еще одна предварительно настроенная ThreadPoolExecutor могут быть созданы с Исполнители.newCachedThreadPool () метод. Этот метод вообще не получает количество потоков. corePoolSize на самом деле установлен на 0, и maximumPoolSize установлен на Интегер.MAX-VALUE для этого случая. keepAliveTime 60 секунд для этого.

Эти значения параметров означают, что кэшированный пул потоков может расти без ограничений для размещения любого количества представленных задач . Но когда потоки больше не нужны, они будут удалены через 60 секунд бездействия. Типичным случаем использования является, когда у вас есть много краткосрочных задач в вашем приложении.

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

Размер очереди в приведенной выше примере всегда будет нулевым, поскольку внутренне Синхронная используется экземпляр. В Синхронная , пары вставить и удалить операции всегда происходят одновременно, поэтому очередь никогда ничего не содержит.

Исполнители.newSingleThreadExecutor() API создает еще одну типичную форму ThreadPoolExecutor содержащие одну нить. Исполнитель одного потока идеально подходит для создания цикла событий. corePoolSize и maximumPoolSize параметры равны 1, а keepAliveTime равна нулю.

Задачи в приведеном выше примере будут выполняться последовательно, поэтому значение флага будет 2 после завершения задачи:

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

Кроме того, ThreadPoolExecutor украшена неизменной оберткой, поэтому она не может быть перенастроена после создания. Обратите внимание, что именно по этой причине мы не можем бросить его на ThreadPoolExecutor .

3.3. ЗапланированныйПротекторPoolExecutor

ЗапланированоThreadPoolExecutor расширяет ThreadPoolExecutor класса, а также реализует План-экскуторСервис интерфейс с несколькими дополнительными методами:

  • расписание метод позволяет выполнить задачу один раз после указанной задержки;
  • расписаниеАтФиксированный метод позволяет выполнить задачу после указанной первоначальной задержки, а затем выполнить ее повторно с определенным периодом; период аргумент это время измеряется между временами начала задач , таким образом, скорость исполнения фиксируется;
  • расписаниеСФиксированныйДелей метод похож на расписаниеАтФиксированный в том, что он неоднократно выполняет заданную задачу, но указанная задержка измеряется между окончанием предыдущей задачи и началом следующего ; скорость выполнения может варьироваться в зависимости от времени, необходимого для выполнения любой задачи.

Исполнители.newScheduledThreadPool () метод обычно используется для создания ЗапланированоThreadPoolExecutor с данной corePoolSize , неограниченный maximumPoolSize и нулевой keepAliveTime . Вот как запланировать выполнение задачи за 500 миллисекунд:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

Следующий код показывает, как выполнить задачу после 500 миллисекунд задержки, а затем повторить его каждые 100 миллисекунд. После планирования задачи, мы ждем, пока он заготовит три раза, используя CountDownLatch блокировка , затем отмените его с помощью Future.cancel () метод.

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool является центральной частью вилка/присоединиться рамки, представленные на Java 7. Это решает общую проблему нереста несколько задач в рекурсивных алгоритмов . Использование простого ThreadPoolExecutor , Вы будете работать из потоков быстро, так как каждая задача или подразрядка требует своего собственного потока для запуска.

В вилка/присоединиться рамки, любая задача может породить ( вилкой ) ряд подзадач и ждать их завершения с помощью присоединиться метод. Преимущество вилка/присоединиться является то, что не создает новый поток для каждой задачи или подзарядки , реализация алгоритма кражи работы вместо этого. Эта основа подробно описана в статье « Руководство по рамочной программе Fork/Join в Java “

Рассмотрим простой пример использования ForkJoinPool пройти дерево узлов и рассчитать сумму всех значений листа. Вот простая реализация дерева, состоящего из узла, int значение и набор детских узлов:

static class TreeNode {

    int value;

    Set children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

Теперь, если мы хотим суммировать все значения в дереве параллельно, мы должны реализовать Интерфейс RecursiveTask интерфейс. Каждая задача получает свой собственный узел и добавляет свою ценность к сумме значений своего дети . Рассчитать сумму дети значения, реализация задачи делает следующее:

  • потоков дети набор
  • карты над этим потоком, создавая новую ПодсчетТеск для каждого элемента,
  • выполняет каждую подразрядку, замахив ее,
  • собирает результаты, позвонив в присоединиться метод на каждой раздвоенной задаче,
  • суммирует результаты с использованием Коллекционеры.summingInt коллектор.
public static class CountingTask extends RecursiveTask {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}

Код для времени расчета на самом дереве очень прост:

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Реализация потока бассейн в Гуаве

Гуава является популярной библиотекой утилит Google. Он имеет много полезных классов concurrency, в том числе несколько удобных реализаций ИсполнительСервис . Классы реализации недоступны для прямого мгновенного или подклассификации, поэтому единственной точкой входа для создания их экземпляров является MoreExecutors класс помощников.

4.1. Добавление Гуавы в качестве зависимости от Maven

Добавьте следующую зависимость в файл Maven pom, чтобы включить библиотеку Guava в свой проект. Вы можете найти последнюю версию библиотеки Гуавы в Мавен Центральный хранилище:


    com.google.guava
    guava
    19.0

4.2. Прямой исполнитель и непосредственный исполнитель

Иногда требуется выполнить задачу либо в текущем потоке, либо в пуле потоков, в зависимости от некоторых условий. Вы предпочитаете использовать одну Исполнитель интерфейс и просто переключить реализацию. Хотя придумать реализацию этого проекта не так уж и сложно Исполнитель или ИсполнительСервис который выполняет задачи в текущем потоке, он по-прежнему требует написания некоторого шаблонного кода.

К счастью, Гуава предоставляет нам предопределенные экземпляры.

Вот пример что демонстрирует выполнение задачи в том же потоке. Несмотря на то, что предоставленная задача спит в течение 500 миллисекунд, блокирует текущий поток , и результат доступен сразу после выполнить вызов закончен:

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());

Экземпляр, возвращенный directExecutor() метод на самом деле статический синглтон, поэтому с помощью этого метода не обеспечивает каких-либо накладных расходов на создание объекта на всех.

Вы должны предпочесть этот метод MoreExecutors.newDirectExecutorService () потому что этот API создает полноценную реализацию службы исполнителя на каждом вызове.

4.3. Услуги по выходу из группы исполнителей

Другой распространенной проблемой является выключение виртуальной машины в то время как пул потоков все еще работает со своими задачами. Даже при использовании механизма отмены нет никакой гарантии, что задачи будут вести себя хорошо и при остановке их работы, когда служба исполнителя закроется. Это может привести к тому, что JVM завис на неопределенный срок, в то время как задачи продолжают выполнять свою работу.

Чтобы решить эту проблему, Гуава вводит семейство услуг выходящих исполнителей. Они основаны на темы daemon, которые заканчиваются вместе с JVM .

Эти службы также добавляют крюк выключения с Runtime.getRuntime ().addShutdownHook() метод и предотвратить прекращение VM для настроенного количества времени, прежде чем отказаться от висела задач.

В следующем примере мы помеем задачу, которая содержит бесконечный цикл, но мы используем службу выхода исполнителя с настроенным временем 100 миллисекунд, чтобы ждать задач после завершения VM. Без выходЭксекуторСервис на месте, эта задача приведет к VM повесить на неопределенный срок:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService = 
  MoreExecutors.getExitingExecutorService(executor, 
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});

4.4. Прослушивание декораторов

Прослушивание декораторов позволяет обернуть ИсполнительСервис и получать СлушайтеФутурные экземпляров при представлении задачи вместо простого Будущие Экземпляров. СлушайтеФутурные интерфейс расширяет Будущие и имеет единый дополнительный метод добавитьListener . Этот метод позволяет добавить слушателя, который призван к будущему завершению.

Вы редко захотите использовать ListenableFuture.addListener () метод непосредственно, но это необходимы для большинства методов помощников в Фьючерсные утилита класса . Например, с Futures.allAsList() метод можно объединить несколько СлушайтеФутурные экземпляров в одном СлушайтеФутурные , который завершается после успешного завершения всех фьючерсов вместе взятых:

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = 
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture future1 = 
  listeningExecutorService.submit(() -> "Hello");
ListenableFuture future2 = 
  listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);

5. Заключение

В этой статье мы обсудили шаблон Thread Pool и его реализации в стандартной библиотеке Java и в библиотеке Гуавы от Google.

Исходный код статьи доступен для более на GitHub .