Рубрики
Без рубрики

Руководство По CompletableFuture

Краткое и практическое руководство по полной реализации Java 8.

Автор оригинала: baeldung.

1. введение

Этот учебник представляет собой руководство по функциональности и вариантам использования класса CompletableFuture , который был представлен в качестве улучшения API параллелизма Java 8.

Дальнейшее чтение:

Запускаемый против Вызываемый на Java

Руководство по java.util.concurrent.Будущее

2. Асинхронные вычисления в Java

Об асинхронных вычислениях трудно рассуждать. Обычно мы хотим думать о любом вычислении как о серии шагов, но в случае асинхронных вычислений действия, представленные в виде обратных вызовов, как правило, либо разбросаны по коду, либо глубоко вложены друг в друга . Все становится еще хуже, когда нам нужно обрабатывать ошибки, которые могут возникнуть во время одного из шагов.

Интерфейс Future был добавлен в Java 5, чтобы служить результатом асинхронного вычисления, но у него не было никаких методов для объединения этих вычислений или обработки возможных ошибок.

Java 8 представила класс CompletableFuture . Наряду с Будущим интерфейсом, он также реализовал Этап завершения интерфейс. Этот интерфейс определяет контракт для шага асинхронного вычисления, который мы можем объединить с другими шагами.

CompletableFuture является одновременно строительным блоком и фреймворком, с около 50 различными методами для составления, объединения и выполнения асинхронных вычислительных шагов и обработки ошибок .

Такой большой API может быть подавляющим, но они в основном попадают в несколько четких и отчетливых случаев использования.

3. Использование CompletableFuture в качестве простого будущего

Прежде всего, класс CompletableFuture реализует интерфейс Future , поэтому мы можем использовать его как реализацию Future , но с дополнительной логикой завершения .

Например, мы можем создать экземпляр этого класса с конструктором no-arg для представления некоторого будущего результата, передать его потребителям и завершить его в какой-то момент в будущем, используя метод complete . Потребители могут использовать метод get для блокировки текущего потока до тех пор, пока не будет получен этот результат.

В приведенном ниже примере у нас есть метод, который создает экземпляр CompletableFuture , затем выполняет некоторые вычисления в другом потоке и немедленно возвращает Future .

Когда вычисление выполнено, метод завершает Future , предоставляя результат методу complete :

public Future calculateAsync() throws InterruptedException {
    CompletableFuture completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

Чтобы запустить вычисление, мы используем API Executor . Этот метод создания и завершения CompletableFuture может использоваться вместе с любым механизмом параллелизма или API, включая необработанные потоки.

Обратите внимание, что метод calculateAsync возвращает Будущий экземпляр .

Мы просто вызываем метод, получаем экземпляр Future и вызываем на нем метод get , когда мы готовы заблокировать результат.

Также обратите внимание, что метод get вызывает некоторые проверенные исключения, а именно ExecutionException (инкапсулирует исключение, возникшее во время вычисления) и InterruptedException (исключение, означающее, что поток, выполняющий метод, был прерван):

Future completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);

Если мы уже знаем результат вычисления , мы можем использовать метод static completedFuture с аргументом, представляющим результат этого вычисления. Следовательно, метод get метода Future никогда не будет заблокирован, вместо этого он немедленно вернет этот результат:

Future completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

В качестве альтернативного сценария мы можем захотеть отменить исполнение будущего .

4. CompletableFuture с инкапсулированной логикой вычислений

Приведенный выше код позволяет нам выбрать любой механизм параллельного выполнения, но что, если мы хотим пропустить этот шаблон и просто выполнить некоторый код асинхронно?

Статические методы RunAsync и supplyAsync позволяют нам создавать CompletableFuture экземпляр из Runnable и Supplier функциональных типов соответственно.

Оба Runnable и Supplier являются функциональными интерфейсами, которые позволяют передавать свои экземпляры в виде лямбда-выражений благодаря новой функции Java 8.

Интерфейс Runnable – это тот же старый интерфейс, который используется в потоках, и он не позволяет возвращать значение.

Интерфейс Supplier – это универсальный функциональный интерфейс с одним методом, который не имеет аргументов и возвращает значение параметризованного типа.

Это позволяет нам предоставить экземпляр Поставщика в виде лямбда-выражения, которое выполняет вычисление и возвращает результат . Это так же просто, как:

CompletableFuture future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5. Обработка результатов асинхронных вычислений

Наиболее общий способ обработки результата вычисления-передать его в функцию. Метод then Apply делает именно это; он принимает экземпляр Функции , использует его для обработки результата и возвращает Future , который содержит значение, возвращаемое функцией:

CompletableFuture completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

Если нам не нужно возвращать значение в цепочке Future , мы можем использовать экземпляр функционального интерфейса Consumer . Его единственный метод принимает параметр и возвращает void .

В CompletableFuture есть метод для этого варианта использования. Метод |/Accept получает Потребитель и передает ему результат вычисления. Затем окончательный вызов future.get() возвращает экземпляр типа Void :

CompletableFuture completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

Наконец, если нам не нужно значение вычисления и мы не хотим возвращать какое-то значение в конце цепочки, мы можем передать Runnable lambda в метод thenRun . В следующем примере мы просто печатаем строку в консоли после вызова future.get():

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

6. Объединение Фьючерсов

Лучшая часть CompletableFuture API-это возможность комбинировать CompletableFuture экземпляры в цепочке вычислительных шагов .

Результатом этой цепочки является сама по себе CompletableFuture , которая позволяет дальнейшую цепочку и объединение. Этот подход широко распространен в функциональных языках и часто упоминается как монадический шаблон проектирования.

В следующем примере мы используем затем Составьте способ соединения двух цепей Фьючерсы последовательно.

Обратите внимание, что этот метод принимает функцию, которая возвращает экземпляр CompletableFuture . Аргумент этой функции является результатом предыдущего шага вычисления. Это позволяет нам использовать это значение внутри следующей лямбды CompletableFuture :

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

Метод then Compose вместе с then Apply/| реализует основные строительные блоки монадического шаблона. Они тесно связаны с методами map и flatMap классов Stream и Optional , также доступными в Java 8.

Оба метода получают функцию и применяют ее к результату вычисления, но затем они Составляют ( flatMap ) метод получает функцию, которая возвращает другой объект того же типа . Эта функциональная структура позволяет создавать экземпляры этих классов в качестве строительных блоков.

Если мы хотим выполнить две независимые Функции и что-то сделать с их результатами, мы можем использовать их , а затем объединить метод, который принимает Будущее и Функцию с двумя аргументами для обработки обоих результатов:

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

Более простой случай-это когда мы хотим что-то сделать с двумя результатами Futures , но нам не нужно передавать какое-либо результирующее значение по цепочке Future . Метод thenAcceptBoth может помочь:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

7. Разница между then Apply() и then Compose()

В наших предыдущих разделах мы показали примеры, касающиеся then Apply() и then Compose() . Оба API помогают связывать разные вызовы CompletableFuture , но использование этих 2 функций отличается.

7.1. затем Примените()

Мы можем использовать этот метод для работы с результатом предыдущего вызова. Однако ключевым моментом, который следует помнить, является то, что тип возвращаемого значения будет объединен из всех вызовов.

Таким образом, этот метод полезен, когда мы хотим преобразовать результат вызова CompletableFuture :

CompletableFuture finalResult = compute().thenApply(s-> s + 1);

7.2. Затем составьте()

Метод then Compose() аналогичен методу then Apply() в том, что оба возвращают новый этап завершения. Однако затем Compose() использует предыдущий этап в качестве аргумента . Он будет сглаживаться и возвращать Будущее с прямым результатом, а не вложенный объект, как мы наблюдали в затем применить():

CompletableFuture computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture finalResult = compute().thenCompose(this::computeAnother);

Поэтому, если идея заключается в цепочке CompletableFuture методов, то лучше использовать , а затем Compose() .

Кроме того, обратите внимание, что разница между этими двумя методами аналогична разнице между map() и flatMap() .

8. Параллельное выполнение нескольких фьючерсов

Когда нам нужно выполнить несколько фьючерсов параллельно, мы обычно хотим дождаться выполнения всех из них, а затем обработать их объединенные результаты.

Статический метод CompletableFuture.allOf позволяет дождаться завершения всех функций , предоставляемых в качестве vararg:

CompletableFuture future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

Обратите внимание, что возвращаемый тип CompletableFuture.allOf() является CompletableFuture . Ограничение этого метода заключается в том, что он не возвращает объединенные результаты всех фьючерсов . Вместо этого мы должны вручную получать результаты из фьючерсов . К счастью, метод CompletableFuture.join() и API потоков Java 8 делают его простым:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

Метод CompletableFuture.join() аналогичен методу get , но он вызывает непроверенное исключение в случае, если Future не завершается нормально. Это позволяет использовать его в качестве ссылки на метод в методе Stream.map () .

9. Обработка Ошибок

Для обработки ошибок в цепочке асинхронных вычислительных шагов мы должны адаптировать идиому throw/catch аналогичным образом.

Вместо того, чтобы перехватывать исключение в синтаксическом блоке, класс CompletableFuture позволяет нам обрабатывать его в специальном методе handle . Этот метод получает два параметра: результат вычисления (если оно завершилось успешно) и исключение (если какой-то шаг вычисления не завершился нормально).

В следующем примере мы используем метод handle для предоставления значения по умолчанию, когда асинхронное вычисление приветствия было завершено с ошибкой из-за отсутствия имени:

String name = null;

// ...

CompletableFuture completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

В качестве альтернативного сценария предположим, что мы хотим вручную заполнить Future значением, как в первом примере, но также имеем возможность завершить его с исключением. Метод complete Исключительно предназначен именно для этого. Метод CompletableFuture.get() в следующем примере вызывает исключение ExecutionException с исключением RuntimeException в качестве причины:

CompletableFuture completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

В приведенном выше примере мы могли бы обработать исключение с помощью метода handle асинхронно, но с помощью метода get мы можем использовать более типичный подход синхронной обработки исключений.

10. Асинхронные методы

Большинство методов fluent API в классе CompletableFuture имеют два дополнительных варианта с Async postfix. Эти методы обычно предназначены для выполнения соответствующего шага выполнения в другом потоке .

Методы без Async postfix запускают следующий этап выполнения с помощью вызывающего потока. Напротив, метод Async без аргумента Executor выполняет шаг с использованием реализации общего fork/join пула Executor , доступ к которому осуществляется с помощью метода ForkJoinPool.commonPool () . Наконец, метод Async с аргументом Executor выполняет шаг, используя переданный Executor .

Вот модифицированный пример, который обрабатывает результат вычисления с помощью экземпляра функции . Единственным видимым отличием является метод thenApplyAsync , но под капотом приложение функции завернуто в экземпляр ForkJoinTask (для получения дополнительной информации о фреймворке fork/join см. Статью “Руководство по фреймворку Fork/Join в Java” ). Это позволяет нам еще больше распараллеливать наши вычисления и более эффективно использовать системные ресурсы:

CompletableFuture completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 улучшает CompletableFuture API со следующими изменениями:

  • Добавлены новые заводские методы
  • Поддержка задержек и тайм-аутов
  • Улучшена поддержка подклассов

и новые API-интерфейсы экземпляров:

  • Исполнитель defaultExecutor()
  • CompletableFuture новое незавершенное будущее()
  • CompletableFuture копия()
  • Стадия завершения минимальная стадия завершения()
  • CompletableFuture полная асинхронность(Поставщик T> поставщик, исполнитель исполнитель) T> поставщик, исполнитель исполнитель)
  • CompletableFuture полная асинхронность(Поставщик расширяет T> поставщик) расширяет T> поставщик)
  • CompletableFuture timeout(длительный тайм-аут, единица измерения времени)
  • CompletableFuture completeOnTimeout(значение T, длительный тайм-аут, единица измерения времени)

Теперь у нас также есть несколько статических служебных методов:

  • Исполнитель delayedExecutor(длительная задержка, единица измерения времени, Исполнитель исполнитель)
  • Исполнитель delayedExecutor(длительная задержка, единица измерения времени)
  • CompletionStage completedStage(значение U)
  • < Этап завершения неудачный этап(Бросаемый ex)
  • < CompletableFuture неудачное будущее(Throwable ex)

Наконец, чтобы решить проблему тайм-аута, Java 9 представила еще две новые функции:

  • перерыв()
  • completeOnTimeout()

Вот подробная статья для дальнейшего чтения: Улучшения API Java 9 CompletableFuture .

12. Заключение

В этой статье мы описали методы и типичные случаи использования класса CompletableFuture .

Исходный код статьи доступен на GitHub .