1. Обзор
В этой статье мы рассмотрим конструкцию Delay Queue из пакета java.util.concurrent . Это блокирующая очередь, которую можно использовать в программах “производитель-потребитель”.
У него есть очень полезная характеристика – когда потребитель хочет взять элемент из очереди, он может взять его только тогда, когда задержка для этого конкретного элемента истекла.
2. Реализация Задержки для элементов в очереди задержки
Каждый элемент, который мы хотим поместить в очередь Delay , должен реализовать интерфейс Delayed |/. Допустим, мы хотим создать объект Delay class. Экземпляры этого класса будут помещены в очередь задержки .
Мы передадим Строку данные и задержку в миллисекундах как и аргументы его конструктору:
public class DelayObject implements Delayed { private String data; private long startTime; public DelayObject(String data, long delayInMilliseconds) { this.data = data; this.startTime = System.currentTimeMillis() + delayInMilliseconds; }
Мы определяем startTime – это время, когда элемент должен быть потреблен из очереди. Далее нам нужно реализовать метод getDelay () – он должен возвращать оставшуюся задержку, связанную с этим объектом в заданной единице времени.
Поэтому нам нужно использовать метод TimeUnit.convert () , чтобы вернуть оставшуюся задержку в соответствующей единице времени:
@Override public long getDelay(TimeUnit unit) { long diff = startTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); }
Когда потребитель пытается взять элемент из очереди, Очередь задержки выполнит getDelay () , чтобы узнать, разрешено ли возвращать этот элемент из очереди. Если метод getDelay() вернет ноль или отрицательное число, это означает, что оно может быть извлечено из очереди.
Нам также необходимо реализовать метод compareTo () , поскольку элементы в очереди Delay будут отсортированы в соответствии со временем истечения срока действия. Элемент, срок действия которого истекает первым, хранится в начале очереди, а элемент с наибольшим сроком действия-в конце очереди:
@Override public int compareTo(Delayed o) { return Ints.saturatedCast( this.startTime - ((DelayObject) o).startTime); }
3. Задержка очереди Потребителя и производителя
Чтобы иметь возможность протестировать нашу Очередь задержки , нам необходимо реализовать логику производителя и потребителя. Класс производителя принимает в качестве аргументов очередь, количество создаваемых элементов и задержку каждого сообщения в миллисекундах.
Затем, когда вызывается метод run () , он помещает элементы в очередь и спит в течение 500 миллисекунд после каждого ввода:
public class DelayQueueProducer implements Runnable { private BlockingQueuequeue; private Integer numberOfElementsToProduce; private Integer delayOfEachProducedMessageMilliseconds; // standard constructor @Override public void run() { for (int i = 0; i < numberOfElementsToProduce; i++) { DelayObject object = new DelayObject( UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds); System.out.println("Put object: " + object); try { queue.put(object); Thread.sleep(500); } catch (InterruptedException ie) { ie.printStackTrace(); } } } }
Реализация потребителя очень похожа, но она также отслеживает количество сообщений, которые были потреблены:
public class DelayQueueConsumer implements Runnable { private BlockingQueuequeue; private Integer numberOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger(); // standard constructors @Override public void run() { for (int i = 0; i < numberOfElementsToTake; i++) { try { DelayObject object = queue.take(); numberOfConsumedElements.incrementAndGet(); System.out.println("Consumer take: " + object); } catch (InterruptedException e) { e.printStackTrace(); } } } }
4. Тест на использование очереди задержки
Чтобы проверить поведение очереди Delay, мы создадим один поток-производитель и один поток-потребитель.
Производитель поместит() два объекта в очередь с задержкой в 500 миллисекунд. Тест утверждает, что потребитель потребил два сообщения:
@Test public void givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay() throws InterruptedException { // given ExecutorService executor = Executors.newFixedThreadPool(2); BlockingQueuequeue = new DelayQueue<>(); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // when executor.submit(producer); executor.submit(consumer); // then executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce); }
Мы можем наблюдать, что запуск этой программы приведет к следующему результату:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512} Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Производитель помещает объект, и через некоторое время потребляется первый объект, для которого истекла задержка.
Такая же ситуация сложилась и со вторым элементом.
5. Потребитель, не способный потреблять в данный момент времени
Допустим, у нас есть производитель, который производит элемент, срок действия которого истекает через 10 секунд :
int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
Мы начнем наш тест, но он закончится через 5 секунд. Из-за характеристик очереди Задержки потребитель не сможет использовать сообщение из очереди, поскольку срок действия элемента еще не истек:
executor.submit(producer); executor.submit(consumer); executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 0);
Обратите внимание, что у потребителя количество Потребляемых Элементов имеет значение, равное нулю.
6. Производство Элемента С Немедленным Истечением Срока Годности
Когда реализации метода Delayed message getDelay() возвращают отрицательное число, это означает, что срок действия данного элемента уже истек. В этой ситуации производитель немедленно потребит этот элемент.
Мы можем протестировать ситуацию получения элемента с отрицательной задержкой:
int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
Когда мы запускаем тестовый случай, потребитель немедленно потребляет элемент, потому что срок его действия уже истек:
executor.submit(producer); executor.submit(consumer); executor.awaitTermination(1, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. Заключение
В этой статье мы рассматривали конструкцию Delay Queue из пакета java.util.concurrent .
Мы реализовали элемент Delayed , который был создан и потреблен из очереди.
Мы использовали нашу реализацию очереди Delay для использования элементов, срок действия которых истек.
Реализация всех этих примеров и фрагментов кода может быть найдена в проекте GitHub , который является проектом Maven, поэтому его должно быть легко импортировать и запускать как есть.