1. Обзор
В этой статье мы рассмотрим Очередь передачи конструкцию из стандартного пакета java.util.concurrent .
Проще говоря, эта очередь позволяет нам создавать программы в соответствии с шаблоном “производитель-потребитель” и координировать передачу сообщений от производителей к потребителям.
Реализация на самом деле похожа на BlockingQueue – , но дает нам новую возможность реализовать форму противодавления. Это означает, что, когда производитель отправляет сообщение потребителю с помощью метода transfer () , производитель будет заблокирован до тех пор, пока сообщение не будет использовано.
2. Один Производитель – Ноль Потребителей
Давайте протестируем метод transfer() из TransferQueue – ожидаемое поведение заключается в том, что производитель будет заблокирован до тех пор, пока потребитель не получит сообщение из очереди с помощью метода take () .
Для этого мы создадим программу, в которой будет один производитель, но ноль потребителей. Первый вызов transfer() из потока производителя будет заблокирован на неопределенный срок, так как у нас нет потребителей, которые могли бы извлечь этот элемент из очереди.
Давайте посмотрим, как выглядит класс Producer :
class Producer implements Runnable { private TransferQueuetransferQueue; private String name; private Integer numberOfMessagesToProduce; public AtomicInteger numberOfProducedMessages = new AtomicInteger(); @Override public void run() { for (int i = 0; i < numberOfMessagesToProduce; i++) { try { boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS); if(added){ numberOfProducedMessages.incrementAndGet(); } } catch (InterruptedException e) { e.printStackTrace(); } } } // standard constructors }
Мы передаем экземпляр очереди передачи конструктору вместе с именем, которое мы хотим дать нашему производителю, и количеством элементов, которые должны быть переданы в очередь.
Обратите внимание, что мы используем метод try Transfer() с заданным таймаутом. Мы ждем четыре секунды, и если производитель не может передать сообщение в течение заданного времени ожидания, он возвращает false и переходит к следующему сообщению. У производителя есть переменная количество созданных сообщений , чтобы отслеживать, сколько сообщений было создано.
Далее давайте рассмотрим класс Consumer :
class Consumer implements Runnable { private TransferQueuetransferQueue; private String name; private int numberOfMessagesToConsume; public AtomicInteger numberOfConsumedMessages = new AtomicInteger(); @Override public void run() { for (int i = 0; i < numberOfMessagesToConsume; i++) { try { String element = transferQueue.take(); longProcessing(element); } catch (InterruptedException e) { e.printStackTrace(); } } } private void longProcessing(String element) throws InterruptedException { numberOfConsumedMessages.incrementAndGet(); Thread.sleep(500); } // standard constructors }
Он похож на производителя, но мы получаем элементы из очереди с помощью метода take () . Мы также имитируем некоторые длительные действия, используя метод long Processing () , в котором мы увеличиваем переменную numberOfConsumedMessages , которая является счетчиком полученных сообщений.
Теперь давайте начнем нашу программу только с одного продюсера:
@Test public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() throws InterruptedException { // given TransferQueuetransferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(2); Producer producer = new Producer(transferQueue, "1", 3); // when exService.execute(producer); // then exService.awaitTermination(5000, TimeUnit.MILLISECONDS); exService.shutdown(); assertEquals(producer.numberOfProducedMessages.intValue(), 0); }
Мы хотим отправить три элемента в очередь, но производитель заблокирован на первом элементе, и нет потребителя, который мог бы извлечь этот элемент из очереди . Мы используем метод dry Transfer () , который будет блокироваться до тех пор, пока сообщение не будет израсходовано или не будет достигнут тайм-аут. После тайм-аута он вернет false , чтобы указать, что передача не удалась, и попытается передать следующую. Это результат предыдущего примера:
Producer: 1 is waiting to transfer... can not add an element due to the timeout Producer: 1 is waiting to transfer...
3. Один Производитель – Один Потребитель
Давайте проверим ситуацию, когда есть один производитель и один потребитель:
@Test public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() throws InterruptedException { // given TransferQueuetransferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(2); Producer producer = new Producer(transferQueue, "1", 3); Consumer consumer = new Consumer(transferQueue, "1", 3); // when exService.execute(producer); exService.execute(consumer); // then exService.awaitTermination(5000, TimeUnit.MILLISECONDS); exService.shutdown(); assertEquals(producer.numberOfProducedMessages.intValue(), 3); assertEquals(consumer.numberOfConsumedMessages.intValue(), 3); }
Очередь передачи используется в качестве точки обмена, и до тех пор, пока потребитель не потребит элемент из очереди, производитель не сможет продолжить добавление в нее другого элемента. Давайте посмотрим на выходные данные программы:
Producer: 1 is waiting to transfer... Consumer: 1 is waiting to take element... Producer: 1 transferred element: A0 Producer: 1 is waiting to transfer... Consumer: 1 received element: A0 Consumer: 1 is waiting to take element... Producer: 1 transferred element: A1 Producer: 1 is waiting to transfer... Consumer: 1 received element: A1 Consumer: 1 is waiting to take element... Producer: 1 transferred element: A2 Consumer: 1 received element: A2
Мы видим, что создание и потребление элементов из очереди происходит последовательно из-за спецификации TransferQueue.
4. Многие Производители – Многие Потребители
В последнем примере мы рассмотрим наличие нескольких потребителей и нескольких производителей:
@Test public void whenMultipleConsumersAndProducers_thenProcessAllMessages() throws InterruptedException { // given TransferQueuetransferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(3); Producer producer1 = new Producer(transferQueue, "1", 3); Producer producer2 = new Producer(transferQueue, "2", 3); Consumer consumer1 = new Consumer(transferQueue, "1", 3); Consumer consumer2 = new Consumer(transferQueue, "2", 3); // when exService.execute(producer1); exService.execute(producer2); exService.execute(consumer1); exService.execute(consumer2); // then exService.awaitTermination(10_000, TimeUnit.MILLISECONDS); exService.shutdown(); assertEquals(producer1.numberOfProducedMessages.intValue(), 3); assertEquals(producer2.numberOfProducedMessages.intValue(), 3); }
В этом примере у нас есть два потребителя и два производителя. Когда программа запускается, мы видим, что оба производителя могут произвести один элемент, и после этого они будут блокироваться до тех пор, пока один из потребителей не заберет этот элемент из очереди:
Producer: 1 is waiting to transfer... Consumer: 1 is waiting to take element... Producer: 2 is waiting to transfer... Producer: 1 transferred element: A0 Producer: 1 is waiting to transfer... Consumer: 1 received element: A0 Consumer: 1 is waiting to take element... Producer: 2 transferred element: A0 Producer: 2 is waiting to transfer... Consumer: 1 received element: A0 Consumer: 1 is waiting to take element... Producer: 1 transferred element: A1 Producer: 1 is waiting to transfer... Consumer: 1 received element: A1 Consumer: 2 is waiting to take element... Producer: 2 transferred element: A1 Producer: 2 is waiting to transfer... Consumer: 2 received element: A1 Consumer: 2 is waiting to take element... Producer: 1 transferred element: A2 Consumer: 2 received element: A2 Consumer: 2 is waiting to take element... Producer: 2 transferred element: A2 Consumer: 2 received element: A2
5. Заключение
В этой статье мы рассматривали конструкцию Очередь передачи из пакета java.util.concurrent .
Мы видели, как реализовать программу “производитель-потребитель”, используя эту конструкцию. Мы использовали метод transfer() для создания формы обратного давления, когда производитель не может опубликовать другой элемент, пока потребитель не получит элемент из очереди.
Очередь передачи может быть очень полезна, когда мы не хотим, чтобы перепроизводящий производитель наводнял очередь сообщениями, что приводит к ошибкам OutOfMemory . В таком дизайне потребитель будет диктовать скорость, с которой производитель будет создавать сообщения.
Все эти примеры и фрагменты кода можно найти на GitHub – это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.