Рубрики
Без рубрики

Обзор java.util.concurrent

Откройте для себя содержимое пакета java.util.concurrent.

Автор оригинала: baeldung.

1. Обзор

Пакет java.util.concurrent предоставляет инструменты для создания параллельных приложений.

В этой статье мы сделаем обзор всего пакета.

2. Основные Компоненты

java.util.concurrent содержит слишком много функций, чтобы обсуждать их в одной записи. В этой статье мы в основном сосредоточимся на некоторых наиболее полезных утилитах из этого пакета, таких как:

  • Исполнитель
  • ExecutorService
  • ScheduledExecutorService
  • Будущее
  • Обратный отсчет
  • Циклобарьер
  • Семафор
  • Резьбонарезной завод
  • BlockingQueue
  • Очередь задержки
  • Замки
  • Фазер

Вы также можете найти здесь много статей, посвященных отдельным классам.

2.1. Исполнитель

Исполнитель это интерфейс, представляющий объект, выполняющий поставленные задачи.

Это зависит от конкретной реализации (от того, откуда инициируется вызов), должна ли задача выполняться в новом или текущем потоке. Следовательно, используя этот интерфейс, мы можем отделить поток выполнения задачи от фактического механизма выполнения задачи.

Здесь следует отметить, что Executor строго не требует, чтобы выполнение задачи было асинхронным. В простейшем случае исполнитель может мгновенно вызвать отправленную задачу в вызывающем потоке.

Нам нужно создать вызыватель для создания экземпляра исполнителя:

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

Теперь мы можем использовать этот вызов для выполнения задачи.

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // task to be performed
    });
}

Здесь следует отметить, что если исполнитель не может принять задачу к выполнению, он бросит Исключение RejectedExecutionException .

2.2. Служба исполнителей

ExecutorService – это комплексное решение для асинхронной обработки. Он управляет очередью в памяти и планирует отправленные задачи на основе доступности потоков.

Чтобы использовать ExecutorService, нам нужно создать один запускаемый класс.

public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}

Теперь мы можем создать экземпляр ExecutorService и назначить эту задачу. Во время создания нам необходимо указать размер пула потоков.

ExecutorService executor = Executors.newFixedThreadPool(10);

Если мы хотим создать однопоточный экземпляр ExecutorService , мы можем использовать newSingleThreadExecutor(ThreadFactory ThreadFactory) для создания экземпляра.

Как только исполнитель будет создан, мы сможем использовать его для отправки задачи.

public void execute() { 
    executor.submit(new Task()); 
}

Мы также можем создать экземпляр Runnable при отправке задачи.

executor.submit(() -> {
    new Task();
});

Он также поставляется с двумя готовыми методами завершения выполнения. Первый из них – shut down() ; он ждет, пока все отправленные задачи не завершат выполнение. Другой метод – shutdownNow() whic h немедленно завершает все отложенные/выполняемые задачи.

Существует также другой метод awaitTermination(длительный тайм-аут, единица измерения времени) , который принудительно блокирует выполнение до тех пор, пока все задачи не завершат выполнение после срабатывания события завершения работы или тайм-аута выполнения, или сам поток выполнения не будет прерван,

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3. ScheduledExecutorService

ScheduledExecutorService – это интерфейс, аналогичный интерфейсу ExecutorService, но он может периодически выполнять задачи.

Методы Executor и ExecutorService планируются на месте без каких-либо искусственных задержек. Ноль или любое отрицательное значение означает, что запрос должен быть выполнен мгновенно.

Мы можем использовать как Выполняемый , так и Вызываемый интерфейс для определения задачи.

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService также может запланировать задачу после некоторой заданной фиксированной задержки :

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

Здесь метод scheduleAtFixedRate( выполняемая команда, long initialDelay, long period, единица измерения времени ) создает и выполняет периодическое действие, которое вызывается сначала после заданной начальной задержки, а затем с заданным периодом до завершения работы экземпляра службы.

Метод scheduleWithFixedDelay( Выполняемая команда, long initialDelay, long delay, единица измерения времени ) создает и выполняет периодическое действие, которое вызывается сначала после заданной начальной задержки и повторно с заданной задержкой между завершением выполняемого и вызовом следующего.

2.4. Будущее

Future используется для представления результата асинхронной операции. Он поставляется с методами проверки того, завершена ли асинхронная операция или нет, получения вычисленного результата и т. Д.

Более того, cancel(boolean mayInterruptIfRunning) API отменяет операцию и освобождает выполняющийся поток. Если значение mayInterruptIfRunning равно true, поток, выполняющий задачу, будет немедленно завершен.

В противном случае незавершенные задачи будут разрешены для выполнения.

Мы можем использовать приведенный ниже фрагмент кода для создания будущего экземпляра:

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

Мы можем использовать следующий фрагмент кода, чтобы проверить, готов ли будущий результат, и получить данные, если вычисление выполнено:

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

Мы также можем указать тайм-аут для данной операции. Если задача занимает больше этого времени, a TimeoutException выбрасывается:

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5. Обратный отсчет

CountDownLatch (введенный в JDK 5 ) – это служебный класс, который блокирует набор потоков до завершения какой-либо операции.

A CountDownLatch инициализируется счетчиком (целочисленный тип); этот счетчик уменьшается по мере завершения выполнения зависимых потоков. Но как только счетчик достигает нуля, другие потоки освобождаются.

Вы можете узнать больше о CountDownLatch здесь .

2.6. Циклический барьерный

CyclicBarrier работает почти так же, как CountDownLatch , за исключением того, что мы можем использовать его повторно. В отличие от CountDownLatch , он позволяет нескольким потокам ждать друг друга с помощью метода await () (известного как условие барьера) перед вызовом конечной задачи.

Нам нужно создать Запускаемый экземпляр задачи, чтобы инициировать условие барьера:

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

Теперь мы можем вызвать некоторые потоки для гонки за условием барьера:

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

Здесь метод is Broken() проверяет, не прервался ли какой-либо из потоков во время выполнения. Мы всегда должны выполнять эту проверку перед выполнением фактического процесса.

2.7. Семафор

Семафор используется для блокировки доступа на уровне потоков к некоторой части физического или логического ресурса. семафор содержит набор разрешений; всякий раз, когда поток пытается войти в критический раздел, он должен проверить семафор, доступно ли разрешение или нет.

Если разрешение отсутствует (через Попробуйте() ), потоку не разрешается переходить в критическую секцию; однако, если разрешение доступно, доступ предоставляется, и счетчик разрешений уменьшается.

Как только исполняющий поток освобождает критическую секцию, счетчик разрешений снова увеличивается (выполняется методом release () ).

Мы можем указать тайм-аут для получения доступа с помощью метода tryAcquire(long timeout, TimeUnit unit) .

Мы также можем проверить количество доступных разрешений или количество потоков, ожидающих получения семафора.

Следующий фрагмент кода можно использовать для реализации семафора:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        }
        finally {
            semaphore.release();
        }
    }

}

Мы можем реализовать Мьютекс подобную структуру данных, используя Семафор . Более подробную информацию об этом можно найти здесь.

2.8. Резьбонарезной завод

Как следует из названия, ThreadFactory действует как пул потоков (несуществующий), который создает новый поток по требованию. Это устраняет необходимость в большом количестве шаблонного кодирования для реализации эффективных механизмов создания потоков.

Мы можем определить ThreadFactory :

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

Мы можем использовать этот метод new Thread(Runnable r) для создания нового потока во время выполнения:

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9. Блокирующая очередь

В асинхронном программировании одним из наиболее распространенных шаблонов интеграции является шаблон производитель-потребитель . Пакет java.util.concurrent поставляется со структурой данных, известной как BlockingQueue , которая может быть очень полезна в этих асинхронных сценариях.

Более подробная информация и рабочий пример по этому вопросу доступны здесь .

2.10. Очередь задержки

Очередь задержки -это очередь блокировки элементов бесконечного размера, в которой элемент может быть извлечен только в том случае, если время его истечения (известное как задержка, определенная пользователем) завершено. Следовательно, самый верхний элемент ( head ) будет иметь наибольшую задержку, и он будет опрошен последним.

Более подробная информация и рабочий пример по этому вопросу доступны здесь .

2.11. Замки

Неудивительно, что Lock – это утилита для блокирования доступа других потоков к определенному сегменту кода, кроме потока, который выполняет его в данный момент.

Основное различие между блокировкой и синхронизированным блоком заключается в том, что синхронизированный блок полностью содержится в методе; однако мы можем использовать операции lock API lock() и unlock() в отдельных методах.

Более подробная информация и рабочий пример по этому вопросу доступны здесь .

2.12. Фазер

Phaser является более гибким решением, чем CyclicBarrier и CountDownLatch – используется в качестве многоразового барьера, на котором динамическое количество потоков должно ждать продолжения выполнения. Мы можем координировать несколько фаз выполнения, повторно используя экземпляр Phaser для каждой фазы программы.

Более подробная информация и рабочий пример по этому вопросу доступны здесь .

3. Заключение

В этой обзорной статье высокого уровня мы сосредоточились на различных утилитах, доступных в java.util.concurrent package.

Как всегда, полный исходный код доступен на GitHub .