1. введение
В многопоточной среде иногда нам нужно планировать задачи на основе пользовательских критериев, а не только времени создания.
Давайте посмотрим, как мы можем достичь этого в Java – используя PriorityBlockingQueue .
2. Обзор
Допустим, у нас есть задания, которые мы хотим выполнить, исходя из их приоритета:
public class Job implements Runnable { private String jobName; private JobPriority jobPriority; @Override public void run() { System.out.println("Job:" + jobName + " Priority:" + jobPriority); Thread.sleep(1000); // to simulate actual execution time } // standard setters and getters }
В демонстрационных целях мы печатаем имя задания и приоритет в методе run () .
Мы также добавили sleep () , чтобы имитировать более длительное задание; во время выполнения задания в очереди приоритетов будет накапливаться больше заданий.
Наконец, Приоритет работы – это простое перечисление:
public enum JobPriority { HIGH, MEDIUM, LOW }
3. Пользовательский Компаратор
Нам нужно написать компаратор, определяющий наши пользовательские критерии; и в Java 8 это тривиально:
Comparator.comparing(Job::getJobPriority);
4. Планировщик Приоритетных заданий
Теперь, когда все настройки выполнены, давайте реализуем простой планировщик заданий, который использует одного исполнителя потока для поиска заданий в PriorityBlockingQueue и выполняет их:
public class PriorityJobScheduler { private ExecutorService priorityJobPoolExecutor; private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor(); private PriorityBlockingQueuepriorityQueue; public PriorityJobScheduler(Integer poolSize, Integer queueSize) { priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize); priorityQueue = new PriorityBlockingQueue ( queueSize, Comparator.comparing(Job::getJobPriority)); priorityJobScheduler.execute(() -> { while (true) { try { priorityJobPoolExecutor.execute(priorityQueue.take()); } catch (InterruptedException e) { // exception needs special handling break; } } }); } public void scheduleJob(Job job) { priorityQueue.add(job); } }
Ключ здесь заключается в создании экземпляра PriorityBlockingQueue типа Job с пользовательским компаратором. Следующее задание для выполнения выбирается из очереди с помощью метода take () , который извлекает и удаляет заголовок очереди.
Теперь клиентскому коду просто нужно вызвать функцию scheduleJob () , которая добавляет задание в очередь. priority Queue.and() ставит задание в соответствующую позицию по сравнению с существующими заданиями в очереди, используя JobExecutionComparator .
Обратите внимание, что фактические задания выполняются с использованием отдельного ExecutorService с выделенным пулом потоков.
5. Демо-версия
Наконец, вот быстрая демонстрация планировщика:
private static int POOL_SIZE = 1; private static int QUEUE_SIZE = 10; @Test public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() { Job job1 = new Job("Job1", JobPriority.LOW); Job job2 = new Job("Job2", JobPriority.MEDIUM); Job job3 = new Job("Job3", JobPriority.HIGH); Job job4 = new Job("Job4", JobPriority.MEDIUM); Job job5 = new Job("Job5", JobPriority.LOW); Job job6 = new Job("Job6", JobPriority.HIGH); PriorityJobScheduler pjs = new PriorityJobScheduler( POOL_SIZE, QUEUE_SIZE); pjs.scheduleJob(job1); pjs.scheduleJob(job2); pjs.scheduleJob(job3); pjs.scheduleJob(job4); pjs.scheduleJob(job5); pjs.scheduleJob(job6); // clean up }
Чтобы продемонстрировать, что задания выполняются в порядке приоритета, мы сохранили POOL_SIZE как 1, хотя QUEUE_SIZE равно 10. Мы предоставляем планировщику задания с различным приоритетом.
Вот пример вывода, который мы получили для одного из запусков:
Job:Job3 Priority:HIGH Job:Job6 Priority:HIGH Job:Job4 Priority:MEDIUM Job:Job2 Priority:MEDIUM Job:Job1 Priority:LOW Job:Job5 Priority:LOW
Выход может варьироваться в зависимости от пробега. Однако у нас никогда не должно быть случая, когда задание с более низким приоритетом выполняется, даже если очередь содержит задание с более высоким приоритетом.
6. Заключение
В этом кратком уроке мы увидели, как PriorityBlockingQueue может использоваться для выполнения заданий в пользовательском порядке приоритета.
Как обычно, исходные файлы можно найти на GitHub .