Java多线程中的FutureTask类大家耳熟能详,这里来介绍下更高级的CompletableFuture类
概述 Java在多线程方面通过Future接口及其实现类FutureTask,实现了支持具有返回值的异步任务。但实践过程依然有不少缺陷。等待返回值时会被阻塞;不支持对多个异步任务进行组合实现复杂的任务流。为此CompletableFuture应运而生,其大大拓展了异步任务的能力,增强了表现力。该类上提供了如下的工厂方法用于创建实例,supplyAsync方法支持创建具有返回值的异步任务,runAsync方法支持创建无返回值的异步任务。如果不指定线程池,则使用默认的ForkJoinPool的公共线程池commonPool,注意该线程池中的线程均为守护线程
1 2 3 4 5 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) ;public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) ;public static CompletableFuture<Void> runAsync (Runnable runnable) ;public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) ;
当然由于CompletableFuture亦实现了Future接口,故自然支持传统的阻塞式获取任务结果的方式,比如Future接口的get、isDone方法等。此外其还提供了join方法,其与get方法类似也是阻塞式获取结果,但其抛出的异常是运行时异常
任务结束后的回调处理 whenComplete 该方法可以实现任务结束后的回调逻辑。事实上其包括一组方法集,如下所示。CompletableFuture中的很多方法也都提供了类似的多个版本,下文不再赘述。这里统一使用异步版本的方法,即具有Async后缀版本的方法。所谓异步指的是提交给线程池进行处理
1 2 3 4 5 public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action, Executor executor) ;
该方法接受两个参数——上一个任务结束时的返回值和异常,故可以看到whenComplete方法可以实现任务正常结束、异常结束两种场景下的回调处理。使用示例如下所示,前文提到由于ForkJoinPool的公共线程池commonPool中的线程均为守护线程,故主线程这边需要通过sleep来防止其结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Demo1 { @Test public void test1 () { test11("中国移动" ); try { Thread.sleep(5000 ); } catch (Exception e) {} System.out.println("============================================================" ); test11("中国电信" ); try { Thread.sleep(5000 ); } catch (Exception e) {} } private void test11 (String name) { System.out.println("#1 Thread Name: " + Thread.currentThread().getName() ); CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync( ()-> { System.out.println("#2 Thread Name: " + Thread.currentThread().getName() ); return findTel(name); } ) .whenCompleteAsync( (tel, throwable) -> { System.out.println("#3 Thread Name: " + Thread.currentThread().getName() ); if ( throwable!=null ) { System.out.println("happen throwable: " + throwable); return ; } System.out.println("tel: " + tel); }); } public static Integer findTel (String name) { if ( "中国移动" .equals(name) ) { return 10086 ; } else if ( "中国联通" .equals(name) ) { return 10010 ; } else { throw new RuntimeException ("Not found Tel" ); } } }
测试结果如下所示,可以看到任务及其结束后的回调处理均是异步的
thenAccept 该方法可以实现任务正常完成的回调逻辑。其同样是一组方法集,如下所示
1 2 3 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action, Executor executor) ;
该方法接受一个参数——即上一个任务正常完成时的返回值。如果该任务发生异常可以根据需要通过exceptionally方法来实现对异常的处理。示例如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Demo1 { @Test public void test2 () { test22("中国移动" ); try { Thread.sleep(5000 ); } catch (Exception e) {} System.out.println("============================================================" ); test22("中国电信" ); try { Thread.sleep(5000 ); } catch (Exception e) {} } private void test22 (String name) { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( ()-> findTel(name) ) .thenAcceptAsync( tel -> { System.out.println("tel: " + tel); }) .exceptionally( throwable -> { System.out.println("happen throwable: " + throwable.getMessage()); return null ; } ); } }
测试结果如下所示,符合预期
任务结束后继续下一任务 handle 该方法可以实现任务结束后,继续执行另外一个任务逻辑以实现对上一个任务结果的进一步处理。事实上其包括一组方法集,如下所示。从入参可以看出其会接收上一个任务完成时的结果和异常信息
1 2 3 public <U> CompletableFuture<U> handle (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;
测试示例如下所示。我们先通过前置任务查询电话号码,如果电话有效则进行呼叫。这里我们通过join方法来获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class Demo1 { @Test public void test3 () { test33("中国电信" ); try { Thread.sleep(5000 ); } catch (Exception e) {} System.out.println("=========" ); test33("中国移动" ); try { Thread.sleep(5000 ); } catch (Exception e) {} } private void test33 (String name) { CompletableFuture<Boolean> completableFuture = CompletableFuture.supplyAsync( ()-> { Integer telNo = findTel(name); System.out.println("tel: " + telNo); return telNo; } ) .handleAsync( (tel, throwable) -> { if (throwable != null ) { System.out.println("happen throwable: " + throwable.getMessage()); return false ; } Boolean success = callTel(tel); System.out.println("success: " + success); return success; }); Boolean b = completableFuture.join(); System.out.println("b: " + b); } public static Boolean callTel (Integer tel) { if ( tel==10086 ) { return true ; } else { throw new RuntimeException ("Call Tel Fail" ); } } }
测试结果如下所示,符合预期
thenApply thenApply方法集如下所示。其作用与handle方法类似,都是用于执行完前置任务,利用其执行结果进行下一个任务。不同点在于其不会对任务中的异常进行处理,这点从入参也是可以看出的
1 2 3 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn) ;public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) ;public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn, Executor executor) ;
这里我们通过thenApply来实现上面那个先查电话再打电话的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class Demo1 { @Test public void test4 () { test44("中国电信" ); try { Thread.sleep(5000 ); } catch (Exception e) {} System.out.println("=========" ); test44("中国联通" ); try { Thread.sleep(5000 ); } catch (Exception e) {} System.out.println("=========" ); test44("中国移动" ); try { Thread.sleep(5000 ); } catch (Exception e) {} } private void test44 (String name) { CompletableFuture<Boolean> completableFuture = CompletableFuture.supplyAsync( ()-> { Integer telNo = findTel(name); System.out.println("tel: " + telNo); return telNo; } ) .thenApplyAsync( tel -> { Boolean success = callTel(tel); System.out.println("success: " + success); return success; } ) .exceptionally( throwable -> { System.out.println("happen throwable: " + throwable.getMessage()); return false ; } ); Boolean b = completableFuture.join(); System.out.println("b: " + b); } }
测试结果如下所示,符合预期。而且通过exceptionally方法实现了对异常的统一处理,避免handle方法对各任务的异常进行处理的繁琐
任务组合 thenCombine 该方法可以实现将两个无先后顺序依赖的任务进行组合,其等待两个任务均完成后执行回调逻辑。方法集如下所示
1 2 3 public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) ;
测试用例,如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Demo1 { @Test public void test5 () { CompletableFuture<Set<String>> task1 = CompletableFuture.supplyAsync( ()->{ Set<String> set = new HashSet <>(); set.add("电脑" ); set.add("电视" ); return set; }); CompletableFuture<Set<String>> task2 = CompletableFuture.supplyAsync( ()->{ Set<String> set = new HashSet <>(); set.add("电脑" ); set.add("电话" ); return set; }); CompletableFuture<Set<String>> result = task1.thenCombineAsync(task2, (set1, set2) -> { Set<String> set = new HashSet <>(); set.addAll(set1); set.addAll(set2); return set; }) .whenCompleteAsync( (set, throwable) -> { if (throwable!=null ) { System.out.println("happen Exception: " + throwable.getMessage()); return ; } set.forEach(System.out::println); }); try { Thread.sleep(5000 ); } catch (Exception e) {} } }
测试结果如下,符合预期
thenCompose thenCompose可实现将两个任务进行串行化,方法集如下所示
1 2 3 public <U> CompletableFuture<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Demo1 { @Test public void test6 () { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( ()->{ List<String> list = new LinkedList <>(); list.add("bob" ); list.add("aaron" ); return list; }) .thenComposeAsync( list -> toUpperCase(list) ) .thenAccept( System.out::println ); try { Thread.sleep(5000 ); } catch (Exception e) {} } private CompletableFuture<List<String>> toUpperCase (List<String> list) { CompletableFuture<List<String>> task = CompletableFuture.supplyAsync( () -> { List<String> tempList = list.stream() .map(String::toUpperCase) .collect(Collectors.toList()); return tempList; } ); return task; } }
测试结果如下所示,符合预期
allOf 该方法可以实现组合多个任务并等待多个任务均完成,方法签名如下
1 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs) ;
示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class Demo1 { @Test public void test7 () { List<Integer> list= new LinkedList <>(); CompletableFuture[] taskArray = IntStream.rangeClosed(1 ,5 ) .mapToObj( i ->{ CompletableFuture task = CompletableFuture.supplyAsync( () -> { try { Thread.sleep(RandomUtils.nextInt(500 , 5000 )); } catch (Exception e) {} return i; }) .whenCompleteAsync( (result, throwable) -> { if (throwable!=null ) { System.out.println("happen Exception: " + throwable.getMessage()); } list.add( result ); System.out.println("Task " +i+" Done" ); }); return task; }) .toArray( CompletableFuture[]::new ); CompletableFuture.allOf( taskArray ) .whenCompleteAsync( (result, throwable) ->{ System.out.println("result: " + result + " throwable: " + throwable); System.out.println("All Task Done" ); } ) .join(); System.out.println("list: " + list); try { Thread.sleep(15000 ); } catch (Exception e) {} } }
测试结果如下所示,符合预期
anyOf 该方法与allOf方法相反,当任一任务完成即结束等待,方法签名如下
1 public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs) ;
示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class Demo1 { @Test public void test8 () { List<Integer> list= new LinkedList <>(); CompletableFuture[] taskArray = IntStream.rangeClosed(1 ,5 ) .mapToObj( i ->{ CompletableFuture task = CompletableFuture.supplyAsync( () -> { try { Thread.sleep(RandomUtils.nextInt(500 , 5000 )); } catch (Exception e) {} return i; }) .whenCompleteAsync( (result, throwable) -> { if (throwable!=null ) { System.out.println("happen Exception: " + throwable.getMessage()); } list.add( result ); System.out.println("Task " +i+" Done" ); }); return task; }) .toArray( CompletableFuture[]::new ); CompletableFuture.anyOf( taskArray ) .whenCompleteAsync( (result, throwable) ->{ System.out.println("result: " + result + " throwable: " + throwable); System.out.println("Have One Task Done" ); } ) .join(); System.out.println("list: " + list); try { Thread.sleep(15000 ); } catch (Exception e) {} } }
测试结果如下所示,符合预期