1. Обзор
В этой статье мы рассмотрим SynchronousQueue из пакета java.util.concurrent .
Проще говоря, эта реализация позволяет нам обмениваться информацией между потоками потокобезопасным способом.
2. Обзор API
SynchronousQueue имеет только две поддерживаемые операции: take() и put(), и обе они блокируют .
Например, когда мы хотим добавить элемент в очередь, нам нужно вызвать метод put () . Этот метод будет блокироваться до тех пор, пока какой-то другой поток не вызовет метод take () , сигнализируя о том, что он готов принять элемент.
Хотя SynchronousQueue имеет интерфейс очереди, мы должны думать о нем как о точке обмена одним элементом между двумя потоками, в которой один поток передает элемент, а другой принимает этот элемент.
3. Реализация Хэндоффов С использованием общей переменной
Чтобы понять, почему SynchronousQueue может быть так полезен, мы реализуем логику с использованием общей переменной между двумя потоками, а затем перепишем эту логику с помощью SynchronousQueue , сделав наш код намного проще и читабельнее.
Допустим, у нас есть два потока – производитель и потребитель – и когда производитель устанавливает значение общей переменной, мы хотим сообщить об этом потоку-потребителю. Затем поток-потребитель извлекает значение из общей переменной.
Мы будем использовать CountDownLatch для координации этих двух потоков, чтобы предотвратить ситуацию, когда потребитель обращается к значению общей переменной, которое еще не было установлено.
Мы определим shared State variable и CountDownLatch , которые будут использоваться для координации обработки:
ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1);
Производитель сохранит случайное целое число в переменной shared State и выполнит метод Countdown() на CountDownLatch, сигнализируя потребителю, что он может извлечь значение из SharedState:
Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); };
Потребитель будет ждать CountDownLatch с помощью метода await () . Когда производитель сигнализирует, что переменная была установлена, потребитель извлекает ее из общего состояния :
Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
И последнее, но не менее важное: давайте начнем нашу программу:
executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(countDownLatch.getCount(), 0);
Это приведет к следующему результату:
Saving an element: -1507375353 to the exchange point consumed an element: -1507375353 from the exchange point
Мы видим, что это очень много кода для реализации такой простой функции, как обмен элементом между двумя потоками. В следующем разделе мы постараемся сделать его лучше.
4. Реализация Хэндоффов С использованием SynchronousQueue
Теперь давайте реализуем ту же функциональность, что и в предыдущем разделе, но с помощью SynchronousQueue. Он имеет двойной эффект, потому что мы можем использовать его для обмена состояниями между потоками и для координации этого действия, так что нам не нужно использовать ничего, кроме SynchronousQueue.
Во – первых, мы определим очередь:
ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueuequeue = new SynchronousQueue<>();
Производитель вызовет метод put () , который будет блокироваться до тех пор, пока какой-либо другой поток не возьмет элемент из очереди:
Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } };
Потребитель просто получит этот элемент с помощью метода take() :
Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
Далее мы начнем нашу программу:
executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(queue.size(), 0);
Это приведет к следующему результату:
Saving an element: 339626897 to the exchange point consumed an element: 339626897 from the exchange point
Мы видим, что a SynchronousQueue используется в качестве точки обмена между потоками, что намного лучше и понятнее, чем предыдущий пример, в котором использовалось общее состояние вместе с a CountDownLatch.
5. Заключение
В этом кратком руководстве мы рассмотрели конструкцию SynchronousQueue . Мы создали программу, которая обменивается данными между двумя потоками, используя общее состояние, а затем переписали эту программу, чтобы использовать конструкцию SynchronousQueue . Это служит точкой обмена, которая координирует поток производителя и потребителя.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub – это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.