Рубрики
Без рубрики

Руководство по PriorityBlockingQueue на Java

Введение в Java PriorityBlockingQueue с примерами использования

Автор оригинала: baeldung.

1. введение

В этой статье мы сосредоточимся на классе PriorityBlockingQueue и рассмотрим некоторые практические примеры.

Начиная с предположения, что мы уже знаем, что такое Очередь |, мы сначала продемонстрируем , как элементы в PriorityBlockingQueue упорядочены по приоритету .

После этого мы продемонстрируем, как этот тип очереди можно использовать для блокировки потока.

Наконец, мы покажем, как совместное использование этих двух функций может быть полезно при обработке данных в нескольких потоках.

2. Приоритет элементов

В отличие от стандартной очереди, вы не можете просто добавить любой тип элемента в PriorityBlockingQueue. Есть два варианта:

  1. Добавление элементов, которые реализуют Сопоставимые
  2. Добавление элементов , которые не реализуют Comparable , при условии, что вы также предоставите Компаратор

Используя либо Comparator , либо Comparable реализации для сравнения элементов, PriorityBlockingQueue всегда будет отсортирован.

Цель состоит в том, чтобы реализовать логику сравнения таким образом, чтобы элемент с наивысшим приоритетом всегда упорядочивался первым . Затем, когда мы удаляем элемент из нашей очереди, он всегда будет иметь самый высокий приоритет.

Для начала давайте использовать нашу очередь в одном потоке, а не в нескольких. Таким образом, легко доказать, как элементы упорядочены в модульном тесте:

PriorityBlockingQueue queue = new PriorityBlockingQueue<>();
ArrayList polledElements = new ArrayList<>();
 
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);

queue.drainTo(polledElements);

assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

Как мы видим, несмотря на добавление элементов в очередь в случайном порядке, они будут упорядочены, когда мы начнем их опрашивать. Это связано с тем, что класс Integer реализует Comparable, , который, в свою очередь, будет использоваться для того, чтобы мы извлекали их из очереди в порядке возрастания.

Также стоит отметить, что когда два элемента сравниваются и являются одинаковыми, нет никакой гарантии того, как они будут упорядочены.

3. Использование очереди для блокировки

Если бы мы имели дело со стандартной очередью, мы бы вызвали poll() для извлечения элементов. Однако, если очередь была пуста, вызов poll() вернет null.

PriorityBlockingQueue реализует интерфейс BlockingQueue , который дает нам некоторые дополнительные методы, позволяющие нам блокировать при удалении из пустой очереди . Давайте попробуем использовать метод take () , который должен делать именно это:

PriorityBlockingQueue queue = new PriorityBlockingQueue<>();

new Thread(() -> {
  System.out.println("Polling...");

  try {
      Integer poll = queue.take();
      System.out.println("Polled: " + poll);
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
}).start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);

Хотя использование sleep() является немного хрупким способом демонстрации вещей, когда мы запустим этот код, мы увидим:

Polling...
Adding to queue
Polled: 1

Это доказывает, что take() заблокирован до тех пор, пока не будет добавлен элемент:

  1. Поток выведет “Опрос”, чтобы доказать, что он запущен
  2. Затем тест будет приостановлен примерно на пять секунд, чтобы доказать, что поток должен был вызвать take() к этому моменту
  3. Мы добавляем в очередь и должны более или менее мгновенно увидеть “Опрошено: 1”, чтобы доказать, что take() вернул элемент, как только он стал доступен

Также стоит отметить, что интерфейс BlockingQueue также предоставляет нам способы блокировки при добавлении в полные очереди.

Однако PriorityBlockingQueue не ограничен. Это означает, что он никогда не будет полным, поэтому всегда можно будет добавлять новые элементы.

4. Совместное использование блокировки и приоритизации

Теперь, когда мы объяснили две ключевые концепции PriorityBlockingQueue, давайте использовать их вместе. Мы можем просто расширить наш предыдущий пример, но на этот раз добавить больше элементов в очередь:

Thread thread = new Thread(() -> {
    System.out.println("Polling...");
    while (true) {
        try {
            Integer poll = queue.take();
            System.out.println("Polled: " + poll);
        } 
        catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }
});

thread.start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");

queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));

Опять же, хотя это немного хрупко из-за использования sleep(), он все еще показывает нам допустимый вариант использования. Теперь у нас есть очередь, которая блокируется, ожидая добавления элементов. Затем мы добавляем сразу много элементов, а затем показываем, что они будут обрабатываться в приоритетном порядке. Результат будет выглядеть следующим образом:

Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7

5. Заключение

В этом руководстве мы продемонстрировали, как мы можем использовать PriorityBlockingQueue для блокировки потока до тех пор, пока в него не будут добавлены некоторые элементы, а также что мы можем обрабатывать эти элементы на основе их приоритета.

Реализацию этих примеров можно найти на GitHub . Это проект на основе Maven, поэтому его должно быть легко запустить как есть.