Рубрики
Без рубрики

Реактивные потоки Java 9

Реактивные потоки Java 9, Реактивное программирование Java, API потока Java 9, Издатель Java 9, Подписчик, Процессор, при подписке, onNext, Метод запроса подписки, Завершение, Пример ошибки, Преобразование сообщений, Цепочка процессора.

Автор оригинала: Pankaj Kumar.

Реактивные потоки Java 9 позволяют нам реализовывать неблокирующую асинхронную обработку потоков. Это важный шаг на пути к применению модели реактивного программирования к основному программированию на java.

Если вы новичок в реактивном программировании, пожалуйста, прочтите Реактивный манифест и просмотрите короткие заметки о Реактивных потоках . Потоки RxJava и Akka были популярной реализацией реактивных потоков. Теперь java 9 внедрила поддержку реактивных потоков через java.util.concurrent.Поток API.

Реактивные потоки Java 9

Реактивные потоки связаны с асинхронной обработкой потока, поэтому должны быть Издатель и Подписчик . Издатель публикует поток данных, а Подписчик использует эти данные.

Иногда нам приходится преобразовывать данные между Издателем и Подписчиком. Процессор – это сущность, находящаяся между конечным издателем и подписчиком для преобразования данных, полученных от издателя, чтобы подписчик мог их понять. У нас может быть цепочка процессоров.

Из приведенного выше изображения очень ясно, что Процессор работает как Подписчик, так и Издатель.

API потока Java 9

Java 9 Flow API реализует спецификацию Реактивных потоков . API потока представляет собой комбинацию Итератор и Наблюдатель шаблон. Итератор работает с моделью вытягивания, когда приложение извлекает элементы из источника, в то время как Наблюдатель работает с моделью выталкивания и реагирует, когда элемент перемещается из источника в приложение.

Подписчик Java 9 Flow API может запросить N элементов при подписке на издателя. Затем элементы передаются от издателя к подписчику до тех пор, пока не останется больше элементов для отправки или не произойдет какая-либо ошибка.

Классы и интерфейсы API потока Java 9

Давайте быстро рассмотрим классы и интерфейсы API потока.

  • java.util.параллельный.Flow : Это основной класс API потока. Этот класс инкапсулирует все важные интерфейсы API потока. Это последний урок, и мы не можем его продлить.
  • java.util.concurrent.Flow.Publisher : Это функциональный интерфейс, и каждый издатель должен реализовать свой метод подписки, чтобы добавить данного подписчика для получения сообщений.
  • java.util.concurrent.Flow.Subscriber : Каждый подписчик должен реализовать этот интерфейс. Методы в подписчике вызываются в строгом последовательном порядке. В этом интерфейсе есть четыре метода:

    1. при подписке : Это первый метод, который вызывается, когда подписчик подписан на получение сообщений издателем. Обычно мы вызываем subscription.request , чтобы начать получать элементы от процессора.
    2. на следующем : Этот метод вызывается, когда элемент получен от издателя, именно здесь мы реализуем нашу бизнес-логику для обработки потока, а затем запрашиваем дополнительные данные у издателя.
    3. onError : Этот метод вызывается при возникновении неисправимой ошибки, в этом методе мы можем выполнять задачи очистки, такие как закрытие соединения с базой данных.
    4. неполный : Это похоже на метод finally и вызывается, когда издатель не создает никаких других элементов, а издатель закрыт. Мы можем использовать его для отправки уведомления об успешной обработке потока.

  • java.util.concurrent.Flow.Подписка : Используется для создания асинхронной неблокирующей связи между издателем и подписчиком. Подписчик вызывает свой запрос метод, чтобы запросить элементы у издателя. В нем также есть отмена метод отмены подписки, т. е. закрытие связи между издателем и подписчиком.
  • java.util.параллельный.Поток.Процессор : Этот интерфейс расширяет возможности как Издателя, так и Подписчика, он используется для преобразования сообщения между издателем и подписчиком.
  • java.util.concurrent.SubmissionPublisher : Реализация издателя, которая асинхронно выдает отправленные элементы текущим подписчикам, пока она не будет закрыта. Он использует структуру исполнителя.Мы будем использовать этот класс в примерах реактивного потока, чтобы добавить подписчика, а затем отправить ему элементы.

Пример реактивного потока Java 9

Давайте начнем с простого примера, в котором мы реализуем интерфейс подписчика API потока и издателя отправки сообщений пользователем для создания издателя и отправки сообщений.

Потоковые Данные

Допустим, у нас есть класс Employee, который будет использоваться для создания потокового сообщения, отправляемого от издателя подписчику.

package com.journaldev.reactive.beans;

public class Employee {

	private int id;
	private String name;
	
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	
	public Employee(int i, String s) {
		this.id = i;
		this.name = s;
	}
	
	public Employee() {
	}
	
	@Override
	public String toString() {
		return "[id="+id+",name="+name+"]";
	}
}

У нас также есть служебный класс для создания списка сотрудников для нашего примера.

package com.journaldev.reactive_streams;

import java.util.ArrayList;
import java.util.List;

import com.journaldev.reactive.beans.Employee;

public class EmpHelper {

	public static List getEmps() {

		Employee e1 = new Employee(1, "Pankaj");
		Employee e2 = new Employee(2, "David");
		Employee e3 = new Employee(3, "Lisa");
		Employee e4 = new Employee(4, "Ram");
		Employee e5 = new Employee(5, "Anupam");
		
		List emps = new ArrayList<>();
		emps.add(e1);
		emps.add(e2);
		emps.add(e3);
		emps.add(e4);
		emps.add(e5);
		
		return emps;
	}

}

Подписчик

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.journaldev.reactive.beans.Employee;

public class MySubscriber implements Subscriber {

	private Subscription subscription;
	
	private int counter = 0;
	
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		this.subscription = subscription;
		this.subscription.request(1); //requesting data from publisher
		System.out.println("onSubscribe requested 1 item");
	}

	@Override
	public void onNext(Employee item) {
		System.out.println("Processing Employee "+item);
		counter++;
		this.subscription.request(1);
	}

	@Override
	public void onError(Throwable e) {
		System.out.println("Some error happened");
		e.printStackTrace();
	}

	@Override
	public void onComplete() {
		System.out.println("All Processing Done");
	}

	public int getCounter() {
		return counter;
	}

}
  • Подписка переменная для сохранения ссылки, чтобы запрос мог быть сделан в методе onNext .
  • счетчик переменная для подсчета количества обработанных элементов обратите внимание, что его значение увеличивается в методе onNext. Это будет использоваться в нашем основном методе для ожидания завершения выполнения перед завершением основного потока.
  • Запрос на подписку вызывается в OnSubscribe методе для начала обработки. Также обратите внимание, что он снова вызывается методом on Next после обработки элемента, требуя от издателя обработки следующего элемента.
  • onError и onComplete здесь не так много, но в реальном сценарии они должны использоваться для выполнения корректирующих мер при возникновении ошибки или очистки ресурсов при успешном завершении обработки.

Программа тестирования Реактивного Потока

Мы будем использовать Submission Publisher в качестве Издателя для наших примеров, поэтому давайте рассмотрим тестовую программу для нашей реализации реактивного потока.

package com.journaldev.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.journaldev.reactive.beans.Employee;

public class MyReactiveApp {

	public static void main(String args[]) throws InterruptedException {

		// Create Publisher
		SubmissionPublisher publisher = new SubmissionPublisher<>();

		// Register Subscriber
		MySubscriber subs = new MySubscriber();
		publisher.subscribe(subs);

		List emps = EmpHelper.getEmps();

		// Publish items
		System.out.println("Publishing Items to Subscriber");
		emps.stream().forEach(i -> publisher.submit(i));

		// logic to wait till processing of all messages are over
		while (emps.size() != subs.getCounter()) {
			Thread.sleep(10);
		}
		// close the Publisher
		publisher.close();

		System.out.println("Exiting the app");

	}

}

Наиболее важной частью приведенного выше кода является подписка и отправка вызов методов издателя. Мы всегда должны закрывать издателя, чтобы избежать каких-либо утечек памяти.

Мы получим следующий вывод, когда будет выполнена вышеуказанная программа.

Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done

Обратите внимание, что если у нас не будет логики, чтобы основной метод ждал, пока все элементы будут обработаны, то мы получим нежелательные результаты.

Пример Преобразования Сообщений

Процессор используется для преобразования сообщения между издателем и подписчиком. Допустим, у нас есть другой подписчик, который ожидает обработки сообщений другого типа. Допустим, этот новый тип сообщения – Фрилансер .

package com.journaldev.reactive.beans;

public class Freelancer extends Employee {

	private int fid;

	public int getFid() {
		return fid;
	}

	public void setFid(int fid) {
		this.fid = fid;
	}
	
	public Freelancer(int id, int fid, String name) {
		super(id, name);
		this.fid = fid;
	}
	
	@Override
	public String toString() {
		return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";
	}
}

У нас появился новый подписчик, который будет использовать потоковые данные фрилансера.

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.journaldev.reactive.beans.Freelancer;

public class MyFreelancerSubscriber implements Subscriber {

	private Subscription subscription;
	
	private int counter = 0;
	
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("Subscribed for Freelancer");
		this.subscription = subscription;
		this.subscription.request(1); //requesting data from publisher
		System.out.println("onSubscribe requested 1 item for Freelancer");
	}

	@Override
	public void onNext(Freelancer item) {
		System.out.println("Processing Freelancer "+item);
		counter++;
		this.subscription.request(1);
	}

	@Override
	public void onError(Throwable e) {
		System.out.println("Some error happened in MyFreelancerSubscriber");
		e.printStackTrace();
	}

	@Override
	public void onComplete() {
		System.out.println("All Processing Done for MyFreelancerSubscriber");
	}

	public int getCounter() {
		return counter;
	}

}

Процессор

Важной частью является реализация процессора интерфейса. Поскольку мы хотим использовать издателя отправки , мы расширим его и будем использовать везде, где это применимо.

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;

public class MyProcessor extends SubmissionPublisher implements Processor {

	private Subscription subscription;
	private Function function;
	
	public MyProcessor(Function function) {  
	    super();  
	    this.function = function;  
	  }  
	
	@Override
	public void onSubscribe(Subscription subscription) {
		this.subscription = subscription;
		subscription.request(1);
	}

	@Override
	public void onNext(Employee emp) {
		submit((Freelancer) function.apply(emp));  
	    subscription.request(1);  
	}

	@Override
	public void onError(Throwable e) {
		e.printStackTrace();
	}

	@Override
	public void onComplete() {
		System.out.println("Done");
	}

}
  • Функция будет использоваться для преобразования объекта Сотрудника в объект фрилансера.
  • Мы преобразуем входящее сообщение сотрудника в сообщение фрилансера методом контекст , а затем используем метод отправки SubmissionPublisher для отправки его подписчику.
  • Поскольку процессор работает как подписчик, так и издатель, мы можем создать цепочку процессоров между конечными издателями и подписчиками.

Тест на преобразование сообщений

package com.journaldev.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;

public class MyReactiveAppWithProcessor {

	public static void main(String[] args) throws InterruptedException {
		// Create End Publisher
		SubmissionPublisher publisher = new SubmissionPublisher<>();

		// Create Processor
		MyProcessor transformProcessor = new MyProcessor(s -> {
			return new Freelancer(s.getId(), s.getId() + 100, s.getName());
		});

		//Create End Subscriber
		MyFreelancerSubscriber subs = new MyFreelancerSubscriber();

		//Create chain of publisher, processor and subscriber
		publisher.subscribe(transformProcessor); // publisher to processor
		transformProcessor.subscribe(subs); // processor to subscriber

		List emps = EmpHelper.getEmps();

		// Publish items
		System.out.println("Publishing Items to Subscriber");
		emps.stream().forEach(i -> publisher.submit(i));

		// Logic to wait for messages processing to finish
		while (emps.size() != subs.getCounter()) {
			Thread.sleep(10);
		}

		// Closing publishers
		publisher.close();
		transformProcessor.close();

		System.out.println("Exiting the app");
	}

}

Прочитайте комментарии в программе, чтобы правильно понять ее, самым важным изменением является создание цепочки производитель-процессор-подписчик. Мы получим следующий вывод, когда будет выполнена вышеуказанная программа.

Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done

Отменить Подписку

Мы можем использовать метод отмены подписки, чтобы прекратить получать сообщения от подписчика. Обратите внимание, что если мы отменим подписку, то подписчик не получит неполный или ошибочный сигнал.

Вот пример кода, в котором подписчик получает только 3 сообщения, а затем отменяет подписку.

@Override
public void onNext(Employee item) {
	System.out.println("Processing Employee "+item);
	counter++;
	if(counter==3) {
		this.subscription.cancel();
		return;
	}
	this.subscription.request(1);
}

Обратите внимание, что в этом случае наша логика остановки основного потока до того, как все сообщения будут обработаны, перейдет в бесконечный цикл. Мы можем добавить некоторую дополнительную логику для этого сценария, возможно, какую-то глобальную переменную, которую нужно искать, если подписчик прекратил обработку или отменил подписку.

Обратное Давление

Когда издатель создает сообщения с гораздо большей скоростью, чем их потребляет подписчик, возникает обратное давление. API потока не предоставляет никакого механизма для сигнализации о противодавлении или для борьбы с ним. Но мы можем разработать свою собственную стратегию, чтобы справиться с этим, например, настроить подписчика или снизить скорость передачи сообщений. Вы можете прочитать, как RxJava справляется с обратным давлением .

Резюме

Java 9 Flow API-это хороший шаг в направлении реактивного программирования и создания асинхронного неблокирующего приложения. Однако создание настоящего реактивного приложения возможно только в том случае, если его поддерживают все системные API.