Рубрики
Без рубрики

Tascalate Concurrent – Заполнение пробелов в API CompletableFuture (Часть 1)

Параллельная библиотека Tascalate предоставляет реализацию интерфейса этапа завершения и связанных классов, которые предназначены для поддержки длительных задач блокировки (как правило, связанных с вводом-выводом). Эта функциональность дополняет единственную встроенную реализацию Java 8, CompletableFuture, которая в первую очередь поддерживает вычислительные задачи. Кроме того, библиотека помогает решать многочисленные задачи асинхронного программирования, такие как обработка тайм-аутов, функции повтора/опроса, организация результатов нескольких параллельных вычислений и тому подобное. С тегами java, с открытым исходным кодом, программирование, кодирование.

Библиотека Tascalate Concurrent предоставляет реализацию интерфейса Стадии завершения и связанных классов, которые предназначены для поддержки длительных задач блокировки (как правило, связанных с вводом-выводом). Эта функциональность расширяет единственную встроенную реализацию Java 8, CompletableFuture , то есть в первую очередь поддерживает вычислительные задачи. Кроме того, библиотека помогает решать многочисленные задачи асинхронного программирования, такие как обработка тайм-аутов, функции повтора/опроса, организация результатов нескольких параллельных вычислений и тому подобное.

Библиотека поставляется в виде JAR с несколькими выпусками и может использоваться как с Java 8 в качестве библиотеки путей к классам, так и с Java 9+ в качестве модуля.

Существует несколько недостатков, связанных с реализацией CompletableFuture , которые усложняют ее использование для реального асинхронного программирования, особенно когда вам приходится работать с прерываемыми задачами, связанными с вводом-выводом:

  1. Завершаемое будущее.отмена() метод не прерывает основной поток; он просто переводит future в исключительно завершенное состояние. Таким образом, даже если вы используете какие-либо блокирующие вызовы внутри функций, переданных в thenApplyAsync /|thenAcceptAsync | и т.д. - эти функции будут выполняться до конца и никогда не будут прерваны. Пожалуйста, смотрите Завершаемое будущее не может быть прервано Томаш Нуркевич. По умолчанию все
  2. *Асинхронные методы композиции Завершаемого будущего используют ForkJoinPool.commonPool() ( смотрите здесь ), если не указан явный Исполнитель . Этот пул потоков является общим для всех CompletableFuture -s и все параллельные потоки во всех приложениях, развернутых на одной и той же JVM. Этот жестко запрограммированный настраиваемый пул потоков полностью находится вне контроля разработчиков приложений, его трудно контролировать и масштабировать. Поэтому в надежных реальных приложениях вы всегда должны указывать свой собственный Исполнитель . С помощью улучшений API в Java 9+ вы можете исправить этот недостаток, но для этого потребуется некоторое пользовательское кодирование. Кроме того, встроенные классы параллелизма Java 8 предоставляют довольно неудобный API для объединения нескольких
  3. стадий завершения -s. CompletableFuture.Все|/ CompletableFuture.любой из методов принимает только CompletableFuture в качестве аргументов; у вас нет механизма для объединения произвольных Этап завершения -ов без преобразования их в Завершаемый будущий первый. Кроме того, тип возвращаемого значения вышеупомянутого CompletableFuture.allOf объявлен как CompletableFuture - следовательно, вы не можете удобно извлекать отдельные результаты каждого предоставленного будущего. |/CompletableFuture.любой из в этом отношении еще хуже; для получения более подробной информации, пожалуйста, прочитайте здесь: CompletableFuture в действии (см. Недостатки) Томаша Нуркевича. Поддержка тайм-аутов/задержек была введена в
  4. CompletableFuture только в Java 9, поэтому все еще широко поддерживаемые приложения, работающие на Java 8, остаются без этой важной функции. Кроме того, некоторые проектные решения, такие как использование отложенных исполнителей вместо оператора “задержка”, несколько сомнительны. Существует множество бесплатных библиотек с открытым исходным кодом, которые устраняют некоторые из вышеупомянутых недостатков. Однако ни один из них не обеспечивает реализацию прерываемого Этап Завершения и никто не решает все вопросы согласованно.

Чтобы использовать библиотеку, вам нужно добавить одну зависимость Maven


    net.tascalate.concurrent
    net.tascalate.concurrent.lib
    0.7.1

1. Интерфейс обещания

Это основной интерфейс параллельной библиотеки Tascalate. Это может быть лучше всего описано формулой:

Promise == CompletionStage + Future

Т.е. он сочетает в себе обе блокировки Будущий API, включая отмену (логическое значение может привести к прерыванию) |/метод , И композиционные возможности Стадии завершения API. Важно отметить, что все методы композиции Стадии завершения API ( затем принять , затем объединить , по завершении и т.д.) повторно объявляются для возврата Обещаю также.

Решение о внедрении интерфейса, который объединяет Стадия завершения и Будущее согласовано с дизайном CompletableFuture API. Кроме того, также добавлено несколько полезных методов CompletableFuture API:

T getNow(T valueIfAbsent) throws CancellationException, CompletionException;
T getNow(Supplier valueIfAbsent) throws CancellationException, CompletionException;
T join() throws CancellationException, CompletionException;

Поэтому использовать должно быть довольно просто Обещание в качестве замены для Завершаемое будущее во многих случаях.

Кроме того, в API Promise есть множество операторов для работы с тайм-аутами и задержками, для переопределения асинхронного исполнителя по умолчанию и тому подобного. Все они будут обсуждены позже.

При обсуждении Обещания интерфейса обязательно следует упомянуть сопутствующий класс Обещания , который предоставляет несколько полезных методов для адаптации стороннего Этапа завершения (включая стандарт Завершаемое будущее ) к Обещанию API. Во-первых, есть две операции с единицами для создания успешно/исправленных ошибок Обещание -эс:

static  Promise success(T value)
static  Promise failure(Throwable exception)

Во-вторых, существует метод адаптера из :

static  Promise from(CompletionStage stage)

Он ведет себя следующим образом:

  1. Если поставляемый этап уже является Обещание затем оно возвращается без изменений
  2. Если стадия является CompletableFuture затем возвращается специально разработанная оболочка.
  3. Если этап дополнительно реализует Будущее затем возвращается специализированная обертка который делегирует все методы блокировки, определенные в Будущем API
  4. В противном случае универсальная оболочка создается с достаточно хорошей реализацией блокировки Будущее API поверх асинхронного Этап завершения API. Подводя итог, возвращенная оболочка делегирует как можно больше функций поставляемому этапу и никогда не прибегает к Этапу завершения.tocompletablefuture , потому что в API Java 8 это необязательный метод. Из документации: “Реализация этапа завершения, которая не выбирает взаимодействие с другими, может вызвать исключение UnsupportedOperationException”. (этот текст был удален в Java 9+). В общем случае параллельная библиотека Tascalate не зависит от этого метода и должна быть совместима с любым минимальным (но допустимым) |/Стадия завершения реализация.

Важно подчеркнуть, что Обещание -ы, возвращенные из Обещания.успех , Обещания. провал и Обещания.методы from отменяются так же, как и CompletableFuture , но в целом не прерываются, в то время как прерывание зависит от конкретной реализации. Далее мы обсудим конкретную реализацию прерываемого Обещания , предоставляемого параллельной библиотекой Tascalate – классом CompletableTask .

2. Выполнимая Задача

Вот почему этот проект вообще был начат. Завершаемая задача – это реализация API Promise для длительных задач блокировки. Как правило, для создания Выполнимая задача , вы должны отправить Поставщику /|/Запускаемый к Исполнителю сразу, аналогично тому, как с CompletableFuture :

Promise p1 = CompletableTask.supplyAsync(() -> {
  return blockingCalculationOfSomeValue();
}, myExecutor);

Promise p2 = CompletableTask.runAsync(this::someIoBoundMethod, myExecutor);

блокирующий Расчет Некоторого Значения и некоторый метод ввода-вывода в приведенном выше примере может иметь код ввода-Вывода, работать с блокирующими очередями, получать блокировку на обычных Java-s Будущее -s и похожи. Если позже вы решите отменить любое из возвращенных обещаний, то соответствующий блокирующий Расчет Некоторого Значения и некоторого Метода Ввода-вывода будет прерван (если он еще не завершен).

В области функций, связанных с вводом-выводом, сбои, такие как тайм-ауты подключения, отсутствующие или заблокированные файлы, довольно распространены, и механизм проверенных исключений часто используется для сигнализации сбоев. Поэтому библиотека предоставляет точку входа в API, который принимает Вызываемый вместо Поставщика :

// Notice the checked exception in the method signature
byte[] loadFile(File file) throws IOException {
    byte[] result = ... //load file content;
    return result;
}
...
ExecutorService executorService = Executors.newFixedThreadPool(6);
Promise contentPromise = CompletableTask.submit(
    () -> loadFile(new File("./myfile.dat")), 
    executorService
); 

Кроме того, есть 2 единичные операции для создания Выполнимая Задача : : a. Завершаемая задача.асинхронный (Исполнитель исполнитель) Возвращает уже выполненное нулевое значение Обещание , которое "привязано" к указанному исполнителю. Т.е. любая функция, переданная асинхронным методам композиции Обещать ((например, thenApplyAsync /|/thenAcceptAsync /|при полной асинхронности и т. Д.) Будет выполняться с использованием этого исполнителя, если исполнитель не переопределен с помощью явного параметра метода композиции. Более того, любые вложенные вызовы композиции будут использовать одного и того же исполнителя, если он не переопределен с помощью явного параметра метода композиции:

CompletableTask
  .asyncOn(myExecutor)
  .thenApplyAsync(myValueGenerator)
  .thenAcceptAsync(myConsumer)
  .thenRunAsync(myAction);

Все мой генератор ценности , мой потребитель , мое действие будет выполнено с помощью мой исполнитель . b. Завершаемая задача.завершена (значение T, Исполнитель исполнитель) То же, что и выше, но отправной точкой является Обещание выполнено с указанным значением:

CompletableTask
   .complete("Hello!", myExecutor)
   .thenApplyAsync(myMapper)
   .thenApplyAsync(myTransformer)   
   .thenAcceptAsync(myConsumer)
   .thenRunAsync(myAction);

Все мой картограф , мой Трансформатор , мой потребитель , мое действие будет выполнено с использованием моего Исполнителя .

Важно отметить, что все составленные обещания поддерживают истинную отмену (вкл. прерывающий поток) для функций, предоставленных в качестве аргументов:

Promise p1 = CompletableTask.asyncOn(myExecutor)
Promise p2 = p1.thenApplyAsync(myValueGenerator)
Promise p3 = p2.thenRunAsync(myAction);

...
p2.cancel(true);

В приведенном выше примере мой генератор значений будет прерван, если он уже выполняется. Оба p2 и p3 будут урегулированы с ошибкой: p2 с Исключение отмены и p3 с Исключение завершения .

Вы можете заметить, что выше используется термин “методы асинхронной композиции”, а также * Асинхронные вызовы в примерах (например, thenApplyAsync , thenRunAsync . Это не случайно: несинхронные методы Стадии завершения API не прерываются. Основанием для проектного решения является то, что вызов асинхронных методов сопряжен с неизбежными накладными расходами, связанными с помещением команды в очередь исполнителя, неявным запуском новых потоков и т.д. А для простых, неблокирующих методов, таких как небольшие вычисления, тривиальные преобразования и т. Д., Эти накладные расходы могут перевесить само время выполнения метода. Таким образом, рекомендация такова: используйте методы асинхронной композиции для сложных задач блокировки, связанных с вводом-выводом, и используйте методы несинхронной композиции для (обычно легких) вычислений.

Стоит отметить, что Выполнимая задача -ы и Обещание -ы, составленные из него, могут быть когда-либо прерваны только если используемый Исполнитель прерываем по своей природе. Например, ThreadPoolExecutor поддерживает прерываемые задачи, но ForkJoinPool не делает!

3. Переопределение асинхронного исполнителя по умолчанию

Одна из ловушек реализации CompletableFuture заключается в том, как она работает с асинхронным исполнителем по умолчанию. Рассмотрим следующий пример:

CompletionStage p1 = CompletableFuture.supplyAsync(this::produceValue, executorInitial);
CompletionStage p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage p4 = p3.thenApplyAsync(this::transformValueC);

Призыв к производимая ценность будет выполнен на исполнителе, начальном - он передается явно. Однако вызов преобразования значений будет выполнен на... ForkJoinPool.Общий пул() ! Хммм... Возможно, в этом есть смысл, но как принудительно использовать альтернативного исполнителя по умолчанию? Ни за что! Возможно, это возможно при более глубоких вызовах? Ответ снова "НЕТ"! Обращение к преобразуйте значение выполняется на явно предоставленном следующий исполнитель . Но следующий вызов/|преобразует значения будет выполнен… ты догадываешься об этом… ForkJoinPool.Общий пул() !

Итак, как только вы используете CompletableFuture в среде JEE, вы должны передать явный экземпляр ManagedExecutorService каждому вызову метода. Не очень удобно! Справедливости ради, с Java 9+ API вы можете переопределить это поведение с помощью подкласса CompletableFuture и переопределения двух методов: defaultexecutor и newIncompleteFuture . Кроме того, вам придется определить свои собственные “точки входа” вместо стандартной CompletableFuture.RunAsync и ЗавершаемыйFuture.supplyAsync .

С Выполняемой задачей ситуация прямо противоположная. Давайте перепишем приведенный выше пример:

CompletionStage p1 = CompletableTask.supplyAsync(this::produceValue, executorInitial);
CompletionStage p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage p4 = p3.thenApplyAsync(this::transformValueC);

Призыв к производимая ценность будет выполняться на исполнителе, начальном , очевидно. Но теперь вызов преобразующего значения A будет выполнен также на исполнителе Initial ! А как насчет более глубоких звонков? Обращение к преобразуйте значение выполняется на явно предоставленном следующий исполнитель . И следующий вызов, преобразование значений будет выполнен на... проверьте свою интуицию... следующий исполнитель . Логика этого заключается в следующем: последний явно указанный Исполнитель - это то, что будет использоваться для всех вложенных методов асинхронной композиции без явного параметра Исполнитель

Очевидно, что это редко бывает, когда один размер подходит всем. поэтому существуют две дополнительные опции для указания асинхронного исполнителя по умолчанию: A. Завершаемая задача имеет перегруженный метод:

public static Promise asyncOn(Executor executor, boolean enforceDefaultAsync)

Когда принудительная асинхронность по умолчанию является истинной тогда все вложенные асинхронные методы композиции без явного параметра Исполнитель будут использовать предоставленного исполнителя, даже если предыдущие методы композиции используют альтернативный Исполнитель . Это чем-то похоже на CompletableFuture но с возможностью изначально явно установить асинхронный исполнитель по умолчанию. B. Обещание интерфейс имеет следующую операцию:

Promise defaultAsyncOn(Executor executor)

Возвращаемый декоратор будет использовать указанный исполнитель для всех вложенных асинхронных методов композиции без явного параметра Executor . Таким образом, в любой момент вы можете переключиться на нужный асинхронный оператор по умолчанию и продолжать использовать его для всех вызовов вложенной композиции.

Подводя итог, с помощью Tascalate Concurrent у вас есть следующие возможности для управления асинхронным исполнителем по умолчанию:

  1. Последний явный Исполнитель передан * Асинхронный метод используется для производного Обещание -s – опция по умолчанию.
  2. Одиночный дефолт Исполнитель передается в корневую Завершаемую задачу. asyncio(Исполнитель исполнитель, true) вызов распространяется по всей цепочке. Это единственный вариант, поддерживаемый с помощью CompletableFuture в Java 9+, хотя и с пользовательским кодированием.
  3. Переопределить Исполнитель с включенной асинхронностью по умолчанию (Исполнитель исполнитель) для всех производных Обещание -с. Обладая лучшим из трех (!) миров, единственная ответственность пользователя библиотеки заключается в последовательном использовании этих опций! Последнее, что следует упомянуть, – это типичная задача, когда вы хотите запустить метод прерываемой блокировки после завершения стандарта Завершаемое будущее . Следующий служебный метод определен в Завершаемой задаче :
public static  Promise waitFor(CompletionStage stage, Executor executor)

Грубо говоря, это короткий путь для следующего:

CompletableTask.asyncOn(executor).thenCombine(stage, (u, v) -> v);

Типичным использованием этого метода является:

TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(3);
CompletableFuture replyUrlPromise = sendRequestAsync();
Promise dataPromise = CompletableTask.waitFor(replyUrlPromise, executorService)
    .thenApplyAsync(url -> loadDataInterruptibly(url));

Возвращенное обещание данных может быть отменено позже, и Загрузка данных будет прервана, если не будет завершена к этому времени.

4. Тайм-ауты

Любое надежное приложение должно справляться с ситуациями, когда что-то идет не так. Возможность отменить операцию, которая занимает слишком много времени, существовала в библиотеке с самого первого дня. Но само определение “слишком длинный” изначально было оставлено для кода приложения. Однако практика показывает, что отсутствие проверенных, тщательно протестированных материалов, связанных с таймаутом, в библиотеке приводит к сложному, повторяющемуся и, к сожалению, подверженному ошибкам коду в приложении. Следовательно, Tascalate Concurrent был расширен для устранения этого упущения.

Библиотека предлагает следующие операции для управления временем выполнения Обещание (объявлено в Обещание интерфейс):

 Promise orTimeout(long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
 Promise orTimeout(Duration duration[, boolean cancelOnTimeout = true])

Эти методы создают новый Обещание это либо решается успешно/в исключительных случаях, когда первоначальное обещание выполняется в течение заданного времени ожидания; либо оно решается в исключительных случаях с помощью Исключение TimeoutException когда истекло время. В любом случае код обработки выполняется на асинхронном Исполнителе по умолчанию оригинала Обещание .

Executor myExecutor = ...; // Get an executor
Promise callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), myExecutor )
    .orTimeout( Duration.ofSeconds(3) );

Promise nextPromiseSync = callPromise.whenComplete((v, e) -> processResultSync(v, e));
Promise nextPromiseAsync = callPromise.whenCompleteAsync((v,e) -> processResultAsync(v, e));

В приведенном выше примере обещание вызова будет выполнено в течение 3 секунд либо успешно/в исключительных случаях в результате выполнения someLongRunningIoBoundMehtod , либо в исключительных случаях с Исключение TimeoutException .

Стоит отметить, что оба синхронизация результатов процесса и асинхронность результатов процесса будет выполнен с моим исполнителем

Необязательный отмена По Истечении Времени ожидания параметр определяет, следует ли отменять исходный Обещание по истечении времени; оно неявно истинно, если опущено. Так в примере выше метод someLongRunningIoBoundMehtod будет прерван, если для его завершения потребуется более 3 секунд. Обратите внимание: любой Обещание отменяется по истечении времени ожидания, даже обертки, созданные с помощью Promises.from(этап) , но только Завершаемая задача прерывается!

Отмена первоначального обещания по истечении времени ожидания является желательным поведением в большинстве случаев, но не всегда. На самом деле, сценарии “Предупредить-сначала-Отменить-следующий” не редки, где “предупредить” может регистрироваться, отправлять уведомления по электронной почте, показывать сообщения пользователю в пользовательском интерфейсе и т. Д. Библиотека предоставляет возможность установить несколько тайм-аутов без отмены, как в примере ниже:

Executor myExecutor = ...; // Get an executor
Promise resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor );

// Show UI message to user to let him/her know that everything is under control
Promise t1 = resultPromise
    .orTimeout( Duration.ofSeconds(2), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.showMessage("The operation takes longer than expected, please wait...");
      }
      return null;
    }, false); 

// Show UI confirmation to user to let him/her cancel operation explicitly
Promise t2 = resultPromise
    .orTimeout( Duration.ofSeconds(5), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.clearMessages();
        UI.showConfirmation("Service does not respond. Do you whant to cancel (Y/N)?");
      }
      return null;
    }, false); 

// Cancel in 10 seconds
resultPromise.orTimeout( Duration.ofSeconds(10), true );

Пожалуйста, обратите внимание, что время ожидания начинается с вызова метода timeout . Следовательно, если у вас есть цепочка нерешенных обещаний, заканчивающаяся вызовом timeout , то вся цепочка должна быть завершена в течение заданного времени:

Executor myExecutor = ...; // Get an executor
Promise parallelPromise = CompletableTask
    .supplyAsync( () -> someLongRunningDbCall(), executor );
Promise> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .thenApplyAsync( v -> converterMethod(v) )
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    .orTimeout( Duration.ofSeconds(5) );

В последнем примере обещание результата будет успешно выполнено тогда и только тогда, когда все someLongRunningIoBoundMehtod , , способ преобразования и даже некоторые длительные вызовы Бд завершаются в течение 5 секунд. Если необходимо ограничить время выполнения одного шага, пожалуйста, используйте стандартный Этап завершения.затем составьте метод. Скажем, что в предыдущем примере мы должны ограничить время выполнения только метода

Promise> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    // Restict only execution time of converterMethod
    // -- start of changes
    .thenCompose( v -> 
        CompletableTask.complete(v, executor)
                       .thenApplyAsync(vv -> converterMethod(vv))
                       .orTimeout( Duration.ofSeconds(5) )
    )
    // -- end of changes
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    ;

Более того, в оригинальном примере только вызов thenCombineAsync будет отменен по таймауту (последний в цепочке), для отмены всей цепочки необходимо использовать функциональность интерфейса depeNdentprOmise (будет обсуждаться в следующем посте).

Еще один полезный метод, связанный с таймаутом, объявленный в Обещание интерфейс:

 Promise onTimeout(T value, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
 Promise onTimeout(T value, Duration duration[, boolean cancelOnTimeout = true])
 Promise onTimeout(Supplier, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
 Promise onTimeout(Supplier, Duration duration[, boolean cancelOnTimeout = true])

Семейство методов onTimeout во всех отношениях аналогично методам timeout с единственным очевидным отличием – вместо завершения результирующего Обещание в исключительных случаях с исключением TimeoutException по истечении времени они успешно рассчитываются с альтернативным значением , предоставленным (напрямую или через Поставщика ):

Executor myExecutor = ...; // Get an executor
Promise callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .onTimeout( "Timed-out!", Duration.ofSeconds(3) );

Пример показывает, что обещание вызова будет выполнено в течение 3 секунд либо успешно/исключительно в результате выполнения someLongRunningIoBoundMehtod , либо со значением по умолчанию "Время ожидания истекло!" когда время истекло.

Важно упомянуть о существенном различии между Promise.ortimeot/onTimeout и Завершаемое будущее.тайм-аут/Завершенное время ожидания в Java 9 +. В Tascalate Одновременные обе операции возвращают новый Обещание , то есть может быть отменено индивидуально, без отмены оригинала Обещать . Более того, оригинал Обещание не будет выполнено с исключением TimeoutException по истечении времени, а скорее с Исключение отмены ((в случае тайм-аута ([продолжительность], true) или тайм-аут ([продолжительность]) ). Поведение Завершаемое будущее в Java 9+ радикально отличается: операции, связанные с таймаутом, являются просто “побочными эффектами”, а возвращаемое значение является исходным Завершаемое будущее само по себе. Таким образом, вызов CompletableFuture.или тайм-аут (100, Единица времени. MILLIS).cancel() отменит Завершаемое будущее само по себе, и нет способа отменить тайм-аут после его установки. Соответственно, по истечении времени оригинал CompletableFuture будет завершен в исключительных случаях с Исключением TimeoutException .

Наконец, интерфейс Promise предоставляет возможность вставлять задержки в цепочку вызовов:

 Promise delay(long timeout, TimeUnit unit[, boolean delayOnError = true])
 Promise delay(Duration duration[, boolean delayOnError = true])

Задержка запускается только после первоначального Обещание выполняется либо успешно, либо в исключительных случаях (в отличие от ontimeout /|onTimeout методов, в которых тайм-аут запускается немедленно). Результирующая задержка Обещание разрешается по истечении указанного времени ожидания с тем же результатом, что и исходное Обещание . Аргумент последних методов - задержка при ошибке - указывает, следует ли нам задерживать, если исходное обещание выполняется в исключительных случаях, по умолчанию этот аргумент равен true . Если false , то задержка Обещание выполняется сразу после неудачного оригинала Обещание .

Executor myExecutor = ...; // Get an executor
Promise callPromise1 = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
    .delay( Duration.ofSeconds(1) ) // Give a second for CPU to calm down :)
    .thenApply(v -> convertValue(v));

Promise callPromise2 = CompletableTask
    .supplyAsync( () -> aletrnativeLongRunningIoBoundMehtod(), executor )
    .delay( Duration.ofSeconds(1), false ) // Give a second for CPU to calm down ONLY on success :)
    .thenApply(v -> convertValue(v));

Как и в случае с другими методами, связанными с таймаутом, convertValue вызывается на асинхронном Исполнителе по умолчанию исходного Обещание .

Вы можете заметить, что задержка может быть введена только в середине цепочки, но что делать, если вы хотите отменить выполнение всей цепочки? Просто начните с решительного обещания!

// Option 1
// Interruptible tasks chain on the executor supplied
CompletableTask.asyncOn(executor)
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());

// Option2
// Computational tasks on ForkJoinPool.commonPool()
Promises.from(CompletableFuture.completedFuture(""))
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());

До тех пор, пока обратное выполнение не является очень редким случаем, библиотека предоставляет следующие удобные ярлыки в классе completabletask :

static Promise delay(long timeout, TimeUnit unit, Executor executor);
static Promise delay(Duration duration, Executor executor);

Обратите внимание, что в Java 9+ для реализации задержек выбран другой подход – для объекта CompletableFuture не определена соответствующая операция, и вы должны использовать отложенный Исполнитель . Пожалуйста, ознакомьтесь с документацией по CompletableFuture.delayedExecutor метод для деталей.

5. Объединение нескольких этапов завершения.

Класс полезности Обещания предоставляет богатый набор методов для объединения нескольких Этап Завершения -s, это оставляет ограниченную функциональность Completablefuture.allOf/anyOf далеко позади:

  1. Библиотека работает с любой Стадией завершения реализации, не прибегая к преобразованию аргументов в CompletableFuture first (и CompletionStage.tocompletablefuture является необязательной операцией, по крайней мере, это задокументировано в Java 8).
  2. В качестве аргументов можно передать либо массив, либо Список из Стадии завершения -ов.
  3. Полученное Обещание разрешить доступ к индивидуальным результатам урегулированного Этап завершения -ы пройден.
  4. Есть возможность отменить не урегулированный Этап завершения -s пройден, как только известен результат операции.
  5. При необходимости вы можете указать, следует ли допускать отдельные сбои, если они не влияют на конечный результат.
  6. Общие M успешно выполнено из N переданных обещаний возможен сценарий. Давайте рассмотрим соответствующие методы, от самых простых до самых передовых.
static  Promise> all([boolean cancelRemaining=true,] 
                                 CompletionStage... promises)
static  Promise> all([boolean cancelRemaining=true,] 
                                List> promises)

Возвращает обещание, которое выполняется нормально, когда все CompletionStage -ы, переданные в качестве параметров, выполняются нормально; если какое-либо обещание выполнено в исключительных случаях, то полученное обещание также выполняется в исключительных случаях.

static  Promise any([boolean cancelRemaining=true,] 
                          CompletionStage... promises)
static  Promise any([boolean cancelRemaining=true,] 
                          List> promises)

Возвращает обещание, которое обычно выполняется, когда любой Этап завершения пройденный как параметры, выполняется нормально (возможна гонка); если все обещания выполнены в исключительных случаях, то полученное обещание также выполняется в исключительных случаях.

static  Promise anyStrict([boolean cancelRemaining=true,] CompletionStage... promises)
static  Promise anyStrict([boolean cancelRemaining=true,] 
                                List> promises)

Возвращает обещание, которое обычно выполняется, когда любой Этап завершения , переданный в качестве параметров, выполняется нормально (возможна гонка); если какое-либо обещание выполнено в исключительных случаях до получения первого результата, то полученное обещание также выполняется в исключительных случаях (в отличие от нестрогого варианта, где исключения игнорируются, если какой-либо результат вообще доступен).

static  Promise> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    CompletionStage... promises)
static  Promise> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    List> promises)

Обобщение любого метода. Возвращает обещание, которое обычно выполняется, когда по крайней мере минимальное количество результатов этапа завершения -s, пройденных при нормальном выполнении параметров (возможна гонка); если меньше, чем Минимальное количество обещаний, выполненных в обычном режиме, затем полученное обещание выполняется в исключительных случаях.

static  Promise> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          CompletionStage... promises)
static  Promise> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          List> promises)

Обобщение любого строгого метода. Возвращает обещание, которое обычно выполняется, когда по крайней мере минимальное количество результатов этапа завершения -s, пройденных при нормальном выполнении параметров (возможна гонка); если какое-либо обещание было выполнено в исключительных случаях до Доступно минимальное количество результатов, затем полученное обещание также выполняется в исключительных случаях (в отличие от нестрогого варианта, где исключения игнорируются, если минимальное количество результатов успешных результатов доступно).

Все вышеперечисленные методы имеют необязательный параметр отменить Оставшиеся . Если опущено, это означает неявно отменить Оставшиеся . Параметр отменить оставшиеся определяет, следует ли с нетерпением отменять оставшиеся обещания как только результат операции известен, т. е. достаточно обещания пройдены успешно или некоторые Этап завершения завершен исключительно в строгой версии.

Каждая операция для объединения Этап завершения -s имеет перегруженные версии, которые принимают либо Список | Этапа завершения -s, либо массив vararg Этап Завершения -s.

Помимо любых / любых строгих методов, возвращающих однозначное обещание, все другие комбинирующие методы возвращают список значений для каждого успешно выполненного обещания в той же индексированной позиции. Если обещание в данной позиции вообще не было выполнено или не выполнено (в нестрогой версии), то соответствующий элемент в списке результатов равен null . Если необходимое число или обещания не было выполнено успешно, или кто-либо выполнил исключительно в строгой версии, то в результате Обещание выполняется при сбое типа Многоцелевое исключение . Разработчик приложения может изучить Многоцелевое исключение.getexceptions() , чтобы проверить, какова точная ошибка для конкретного Этап завершения пройден. Возвращенное Обещание имеет следующие характеристики:

  1. Отмена результата Обещание отменит все Этапы завершения , переданные в качестве аргументов.
  2. Асинхронный исполнитель по умолчанию результирующего Обещать не определено, т. е. это может быть либо forkJoin.общий пул или что бы Исполнитель ни использовал любой из Этап завершения передается в качестве аргумента. Чтобы убедиться, что необходимый Исполнитель по умолчанию используется для последующих асинхронных операций, пожалуйста, примените defaultAsyncOn(Myexecutor) о результате.

Список функций, предоставляемых Соответствующей параллельной библиотекой, на этом не заканчивается. Есть более интересные вещи, такие как функция повтора/опроса, управление отменой цепочки Обещаний , расширения ExecutorService и т.д. Но этот пост уже становится слишком длинным, поэтому оставшееся оставлено на следующий раз. Тем временем вы можете проверить домашнюю страницу библиотеки Tascalate Concurrent для получения самой последней документации.

васильев/таскалате-параллельный

Реализация блокирующей (связанной с вводом-выводом) отменяемой java.util.concurrent. Этап завершения и связанные с ним расширения java.util.concurrent. Услуги Исполнителя

Библиотека предоставляет реализацию интерфейса Стадии завершения и связанных классов, которые предназначены для поддержки длительных задач блокировки (как правило, связанных с вводом-выводом). Эта функциональность расширяет единственную встроенную реализацию Java 8, CompletableFuture , то есть в первую очередь поддерживает вычислительные задачи. Кроме того, библиотека помогает решать многочисленные задачи асинхронного программирования, такие как обработка тайм-аутов, функции повтора/опроса, организация результатов нескольких параллельных вычислений и тому подобное.

Начиная с версии 0.7.0 библиотека поставляется в виде JAR с несколькими выпусками и может использоваться как с Java 8 в качестве библиотеки путей к классам, так и с Java 9+ в качестве модуля.

важный!

В версии 0.8.0 артефакт был переименован в Новое имя:

net.tascalatenet.tascalate.concurrent
    0.9.0 

Старое Название

net.tascalate.concurrent
    net.tascalate.concurrent.lib

Детали внутренней реализации завершенной задачи иерархии классов в значительной степени вдохновлены работой , проделанной Лукашем Кржечаном . Описание его библиотеки доступно в виде статьи из двух частей о ДЗоне: Часть 1 и Часть II . Это стоит прочитать тем, кто хотел бы лучше понять Выполняемую задачу внутренности.

Оригинал: “https://dev.to/vsilaev/tascalate-concurrent—filling-the-gaps-in-completablefuture-api-part-1-7lm”