Автор оригинала: Pankaj Kumar.
Реактивные потоки Java 9 позволяют нам реализовывать неблокирующую асинхронную обработку потоков. Это важный шаг на пути к применению модели реактивного программирования к основному программированию на java.
Если вы новичок в реактивном программировании, пожалуйста, прочтите Реактивный манифест и просмотрите короткие заметки о Реактивных потоках . Потоки RxJava и Akka были популярной реализацией реактивных потоков. Теперь java 9 внедрила поддержку реактивных потоков через java.util.concurrent.Поток
API.
Реактивные потоки Java 9
Реактивные потоки связаны с асинхронной обработкой потока, поэтому должны быть Издатель и Подписчик . Издатель публикует поток данных, а Подписчик использует эти данные.
Иногда нам приходится преобразовывать данные между Издателем и Подписчиком. Процессор – это сущность, находящаяся между конечным издателем и подписчиком для преобразования данных, полученных от издателя, чтобы подписчик мог их понять. У нас может быть цепочка процессоров.
Из приведенного выше изображения очень ясно, что Процессор работает как Подписчик, так и Издатель.
API потока Java 9
Java 9 Flow API реализует спецификацию Реактивных потоков . API потока представляет собой комбинацию Итератор и Наблюдатель шаблон. Итератор работает с моделью вытягивания, когда приложение извлекает элементы из источника, в то время как Наблюдатель работает с моделью выталкивания и реагирует, когда элемент перемещается из источника в приложение.
Подписчик Java 9 Flow API может запросить N элементов при подписке на издателя. Затем элементы передаются от издателя к подписчику до тех пор, пока не останется больше элементов для отправки или не произойдет какая-либо ошибка.
Классы и интерфейсы API потока Java 9
Давайте быстро рассмотрим классы и интерфейсы API потока.
java.util.параллельный.Flow
: Это основной класс API потока. Этот класс инкапсулирует все важные интерфейсы API потока. Это последний урок, и мы не можем его продлить.java.util.concurrent.Flow.Publisher
: Это функциональный интерфейс, и каждый издатель должен реализовать свой метод подписки, чтобы добавить данного подписчика для получения сообщений.java.util.concurrent.Flow.Subscriber
: Каждый подписчик должен реализовать этот интерфейс. Методы в подписчике вызываются в строгом последовательном порядке. В этом интерфейсе есть четыре метода:при подписке
: Это первый метод, который вызывается, когда подписчик подписан на получение сообщений издателем. Обычно мы вызываемsubscription.request
, чтобы начать получать элементы от процессора.на следующем
: Этот метод вызывается, когда элемент получен от издателя, именно здесь мы реализуем нашу бизнес-логику для обработки потока, а затем запрашиваем дополнительные данные у издателя.onError
: Этот метод вызывается при возникновении неисправимой ошибки, в этом методе мы можем выполнять задачи очистки, такие как закрытие соединения с базой данных.неполный
: Это похоже на метод finally и вызывается, когда издатель не создает никаких других элементов, а издатель закрыт. Мы можем использовать его для отправки уведомления об успешной обработке потока.
java.util.concurrent.Flow.Подписка
: Используется для создания асинхронной неблокирующей связи между издателем и подписчиком. Подписчик вызывает свойзапрос
метод, чтобы запросить элементы у издателя. В нем также естьотмена
метод отмены подписки, т. е. закрытие связи между издателем и подписчиком.java.util.параллельный.Поток.Процессор
: Этот интерфейс расширяет возможности как Издателя, так и Подписчика, он используется для преобразования сообщения между издателем и подписчиком.java.util.concurrent.SubmissionPublisher
: Реализация издателя, которая асинхронно выдает отправленные элементы текущим подписчикам, пока она не будет закрыта. Он использует структуру исполнителя.Мы будем использовать этот класс в примерах реактивного потока, чтобы добавить подписчика, а затем отправить ему элементы.
Пример реактивного потока Java 9
Давайте начнем с простого примера, в котором мы реализуем интерфейс подписчика API потока и издателя отправки сообщений пользователем для создания издателя и отправки сообщений.
Потоковые Данные
Допустим, у нас есть класс Employee, который будет использоваться для создания потокового сообщения, отправляемого от издателя подписчику.
package com.journaldev.reactive.beans; public class Employee { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Employee(int i, String s) { this.id = i; this.name = s; } public Employee() { } @Override public String toString() { return "[id="+id+",name="+name+"]"; } }
У нас также есть служебный класс для создания списка сотрудников для нашего примера.
package com.journaldev.reactive_streams; import java.util.ArrayList; import java.util.List; import com.journaldev.reactive.beans.Employee; public class EmpHelper { public static ListgetEmps() { Employee e1 = new Employee(1, "Pankaj"); Employee e2 = new Employee(2, "David"); Employee e3 = new Employee(3, "Lisa"); Employee e4 = new Employee(4, "Ram"); Employee e5 = new Employee(5, "Anupam"); List emps = new ArrayList<>(); emps.add(e1); emps.add(e2); emps.add(e3); emps.add(e4); emps.add(e5); return emps; } }
Подписчик
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Employee; public class MySubscriber implements Subscriber{ private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item"); } @Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done"); } public int getCounter() { return counter; } }
Подписка
переменная для сохранения ссылки, чтобы запрос мог быть сделан в методеonNext
.счетчик
переменная для подсчета количества обработанных элементов обратите внимание, что его значение увеличивается в методе onNext. Это будет использоваться в нашем основном методе для ожидания завершения выполнения перед завершением основного потока.- Запрос на подписку вызывается в
OnSubscribe
методе для начала обработки. Также обратите внимание, что он снова вызывается методомon Next
после обработки элемента, требуя от издателя обработки следующего элемента. onError
иonComplete
здесь не так много, но в реальном сценарии они должны использоваться для выполнения корректирующих мер при возникновении ошибки или очистки ресурсов при успешном завершении обработки.
Программа тестирования Реактивного Потока
Мы будем использовать Submission Publisher
в качестве Издателя для наших примеров, поэтому давайте рассмотрим тестовую программу для нашей реализации реактивного потока.
package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; public class MyReactiveApp { public static void main(String args[]) throws InterruptedException { // Create Publisher SubmissionPublisherpublisher = new SubmissionPublisher<>(); // Register Subscriber MySubscriber subs = new MySubscriber(); publisher.subscribe(subs); List emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // logic to wait till processing of all messages are over while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // close the Publisher publisher.close(); System.out.println("Exiting the app"); } }
Наиболее важной частью приведенного выше кода является подписка
и отправка
вызов методов издателя. Мы всегда должны закрывать издателя, чтобы избежать каких-либо утечек памяти.
Мы получим следующий вывод, когда будет выполнена вышеуказанная программа.
Subscribed Publishing Items to Subscriber onSubscribe requested 1 item Processing Employee [id=1,name=Pankaj] Processing Employee [id=2,name=David] Processing Employee [id=3,name=Lisa] Processing Employee [id=4,name=Ram] Processing Employee [id=5,name=Anupam] Exiting the app All Processing Done
Обратите внимание, что если у нас не будет логики, чтобы основной метод ждал, пока все элементы будут обработаны, то мы получим нежелательные результаты.
Пример Преобразования Сообщений
Процессор используется для преобразования сообщения между издателем и подписчиком. Допустим, у нас есть другой подписчик, который ожидает обработки сообщений другого типа. Допустим, этот новый тип сообщения – Фрилансер
.
package com.journaldev.reactive.beans; public class Freelancer extends Employee { private int fid; public int getFid() { return fid; } public void setFid(int fid) { this.fid = fid; } public Freelancer(int id, int fid, String name) { super(id, name); this.fid = fid; } @Override public String toString() { return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]"; } }
У нас появился новый подписчик, который будет использовать потоковые данные фрилансера.
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Freelancer; public class MyFreelancerSubscriber implements Subscriber{ private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed for Freelancer"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item for Freelancer"); } @Override public void onNext(Freelancer item) { System.out.println("Processing Freelancer "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened in MyFreelancerSubscriber"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done for MyFreelancerSubscriber"); } public int getCounter() { return counter; } }
Процессор
Важной частью является реализация процессора
интерфейса. Поскольку мы хотим использовать издателя отправки
, мы расширим его и будем использовать везде, где это применимо.
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyProcessor extends SubmissionPublisherimplements Processor { private Subscription subscription; private Function function; public MyProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Employee emp) { submit((Freelancer) function.apply(emp)); subscription.request(1); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); } }
Функция
будет использоваться для преобразования объекта Сотрудника в объект фрилансера.- Мы преобразуем входящее сообщение сотрудника в сообщение фрилансера методом
контекст
, а затем используем метод отправки SubmissionPublisher для отправки его подписчику. - Поскольку процессор работает как подписчик, так и издатель, мы можем создать цепочку процессоров между конечными издателями и подписчиками.
Тест на преобразование сообщений
package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyReactiveAppWithProcessor { public static void main(String[] args) throws InterruptedException { // Create End Publisher SubmissionPublisherpublisher = new SubmissionPublisher<>(); // Create Processor MyProcessor transformProcessor = new MyProcessor(s -> { return new Freelancer(s.getId(), s.getId() + 100, s.getName()); }); //Create End Subscriber MyFreelancerSubscriber subs = new MyFreelancerSubscriber(); //Create chain of publisher, processor and subscriber publisher.subscribe(transformProcessor); // publisher to processor transformProcessor.subscribe(subs); // processor to subscriber List emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // Logic to wait for messages processing to finish while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // Closing publishers publisher.close(); transformProcessor.close(); System.out.println("Exiting the app"); } }
Прочитайте комментарии в программе, чтобы правильно понять ее, самым важным изменением является создание цепочки производитель-процессор-подписчик. Мы получим следующий вывод, когда будет выполнена вышеуказанная программа.
Subscribed for Freelancer Publishing Items to Subscriber onSubscribe requested 1 item for Freelancer Processing Freelancer [id=1,name=Pankaj,fid=101] Processing Freelancer [id=2,name=David,fid=102] Processing Freelancer [id=3,name=Lisa,fid=103] Processing Freelancer [id=4,name=Ram,fid=104] Processing Freelancer [id=5,name=Anupam,fid=105] Exiting the app All Processing Done for MyFreelancerSubscriber Done
Отменить Подписку
Мы можем использовать метод отмены подписки, чтобы прекратить получать сообщения от подписчика. Обратите внимание, что если мы отменим подписку, то подписчик не получит неполный или ошибочный сигнал.
Вот пример кода, в котором подписчик получает только 3 сообщения, а затем отменяет подписку.
@Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; if(counter==3) { this.subscription.cancel(); return; } this.subscription.request(1); }
Обратите внимание, что в этом случае наша логика остановки основного потока до того, как все сообщения будут обработаны, перейдет в бесконечный цикл. Мы можем добавить некоторую дополнительную логику для этого сценария, возможно, какую-то глобальную переменную, которую нужно искать, если подписчик прекратил обработку или отменил подписку.
Обратное Давление
Когда издатель создает сообщения с гораздо большей скоростью, чем их потребляет подписчик, возникает обратное давление. API потока не предоставляет никакого механизма для сигнализации о противодавлении или для борьбы с ним. Но мы можем разработать свою собственную стратегию, чтобы справиться с этим, например, настроить подписчика или снизить скорость передачи сообщений. Вы можете прочитать, как RxJava справляется с обратным давлением .
Резюме
Java 9 Flow API-это хороший шаг в направлении реактивного программирования и создания асинхронного неблокирующего приложения. Однако создание настоящего реактивного приложения возможно только в том случае, если его поддерживают все системные API.