Рубрики
Без рубрики

Руководство по java.util.concurrent.Блокирование

Обзор java.util.concurrent.Blocking-ue с примером использования

Автор оригинала: baeldung.

1. Обзор

В этой статье мы посмотрим на одну из самых полезных конструкций java.util.concurrent для решения одновременной проблемы производителя и потребителя. Мы посмотрим на API Блокирование Куэ интерфейс и как методы из этого интерфейса сделать написание одновременных программ проще.

Позже в статье мы покажем пример простой программы, которая имеет несколько потоков производителя и несколько потоков потребителя.

2. Блокирование типов куэй

Мы можем различать два типа Блокирование Куэ :

  • несвехая очередь – может расти почти бесконечно
  • ограниченная очередь – с максимальной емкостью определены

2.1. Неохветная очередь

Создание неограниченных очередей просто:

BlockingQueue blockingQueue = new LinkedBlockingDeque<>();

Емкость блокированиеКуэ будет установлен на Интегре.MAX-ВАЛЕ. Все операции, добавляемые элементом в несысяную очередь, никогда не будут блокироваться, таким образом, он может вырасти до очень большого размера.

Самое главное при разработке программы производителя и потребителя с использованием неограниченных Blockingue является то, что потребители должны иметь возможность потреблять сообщения так же быстро, как производители добавляют сообщения в очередь. В противном случае, память может заполниться, и мы получим OutOfMemory исключение.

2.2. Ограниченная очередь

Второй тип очередей – ограниченная очередь. Мы можем создавать такие очереди, передавая емкость в качестве аргумента конструктору:

BlockingQueue blockingQueue = new LinkedBlockingDeque<>(10);

Здесь у нас есть блокированиеКуэ которая имеет емкость, равную 10. Это означает, что, когда производитель пытается добавить элемент в уже полную очередь, в зависимости от метода, который был использован, чтобы добавить его (предложение () , добавить () или положить () ), он будет блокировать до тех пор, пока пространство для вставки объекта становится доступным. В противном случае операции потерпят неудачу.

Использование ограниченной очереди является хорошим способом разработки параллельных программ, потому что, когда мы вставляем элемент в уже полную очередь, что операции должны ждать, пока потребители наверстать упущенное и сделать некоторое пространство доступным в очереди. Это дает нам регулирование без каких-либо усилий с нашей стороны.

3. Блокировка API

Есть два типа методов в Блокирование Куэ интерфейс методы, ответственные за добавление элементов в очередь и методы, которые извлекают эти элементы. Каждый метод из этих двух групп ведет себя по-разному в случае, если очередь заполнена/пуста.

3.1. Добавление элементов

  • добавить () – возвращает истинное если вставка была успешной, в противном случае бросает Незаконноегосударственногоисключения
  • положить () – вставляет указанный элемент в очередь в ожидании свободного слота в случае необходимости
  • предложение () – возвращает истинное если вставка была успешной, в противном случае ложный
  • предложение (E e, длительный тайм-аут, подразделение TimeUnit) – пытается вставить элемент в очередь и ждет доступного слота в течение определенного тайм-аута

3.2. Извлечение элементов

  • взять () – ждет головной элемент очереди и удаляет его. Если очередь пуста, она блокирует и ждет, когда элемент станет доступным
  • опрос (длительный тайм-аут, подразделение TimeUnit) – извлекает и удаляет головную часть очереди, ожидая указанного времени ожидания, если это необходимо для того, чтобы элемент стал доступен. Возвращает нулевой после тайм-аута

Эти методы являются наиболее важными строительными блоками из Блокирование Куэ интерфейс при создании программ производителя-потребителя.

4. Пример многопрочитанного производителя и потребителя

Давайте создадим программу, которая состоит из двух частей – производителя и потребителя.

Производитель будет производить случайное число от 0 до 100 и поставит это число в Блокирование Куэ . Мы будем иметь 4 потока производителя и использовать положить () метод блокировки до тех пор, пока в очереди не будет места.

Важно помнить, что мы должны остановить наших потребительских потоков от ожидания элемент появится в очереди на неопределенный срок.

Хороший метод, чтобы сигнал от производителя к потребителю, что Есть не более сообщений для обработки заключается в том, чтобы отправить специальное сообщение под названием яд таблетки. Нам нужно послать столько ядовитых таблеток, сколько у нас есть потребителей. Тогда, когда потребитель будет принимать, что специальные сообщения таблетки яд из очереди, он закончит выполнение изящно.

Давайте посмотрим на класс производителей:

public class NumbersProducer implements Runnable {
    private BlockingQueue numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;
    
    public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}

Наш производитель конструктор принимает в качестве аргумента Блокирование Куэ используется для координации обработки между производителем и потребителем. Мы видим, что метод generateNumbers () поставит 100 элементов в очередь. Он принимает также яд таблетки сообщение, чтобы знать, какой тип сообщения должны быть введены в очередь, когда исполнение будет закончено. Это послание должно быть poisonPillPerProducer раз в очередь.

Каждый потребитель будет брать элемент из Блокирование Куэ с помощью взять () метод, поэтому он будет блокироваться до тех пор, пока не будет элемента в очереди. После принятия Интегер из очереди он проверяет, является ли сообщение ядовитой таблеткой, если да, то выполнение потока закончено. В противном случае он распечатает результат на стандартном выходе вместе с именем текущего потока.

Это даст нам представление о внутренней работе наших потребителей:

public class NumbersConsumer implements Runnable {
    private BlockingQueue queue;
    private final int poisonPill;
    
    public NumbersConsumer(BlockingQueue queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Важно заметить использование очереди. Так же, как и в конструкторе производителя, очередь передается в качестве аргумента. Мы можем сделать это, потому что Блокирование Куэ могут быть разделены между потоками без явной синхронизации.

Теперь, когда у нас есть наш производитель и потребитель, мы можем начать нашу программу. Нам нужно определить емкость очереди, и мы установили ее до 100 элементов.

Мы хотим иметь 4 потока производителя и ряд потребителей потоки будут равны количеству доступных процессоров:

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;

BlockingQueue queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N_PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

Блокирование Куэ создается с использованием конструкции с емкостью. Мы создаем 4 производителей и N потребителей. Мы указать наши ядовитые таблетки сообщение, чтобы быть Интегер.MAX-VALUE потому что такая стоимость никогда не будет отправлена нашим производителем при нормальных условиях труда. Самое главное, чтобы заметить здесь, что Блокирование Куэ используется для координации работы между ними.

Когда мы запускаем программу, 4 потока производителя будут ставить случайные Интеграторы в Блокирование Куэ и потребители будут принимать эти элементы из очереди. Каждый поток будет печататься на стандартном выходе имени потока вместе с результатом.

5. Заключение

В этой статье показано практическое использование Блокирование Куэ и объясняет методы, которые используются для добавления и извлечения элементов из него. Кроме того, мы показали, как построить многопрофилитную программу производителя и потребителя с помощью Блокирование Куэ координировать работу между производителями и потребителями.

Реализация всех этих примеров и фрагментов кода можно найти в Проект GitHub – это Maven основе проекта, поэтому она должна быть легко импортировать и работать, как она есть.