0%

Java多线程之CompletableFuture

Java多线程中的FutureTask类大家耳熟能详,这里来介绍下更高级的CompletableFuture类

abstract.jpg

概述

Java在多线程方面通过Future接口及其实现类FutureTask,实现了支持具有返回值的异步任务。但实践过程依然有不少缺陷。等待返回值时会被阻塞;不支持对多个异步任务进行组合实现复杂的任务流。为此CompletableFuture应运而生,其大大拓展了异步任务的能力,增强了表现力。该类上提供了如下的工厂方法用于创建实例,supplyAsync方法支持创建具有返回值的异步任务,runAsync方法支持创建无返回值的异步任务。如果不指定线程池,则使用默认的ForkJoinPool的公共线程池commonPool,注意该线程池中的线程均为守护线程

1
2
3
4
5
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

当然由于CompletableFuture亦实现了Future接口,故自然支持传统的阻塞式获取任务结果的方式,比如Future接口的get、isDone方法等。此外其还提供了join方法,其与get方法类似也是阻塞式获取结果,但其抛出的异常是运行时异常

任务结束后的回调处理

whenComplete

该方法可以实现任务结束后的回调逻辑。事实上其包括一组方法集,如下所示。CompletableFuture中的很多方法也都提供了类似的多个版本,下文不再赘述。这里统一使用异步版本的方法,即具有Async后缀版本的方法。所谓异步指的是提交给线程池进行处理

1
2
3
4
5
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
// 异步版本的 whenComplete 方法
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
// 异步版本的使用自定义线程池的 whenComplete 方法
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

该方法接受两个参数——上一个任务结束时的返回值和异常,故可以看到whenComplete方法可以实现任务正常结束、异常结束两种场景下的回调处理。使用示例如下所示,前文提到由于ForkJoinPool的公共线程池commonPool中的线程均为守护线程,故主线程这边需要通过sleep来防止其结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Demo1 {

@Test
public void test1() {
test11("中国移动");
try{ Thread.sleep(5000); } catch (Exception e) {}

System.out.println("============================================================");

test11("中国电信");

// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}

private void test11(String name) {
System.out.println("#1 Thread Name: " + Thread.currentThread().getName() );

CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync( ()-> {
System.out.println("#2 Thread Name: " + Thread.currentThread().getName() );
return findTel(name);
} )
// 任务完成回调处理
.whenCompleteAsync( (tel, throwable) -> {
System.out.println("#3 Thread Name: " + Thread.currentThread().getName() );
// 任务异常的回调处理
if( throwable!=null ) {
System.out.println("happen throwable: " + throwable);
return;
}
// 任务正常完成的回调处理
System.out.println("tel: " + tel);
});
}

/**
* 根据公司查电话
* @param name
* @return
*/
public static Integer findTel(String name) {
if( "中国移动".equals(name) ) {
return 10086;
} else if( "中国联通".equals(name) ) {
return 10010;
} else {
throw new RuntimeException("Not found Tel");
}
}

}

测试结果如下所示,可以看到任务及其结束后的回调处理均是异步的

figure 1.jpg

thenAccept

该方法可以实现任务正常完成的回调逻辑。其同样是一组方法集,如下所示

1
2
3
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

该方法接受一个参数——即上一个任务正常完成时的返回值。如果该任务发生异常可以根据需要通过exceptionally方法来实现对异常的处理。示例如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo1 {

@Test
public void test2() {
test22("中国移动");
try{ Thread.sleep(5000); } catch (Exception e) {}

System.out.println("============================================================");

test22("中国电信");
// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}

private void test22(String name) {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( ()-> findTel(name) )
// 任务正常完成的回调处理
.thenAcceptAsync( tel -> {
System.out.println("tel: " + tel);
})
// 任务异常的回调处理
.exceptionally( throwable -> {
System.out.println("happen throwable: " + throwable.getMessage());
return null;
} );
}

}

测试结果如下所示,符合预期

figure 2.jpg

任务结束后继续下一任务

handle

该方法可以实现任务结束后,继续执行另外一个任务逻辑以实现对上一个任务结果的进一步处理。事实上其包括一组方法集,如下所示。从入参可以看出其会接收上一个任务完成时的结果和异常信息

1
2
3
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

测试示例如下所示。我们先通过前置任务查询电话号码,如果电话有效则进行呼叫。这里我们通过join方法来获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Demo1 {

@Test
public void test3() {
test33("中国电信");
try{ Thread.sleep(5000); } catch (Exception e) {}

System.out.println("=========");

test33("中国移动");
// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}

private void test33(String name) {
CompletableFuture<Boolean> completableFuture = CompletableFuture.supplyAsync( ()-> {
Integer telNo = findTel(name);
System.out.println("tel: " + telNo);
return telNo;
} )
.handleAsync( (tel, throwable) -> {
// 前置任务发生异常, 当前任务之间返回默认值
if(throwable != null) {
System.out.println("happen throwable: " + throwable.getMessage());
return false;
}

// 前置任务正常完成, 执行当前任务逻辑
Boolean success = callTel(tel);
System.out.println("success: " + success);
return success;
});

// 等待任务结束获取结果
Boolean b = completableFuture.join();
System.out.println("b: "+ b);
}

/**
* 打电话
* @param tel
* @return
*/
public static Boolean callTel(Integer tel) {
if( tel==10086 ) {
// 打电话成功
return true;
} else {
throw new RuntimeException("Call Tel Fail");
}
}

}

测试结果如下所示,符合预期

figure 3.jpg

thenApply

thenApply方法集如下所示。其作用与handle方法类似,都是用于执行完前置任务,利用其执行结果进行下一个任务。不同点在于其不会对任务中的异常进行处理,这点从入参也是可以看出的

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

这里我们通过thenApply来实现上面那个先查电话再打电话的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Demo1 {

@Test
public void test4() {

test44("中国电信");
try{ Thread.sleep(5000); } catch (Exception e) {}

System.out.println("=========");

test44("中国联通");
try{ Thread.sleep(5000); } catch (Exception e) {}

System.out.println("=========");

test44("中国移动");
// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}

private void test44(String name) {
CompletableFuture<Boolean> completableFuture = CompletableFuture.supplyAsync( ()-> {
Integer telNo = findTel(name);
System.out.println("tel: " + telNo);
return telNo;
} )
.thenApplyAsync( tel -> {
Boolean success = callTel(tel);
System.out.println("success: " + success);
return success;
} )
// 任务异常的回调处理
.exceptionally( throwable -> {
System.out.println("happen throwable: " + throwable.getMessage());
// 返回默认值
return false;
} );

Boolean b = completableFuture.join();
System.out.println("b: "+ b);
}

}

测试结果如下所示,符合预期。而且通过exceptionally方法实现了对异常的统一处理,避免handle方法对各任务的异常进行处理的繁琐

figure 4.jpg

任务组合

thenCombine

该方法可以实现将两个无先后顺序依赖的任务进行组合,其等待两个任务均完成后执行回调逻辑。方法集如下所示

1
2
3
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

测试用例,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Demo1 {

@Test
public void test5() {
// 异步任务1
CompletableFuture<Set<String>> task1 = CompletableFuture.supplyAsync( ()->{
Set<String> set = new HashSet<>();
set.add("电脑");
set.add("电视");
return set;
});

// 异步任务2
CompletableFuture<Set<String>> task2 = CompletableFuture.supplyAsync( ()->{
Set<String> set = new HashSet<>();
set.add("电脑");
set.add("电话");
return set;
});

// 将 两个异步任务组合在一起。具体表现在两个异步任务执行完成后,执行回调
CompletableFuture<Set<String>> result = task1.thenCombineAsync(task2, (set1, set2) -> {
// task1, task2 完成后执行的回调逻辑
Set<String> set = new HashSet<>();
set.addAll(set1);
set.addAll(set2);
return set;
})
.whenCompleteAsync( (set, throwable) -> {
if(throwable!=null) {
System.out.println("happen Exception: " + throwable.getMessage());
return;
}
set.forEach(System.out::println);
});

// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}
}

测试结果如下,符合预期

figure 5.jpg

thenCompose

thenCompose可实现将两个任务进行串行化,方法集如下所示

1
2
3
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);

示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Demo1 {

@Test
public void test6() {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( ()->{
List<String> list = new LinkedList<>();
list.add("bob");
list.add("aaron");
return list;
})
.thenComposeAsync( list -> toUpperCase(list) )
.thenAccept( System.out::println );

// 防止主线程结束
try{ Thread.sleep(5000); } catch (Exception e) {}
}

private CompletableFuture<List<String>> toUpperCase(List<String> list) {
CompletableFuture<List<String>> task = CompletableFuture.supplyAsync( () -> {
List<String> tempList = list.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
return tempList;
} );

return task;
}

}

测试结果如下所示,符合预期

figure 6.jpg

allOf

该方法可以实现组合多个任务并等待多个任务均完成,方法签名如下

1
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Demo1 {

@Test
public void test7() {
List<Integer> list= new LinkedList<>();

CompletableFuture[] taskArray = IntStream.rangeClosed(1,5)
.mapToObj( i ->{
CompletableFuture task = CompletableFuture.supplyAsync( () -> {
// 模拟业务耗时
try{ Thread.sleep(RandomUtils.nextInt(500, 5000)); } catch (Exception e) {}
return i;
})
.whenCompleteAsync( (result, throwable) -> {
if(throwable!=null) {
System.out.println("happen Exception: " + throwable.getMessage());
}
list.add( result );
System.out.println("Task "+i+" Done");
});
return task;
})
.toArray( CompletableFuture[]::new );

CompletableFuture.allOf( taskArray )
// 多个任务均完成后, 执行回调
.whenCompleteAsync( (result, throwable) ->{
System.out.println("result: " + result + " throwable: " + throwable);
System.out.println("All Task Done");
} )
.join();

System.out.println("list: " + list);
// 防止主线程结束
try{ Thread.sleep(15000); } catch (Exception e) {}
}

}

测试结果如下所示,符合预期

figure 7.jpg

anyOf

该方法与allOf方法相反,当任一任务完成即结束等待,方法签名如下

1
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Demo1 {

@Test
public void test8() {
List<Integer> list= new LinkedList<>();

CompletableFuture[] taskArray = IntStream.rangeClosed(1,5)
.mapToObj( i ->{
CompletableFuture task = CompletableFuture.supplyAsync( () -> {
// 模拟业务耗时
try{ Thread.sleep(RandomUtils.nextInt(500, 5000)); } catch (Exception e) {}
return i;
})
.whenCompleteAsync( (result, throwable) -> {
if(throwable!=null) {
System.out.println("happen Exception: " + throwable.getMessage());
}
list.add( result );
System.out.println("Task "+i+" Done");
});
return task;
})
.toArray( CompletableFuture[]::new );

CompletableFuture.anyOf( taskArray )
// 任一任务即执行回调逻辑
.whenCompleteAsync( (result, throwable) ->{
System.out.println("result: " + result + " throwable: " + throwable);
System.out.println("Have One Task Done");
} )
.join();

System.out.println("list: " + list);
// 防止主线程结束
try{ Thread.sleep(15000); } catch (Exception e) {}
}

}

测试结果如下所示,符合预期

figure 8.jpg

请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝