Рубрики
Без рубрики

Объединяйте результаты параллельных запросов на обслуживание с помощью CompletableFuture

Вот подход к использованию java CompletableFuture для обеспечения выполнения / выполнения нескольких запросов на обслуживание… Помеченный java.

Вот подход к использованию java CompletableFuture для обеспечения параллельного запуска/выполнения нескольких запросов на обслуживание для сбора данных ответа службы/API и их агрегирования.

Местоположение на Github

СУНИЛ-КУМАР-L/Java Завершаемая будущая демо-версия

Объединяйте результаты параллельных запросов на обслуживание с помощью CompletableFuture

  • Подход 1: Простой старый простой способ java

давайте представим CompletableFuture с различными подходами

  • Подход 2: Параллельные вызовы служб и сбор данных с помощью CompletableFuture::get()

  • Подход 3: Параллельные вызовы служб и сбор данных с помощью CompletableFuture::join()

  • Подход 4: Параллельные вызовы служб и групповые фьючерсы с использованием CompletableFuture::allOf(), а затем собирают все данные

  • Подход 5 ( Мои предпочтения ): Параллельные вызовы служб и групповые фьючерсы с использованием CompletableFuture::allOf(), а затем сборка с помощью CompletableFuture::thenApply()

Подход 1:

Цель состоит в том, чтобы последовательно выполнить несколько вызовов службы (один за другим), а затем собрать ответ и объединить, чтобы получить окончательный составной ответ.

псевдокод

  • Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)

  • Для достижения этой цели -> выполнять последовательные запросы на обслуживание

  • запускает вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке

  • запуск вызова службы адресов, которая взаимодействует с хранилищем адресов по идентификатору для получения информации об адресе

  • наконец, верните Человека С Информацией об Адресе, где Информация о человеке + Информация об Адресе суммируются.

код

public class Address {

    private String addressId;
    private String addressLine1;
    private String addressLine2;
    private String addressLine3;
    private String zipCode;
    private String city;
    private String state;
    private String country;

    public Address(
         String addressId, 
         String addressLine1, 
         String addressLine2, 
         String addressLine3, 
         String zipCode, 
         String city, 
         String state, 
         String country) {
        this.addressId = addressId;
        this.addressLine1 = addressLine1;
        this.addressLine2 = addressLine2;
        this.addressLine3 = addressLine3;
        this.zipCode = zipCode;
        this.city = city;
        this.state = state;
        this.country = country;
    }

// write toString, hashCode or use lombok annotation

}
public class AddressRepository {

    public Address getAddressById(String id){
// hard coded ... change it to real data // fetch from DB or API 
        return new Address( "1","add1", "add2", "add3","55305",
                "Mpls", "MN","USA");
    }
}
public class AddressService {

    private AddressRepository addressRepository;

    public AddressService(AddressRepository addressRepository) {
        this.addressRepository = addressRepository;
    }

    public Address getAddressById(String id) {
        return addressRepository.getAddressById(id);
    }
}
public class Person {

    public Person(String id, String firstName, String age) {
        this.id = id;
        this.firstName = firstName;
        this.age = age;
    }

    private String id;
    private String firstName;
    private String age;
   // write toString, hashCode or use lombok annotation
}
public class PersonRepository {

    public Person getPersonById(String id) {
// hard coded ... change it to real data // fetch from DB or API 
        return new Person("1", "hello", "22");
    }
}
public class PersonService {

    private PersonRepository personRepository;

    public PersonService(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    public Person getPersonById(String id){
        return personRepository.getPersonById(id);
    }
}
public class PersonWithAddress {
    private Person person;
    private Address address;

    public PersonWithAddress(Person person, Address address) {
        this.person = person;
        this.address = address;
    }
// write toString, hashCode or use lombok annotation
}
public class CFMain {
    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void sequencial(String personId) {
        final Person personById = 
personService.getPersonById(personId);
        final Address addressById = 
addressService.getAddressById(personId);
        final PersonWithAddress personWithAddress = 
new PersonWithAddress(personById, addressById);
        System.out.println(personWithAddress);
    }

    public static void main(String[] args) {
        sequencial("1");
    }
}

Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: выполняйте каждый вызов службы параллельно, чтобы увеличить время отклика (см. Подход 2)

Подход 2:

Цель состоит в том, чтобы выполнить несколько вызовов службы параллельно с использованием Completablefuture и собрать данные из каждого CompletableFuture с помощью get() get() вызывает InterruptedException, ExecutionException ((Проверенное исключение)

псевдокод

  • Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)

  • Для достижения этой цели -> выполнять параллельные запросы на обслуживание

  • запустите вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке с помощью java CompletableFuture (скажем, CompletableFuture 1)

  • запустите вызов службы адресов, который взаимодействует с хранилищем адресов по идентификатору, чтобы получить информацию об адресе с помощью java CompletableFuture (скажем, CompletableFuture 2)

  • теперь заблокируйте каждый CompletableFuture, чтобы получить данные (один за другим/последовательно), используя метод CompletableFuture::get

  • наконец, верните Человека С Информацией об Адресе, где Информация о человеке + Информация об Адресе суммируются.

улучшение кода

import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallWithCFUsingGet("1");
    }


    public static void parallelServiceCallWithCFUsingGet(String personId) {
        final CompletableFuture personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture
addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); try { final Person person = personCompletableFuture.get(); // until this is not complete below line is not executed final Address address = addressCompletableFuture.get(); // until this is not complete below line is not executed // below line will be executed based on the slowest service response System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } catch (Exception exp) { System.err.println(exp); } } }

Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? Ответ: используйте CompletableFuture join() вместо get() (см. Подход 3)

Подход 3:

Цель состоит в том, чтобы выполнить несколько вызовов службы параллельно с использованием Completablefuture и собрать данные из каждого CompletableFuture с помощью join() join() вызывает исключение UncheckedException (исключение RuntimeException)

улучшение кода

import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallWithCFUsingJoin("1");
    }

    public static void parallelServiceCallWithCFUsingJoin(String personId) {
        final CompletableFuture personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture
addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final Person person = personCompletableFuture.join(); // until this is not complete below line is not executed final Address address = addressCompletableFuture.join(); // until this is not complete below line is not executed // below line will be executed based on the slowest service response System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } }

Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: используйте CompletableFuture allOf(CompletableFuture >... >... cfs), чтобы добавить все фьючерсы (см. Подход 4)

Подход 4:

Цель состоит в том, чтобы выполнить несколько вызовов служб параллельно с использованием Completablefuture и использовать все, а затем собирать данные из каждого CompletableFuture с помощью join() или get()

улучшение кода

import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet("1");
        parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin("1");
    }

    public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet(String personId) {
        final CompletableFuture personCompletableFuture =
 CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture
addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture); try { completableFutureAllOf.get(); // time taken to return is based on the slowest service response/ slowest future response final Person person = personCompletableFuture.get(); final Address address = addressCompletableFuture.get(); System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service return time } catch (Exception exp) { System.err.println(exp); } } public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin(String personId) { final CompletableFuture personCompletableFuture = CompletableFuture.supplyAsync(() -> personService.getPersonById(personId)); final CompletableFuture
addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf( personCompletableFuture, addressCompletableFuture); completableFutureAllOf.join(); // time taken to return is based on the slowest future call final Person person = personCompletableFuture.join(); final Address address = addressCompletableFuture.join(); System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time } }

Хорошо, что мы смогли достичь этой цели, но можно ли это улучшить? если да, то каким образом? Ответ: Рефакторинг подхода 4, сборка CompletableFuture::allOf () () возвращаемый тип с помощью thenApply() (см. Подход 5)

Подход 5:

Цель состоит в том, чтобы выполнить несколько вызовов служб параллельно с использованием Completablefuture и использовать все затем агрегированные данные с помощью thenApply()

псевдокод

  • Клиент вводит идентификатор, чтобы получить Информацию о Человеке + Информацию Об Адресе (Составное Лицо С Информацией Об Адресе)

  • Для достижения этой цели -> выполнять параллельные запросы на обслуживание

  • запустите вызов службы Person, который взаимодействует с PersonRepository по идентификатору, чтобы получить информацию о человеке с помощью java CompletableFuture (скажем, CompletableFuture 1)

  • запустите вызов службы адресов, который взаимодействует с хранилищем адресов по идентификатору, чтобы получить информацию об адресе с помощью java CompletableFuture (скажем, CompletableFuture 2)

  • добавьте эти CompletableFutures в CompletableFuture::allOf

  • после описанного выше шага составьте/соберите выше CompletableFuture, используя затем Apply()

  • теперь получите Человека С информацией об адресе, где Информация о человеке + Данные об адресе объединяются с помощью CompletableFuture.join()

улучшение кода

import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
 parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply("1");
    }

    public static void parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply(String personId) {
        final CompletableFuture personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture
addressCompletableFuture = CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId)); final CompletableFuture completableFutureAllOf = CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture); final CompletableFuture personWithAddressCompletableFuture = completableFutureAllOf.thenApply( (voidInput) -> new PersonWithAddress( personCompletableFuture.join(), addressCompletableFuture.join())); System.out.println(personWithAddressCompletableFuture.join()); // time taken to return is based on the slowest service response time } }

Оригинал: “https://dev.to/sunilkumarl/merge-results-of-parallel-service-requests-using-completablefuture-40no”