Java多线程中的FutureTask类大家耳熟能详,这里来介绍下更高级的CompletableFuture类
概述
Java在多线程方面通过Future接口及其实现类FutureTask,实现了支持具有返回值的异步任务。但实践过程依然有不少缺陷。等待返回值时会被阻塞;不支持对多个异步任务进行组合实现复杂的任务流。为此CompletableFuture应运而生,其大大拓展了异步任务的能力,增强了表现力。该类上提供了如下的工厂方法用于创建实例,supplyAsync方法支持创建具有返回值的异步任务,runAsync方法支持创建无返回值的异步任务。如果不指定线程池,则使用默认的ForkJoinPool的公共线程池commonPool,注意该线程池中的线程均为守护线程
1 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); |
当然由于CompletableFuture亦实现了Future接口,故自然支持传统的阻塞式获取任务结果的方式,比如Future接口的get、isDone方法等。此外其还提供了join方法,其与get方法类似也是阻塞式获取结果,但其抛出的异常是运行时异常
任务结束后的回调处理
whenComplete
该方法可以实现任务结束后的回调逻辑。事实上其包括一组方法集,如下所示。CompletableFuture中的很多方法也都提供了类似的多个版本,下文不再赘述。这里统一使用异步版本的方法,即具有Async后缀版本的方法。所谓异步指的是提交给线程池进行处理
1 | public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); |
该方法接受两个参数——上一个任务结束时的返回值和异常,故可以看到whenComplete方法可以实现任务正常结束、异常结束两种场景下的回调处理。使用示例如下所示,前文提到由于ForkJoinPool的公共线程池commonPool中的线程均为守护线程,故主线程这边需要通过sleep来防止其结束
1 | public class Demo1 { |
测试结果如下所示,可以看到任务及其结束后的回调处理均是异步的
thenAccept
该方法可以实现任务正常完成的回调逻辑。其同样是一组方法集,如下所示
1 | public CompletableFuture<Void> thenAccept(Consumer<? super T> action); |
该方法接受一个参数——即上一个任务正常完成时的返回值。如果该任务发生异常可以根据需要通过exceptionally方法来实现对异常的处理。示例如下所示
1 | public class Demo1 { |
测试结果如下所示,符合预期
任务结束后继续下一任务
handle
该方法可以实现任务结束后,继续执行另外一个任务逻辑以实现对上一个任务结果的进一步处理。事实上其包括一组方法集,如下所示。从入参可以看出其会接收上一个任务完成时的结果和异常信息
1 | public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); |
测试示例如下所示。我们先通过前置任务查询电话号码,如果电话有效则进行呼叫。这里我们通过join方法来获取结果
1 | public class Demo1 { |
测试结果如下所示,符合预期
thenApply
thenApply方法集如下所示。其作用与handle方法类似,都是用于执行完前置任务,利用其执行结果进行下一个任务。不同点在于其不会对任务中的异常进行处理,这点从入参也是可以看出的
1 | public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); |
这里我们通过thenApply来实现上面那个先查电话再打电话的例子
1 | public class Demo1 { |
测试结果如下所示,符合预期。而且通过exceptionally方法实现了对异常的统一处理,避免handle方法对各任务的异常进行处理的繁琐
任务组合
thenCombine
该方法可以实现将两个无先后顺序依赖的任务进行组合,其等待两个任务均完成后执行回调逻辑。方法集如下所示
1 | public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); |
测试用例,如下所示
1 | public class Demo1 { |
测试结果如下,符合预期
thenCompose
thenCompose可实现将两个任务进行串行化,方法集如下所示
1 | public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); |
示例代码如下所示
1 | public class Demo1 { |
测试结果如下所示,符合预期
allOf
该方法可以实现组合多个任务并等待多个任务均完成,方法签名如下
1 | public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); |
示例代码如下所示
1 | public class Demo1 { |
测试结果如下所示,符合预期
anyOf
该方法与allOf方法相反,当任一任务完成即结束等待,方法签名如下
1 | public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs); |
示例代码如下所示
1 | public class Demo1 { |
测试结果如下所示,符合预期