Вот подход к использованию java CompletableFuture для обеспечения параллельного запуска/выполнения нескольких запросов на обслуживание для сбора данных ответа службы/API и их агрегирования.
Местоположение на Github
СУНИЛ-КУМАР-L/Java Завершаемая будущая демо-версия
Объединяйте результаты параллельных запросов на обслуживание с помощью CompletableFuture
- Подход 1: Простой старый простой способ java
давайте представим CompletableFuture с различными подходами
Подход 2: Параллельные вызовы служб и сбор данных с помощью CompletableFuture::get()
Подход 3: Параллельные вызовы служб и сбор данных с помощью CompletableFuture::join()
Подход 4: Параллельные вызовы служб и групповые фьючерсы с использованием CompletableFuture::allOf(), а затем собирают все данные
Подход 5 (
Мои предпочтения
): Параллельные вызовы служб и групповые фьючерсы с использованием CompletableFuture::allOf(), а затем сборка с помощью CompletableFuture::thenApply()
Подход 1:
Цель состоит в том, чтобы последовательно выполнить несколько вызовов службы (один за другим), а затем собрать ответ и объединить, чтобы получить окончательный составной ответ.
псевдокод
Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)
Для достижения этой цели -> выполнять последовательные запросы на обслуживание
запускает вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке
запуск вызова службы адресов, которая взаимодействует с хранилищем адресов по идентификатору для получения информации об адресе
наконец, верните Человека С Информацией об Адресе, где Информация о человеке + Информация об Адресе суммируются.
код
public class Address { private String addressId; private String addressLine1; private String addressLine2; private String addressLine3; private String zipCode; private String city; private String state; private String country; public Address( String addressId, String addressLine1, String addressLine2, String addressLine3, String zipCode, String city, String state, String country) { this.addressId = addressId; this.addressLine1 = addressLine1; this.addressLine2 = addressLine2; this.addressLine3 = addressLine3; this.zipCode = zipCode; this.city = city; this.state = state; this.country = country; } // write toString, hashCode or use lombok annotation }
public class AddressRepository { public Address getAddressById(String id){ // hard coded ... change it to real data // fetch from DB or API return new Address( "1","add1", "add2", "add3","55305", "Mpls", "MN","USA"); } }
public class AddressService { private AddressRepository addressRepository; public AddressService(AddressRepository addressRepository) { this.addressRepository = addressRepository; } public Address getAddressById(String id) { return addressRepository.getAddressById(id); } }
public class Person { public Person(String id, String firstName, String age) { this.id = id; this.firstName = firstName; this.age = age; } private String id; private String firstName; private String age; // write toString, hashCode or use lombok annotation }
public class PersonRepository { public Person getPersonById(String id) { // hard coded ... change it to real data // fetch from DB or API return new Person("1", "hello", "22"); } }
public class PersonService { private PersonRepository personRepository; public PersonService(PersonRepository personRepository) { this.personRepository = personRepository; } public Person getPersonById(String id){ return personRepository.getPersonById(id); } }
public class PersonWithAddress { private Person person; private Address address; public PersonWithAddress(Person person, Address address) { this.person = person; this.address = address; } // write toString, hashCode or use lombok annotation }
public class CFMain { static PersonRepository personRepository = new PersonRepository(); static PersonService personService = new PersonService(personRepository); static AddressRepository addressRepository = new AddressRepository(); static AddressService addressService = new AddressService(addressRepository); public static void sequencial(String personId) { final Person personById = personService.getPersonById(personId); final Address addressById = addressService.getAddressById(personId); final PersonWithAddress personWithAddress = new PersonWithAddress(personById, addressById); System.out.println(personWithAddress); } public static void main(String[] args) { sequencial("1"); } }
Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: выполняйте каждый вызов службы параллельно, чтобы увеличить время отклика (см. Подход 2)
Подход 2:
Цель состоит в том, чтобы выполнить несколько вызовов службы параллельно с использованием Completablefuture и собрать данные из каждого CompletableFuture с помощью get() get() вызывает InterruptedException, ExecutionException
((Проверенное исключение)
псевдокод
Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)
Для достижения этой цели -> выполнять параллельные запросы на обслуживание
запустите вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке с помощью java CompletableFuture (скажем, CompletableFuture 1)
запустите вызов службы адресов, который взаимодействует с хранилищем адресов по идентификатору, чтобы получить информацию об адресе с помощью java CompletableFuture (скажем, CompletableFuture 2)
теперь заблокируйте каждый CompletableFuture, чтобы получить данные (один за другим/последовательно), используя метод CompletableFuture::get
наконец, верните Человека С Информацией об Адресе, где Информация о человеке + Информация об Адресе суммируются.
улучшение кода
import java.util.concurrent.CompletableFuture; public class CFMain { static PersonRepository personRepository = new PersonRepository(); static PersonService personService = new PersonService(personRepository); static AddressRepository addressRepository = new AddressRepository(); static AddressService addressService = new AddressService(addressRepository); public static void main(String[] args) { parallelServiceCallWithCFUsingGet("1"); } public static void parallelServiceCallWithCFUsingGet(String personId) { final CompletableFuturepersonCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); try { final Person person = personCompletableFuture.get(); // until this is not complete below line is not executed final Address address = addressCompletableFuture.get(); // until this is not complete below line is not executed // below line will be executed based on the slowest service response System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } catch (Exception exp) { System.err.println(exp); } } }
Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? Ответ: используйте CompletableFuture join() вместо get() (см. Подход 3)
Подход 3:
Цель состоит в том, чтобы выполнить несколько вызовов службы параллельно с использованием Completablefuture и собрать данные из каждого CompletableFuture с помощью join() join() вызывает исключение UncheckedException
(исключение RuntimeException)
улучшение кода
import java.util.concurrent.CompletableFuture; public class CFMain { static PersonRepository personRepository = new PersonRepository(); static PersonService personService = new PersonService(personRepository); static AddressRepository addressRepository = new AddressRepository(); static AddressService addressService = new AddressService(addressRepository); public static void main(String[] args) { parallelServiceCallWithCFUsingJoin("1"); } public static void parallelServiceCallWithCFUsingJoin(String personId) { final CompletableFuturepersonCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final Person person = personCompletableFuture.join(); // until this is not complete below line is not executed final Address address = addressCompletableFuture.join(); // until this is not complete below line is not executed // below line will be executed based on the slowest service response System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } }
Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: используйте CompletableFuture allOf(CompletableFuture >... >... cfs), чтобы добавить все фьючерсы (см. Подход 4)
Подход 4:
Цель состоит в том, чтобы выполнить несколько вызовов служб параллельно с использованием Completablefuture и использовать все, а затем собирать данные из каждого CompletableFuture с помощью join() или get()
улучшение кода
import java.util.concurrent.CompletableFuture; public class CFMain { static PersonRepository personRepository = new PersonRepository(); static PersonService personService = new PersonService(personRepository); static AddressRepository addressRepository = new AddressRepository(); static AddressService addressService = new AddressService(addressRepository); public static void main(String[] args) { parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet("1"); parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin("1"); } public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet(String personId) { final CompletableFuturepersonCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture); try { completableFutureAllOf.get(); // time taken to return is based on the slowest service response/ slowest future response final Person person = personCompletableFuture.get(); final Address address = addressCompletableFuture.get(); System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service return time } catch (Exception exp) { System.err.println(exp); } } public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin(String personId) { final CompletableFuture personCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf( personCompletableFuture, addressCompletableFuture); completableFutureAllOf.join(); // time taken to return is based on the slowest future call final Person person = personCompletableFuture.join(); final Address address = addressCompletableFuture.join(); System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } }
Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: Рефакторинг подхода 4, сборка CompletableFuture::allOf () () возвращаемый тип с помощью thenApply() (см. Подход 5)
Подход 5:
Цель состоит в том, чтобы выполнить несколько вызовов служб параллельно с использованием Completablefuture и использовать все затем агрегированные данные с помощью thenApply()
псевдокод
Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)
Для достижения этой цели -> выполнять параллельные запросы на обслуживание
запустите вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке с помощью java CompletableFuture (скажем, CompletableFuture 1)
запустите вызов службы адресов, который взаимодействует с хранилищем адресов по идентификатору, чтобы получить информацию об адресе с помощью java CompletableFuture (скажем, CompletableFuture 2)
добавьте эти CompletableFutures в CompletableFuture::allOf
после описанного выше шага составьте/соберите выше CompletableFuture, используя затем Apply()
теперь получите Человека С информацией об адресе, где Информация о человеке + Данные об адресе объединяются с помощью CompletableFuture.join()
улучшение кода
import java.util.concurrent.CompletableFuture; public class CFMain { static PersonRepository personRepository = new PersonRepository(); static PersonService personService = new PersonService(personRepository); static AddressRepository addressRepository = new AddressRepository(); static AddressService addressService = new AddressService(addressRepository); public static void main(String[] args) { parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply("1"); } public static void parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply(String personId) { final CompletableFuturepersonCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture); final CompletableFuture personWithAddressCompletableFuture = completableFutureAllOf.thenApply( (voidInput) -> new PersonWithAddress( personCompletableFuture.join(), addressCompletableFuture.join())); System.out.println(personWithAddressCompletableFuture.join()); // time taken to return is based on the slowest service response time } }
Оригинал: “https://dev.to/sunilkumarl/merge-results-of-parallel-service-requests-using-completablefuture-40no”