Автор оригинала: Dhrubajyoti Bhattacharjee.
1. Обзор
В этом учебнике, мы будем смотреть на java.util.concurrent.Exchanger
2. Введение в Exchanger
Обмен класс в Java может использоваться для обмена объектами между двумя потоками типа T . Класс предоставляет только один перегруженный метод обмен (T t) .
При обменные ждет другой поток в паре, чтобы назвать его также. На этом этапе второй поток находит, что первый поток ждет своего объекта. Поток обменивается объектами, которые они держат, и сигнализирует об обмене, и теперь они могут вернуться.
Рассмотрим пример, чтобы понять обмен сообщениями между двумя потоками с Обмен :
@Test public void givenThreads_whenMessageExchanged_thenCorrect() { Exchangerexchanger = new Exchanger<>(); Runnable taskA = () -> { try { String message = exchanger.exchange("from A"); assertEquals("from B", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; Runnable taskB = () -> { try { String message = exchanger.exchange("from B"); assertEquals("from A", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture.allOf( runAsync(taskA), runAsync(taskB)).join(); }
Здесь у нас есть два потока обмена сообщениями между ею с помощью общего обменщика. Рассмотрим пример, на котором мы обмениваем объект из основного потока на новый поток:
@Test public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException { Exchangerexchanger = new Exchanger<>(); Runnable runner = () -> { try { String message = exchanger.exchange("from runner"); assertEquals("to runner", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture result = CompletableFuture.runAsync(runner); String msg = exchanger.exchange("to runner"); assertEquals("from runner", msg); result.join(); }
Обратите внимание, что мы должны начать бегун поток сначала, а затем вызов обмен () в основной нити.
Также обратите внимание, что вызов первого потока может быть тайм-аутом, если второй поток не достигнет точки обмена во времени. Как долго следует ждать первого потока, можно контролировать с помощью перегруженного обмен (T t, длинный тайм-аут, TimeUnit timeUnit).
3. Нет обмена данными GC
Обмен могут использоваться для создания шаблонов конвейера с передачей данных из одного потока в другой. В этом разделе мы создадим простой стек потоков, непрерывно передай данные между ею в качестве конвейера.
@Test public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException { Exchanger> readerExchanger = new Exchanger<>(); Exchanger > writerExchanger = new Exchanger<>(); Runnable reader = () -> { Queue readerBuffer = new ConcurrentLinkedQueue<>(); while (true) { readerBuffer.add(UUID.randomUUID().toString()); if (readerBuffer.size() >= BUFFER_SIZE) { readerBuffer = readerExchanger.exchange(readerBuffer); } } }; Runnable processor = () -> { Queue processorBuffer = new ConcurrentLinkedQueue<>(); Queue writerBuffer = new ConcurrentLinkedQueue<>(); processorBuffer = readerExchanger.exchange(processorBuffer); while (true) { writerBuffer.add(processorBuffer.poll()); if (processorBuffer.isEmpty()) { processorBuffer = readerExchanger.exchange(processorBuffer); writerBuffer = writerExchanger.exchange(writerBuffer); } } }; Runnable writer = () -> { Queue writerBuffer = new ConcurrentLinkedQueue<>(); writerBuffer = writerExchanger.exchange(writerBuffer); while (true) { System.out.println(writerBuffer.poll()); if (writerBuffer.isEmpty()) { writerBuffer = writerExchanger.exchange(writerBuffer); } } }; CompletableFuture.allOf( runAsync(reader), runAsync(processor), runAsync(writer)).join(); }
Здесь у нас есть три темы: читатель , процессор , и писатель . Вместе они работают как единый конвейер обмена данными между ними.
readerExchanger разделяется между читатель и процессор нить, в то время как писательИсследователь разделяется между процессор и писатель нить.
Обратите внимание, что пример здесь только для демонстрации. Мы должны быть осторожны при создании бесконечных петель с в то время как (правда) . Кроме того, чтобы сохранить код читаемым, мы опустили некоторые исключения обработки.
Эта модель обмена данными при повторном использовании буфера позволяет иметь меньше сбора мусора. Метод обмена возвращает те же экземпляры очереди и, таким образом, не будет GC для этих объектов. В отличие от любой блокировки очереди, обменитель не создает никаких узлов или объектов для хранения и обмена данными.
Создание такого трубопровода аналогично шаблону Disrupter, с ключевым отличием, шаблон Disrupter поддерживает нескольких производителей и потребителей, в то время как обменник может быть использован между парой потребителей и производителей.
4.Заключение
Итак, мы узнали, что Exchanger
Как всегда, код доступен более на GitHub .