Java多线程之CyclicBarrier

这里就JUC包中的CyclicBarrier类做相关介绍

abstract.jpeg

概述

JUC中的CyclicBarrier类是一个并发控制工具。其可以使线程在栅栏处进行等待。当指定数量的线程全部到达栅栏处后栅栏才会打开,从而使各线程结束阻塞继续向下执行。其主要方法如下所示,可以看到在线程全部到达栅栏时,还可以通过barrierAction参数设置准备打开栅栏前需执行的任务。其中,该任务由最后一个到达栅栏的线程负责执行。具体地,线程调用await方法实现告诉CyclicBarrier自己已经到达栅栏处,并阻塞等待栅栏打开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建一个指定计数器值的CyclicBarrier实例
public CyclicBarrier(int parties);

// 创建一个指定计数器值的CyclicBarrier实例, 并指定栅栏打开前需执行的任务
public CyclicBarrier(int parties, Runnable barrierAction);

// 线程阻塞等待栅栏打开
public int await() throws InterruptedException, BrokenBarrierException;

// 支持超时的await方法
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;

// 唤醒其他正在栅栏处被阻塞的线程(即抛出BrokenBarrierException异常), 同时将CyclicBarrier实例恢复为初始化状态,以便下一次使用
public void reset();

基本实践

下面即是一个CyclicBarrier的基本实践示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

public class CyclicBarrierTest1 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

@Test
public void test1() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);

Runnable initTask = () -> {
info("---------------------------------");
};

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, initTask);

Stream.of("张三","李四","王二")
.map( name -> new PlayGame(name, cyclicBarrier) )
.forEach(playGame -> {
threadPool.execute(playGame);
} );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
info("Game Over");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String threadName = Thread.currentThread().getName();
String log = "["+time+"] "+ msg+" <"+threadName+">";
System.out.println(log);
}

/**
* 模拟业务耗时
*/
private static void doSomeWork() {
try{
Integer second = RandomUtils.nextInt(3,20);
System.out.println("second: " + second);
Thread.sleep( second * 1000 );
}catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
}
}

@AllArgsConstructor
private static class PlayGame implements Runnable{

private String name;

private CyclicBarrier cyclicBarrier;

@Override
public void run() {
// 模拟业务耗时
doSomeWork();
info(name + " 上线");
// 阻塞等待其他玩家上线
try{
cyclicBarrier.await();
}catch (Exception e) {
System.out.println( "Happen Exception: " + e);
}

info(name + " 选择角色 开始");
// 模拟业务耗时
doSomeWork();
info(name + " 选择角色 结束");
// 阻塞等待其他玩家选择角色
try{
cyclicBarrier.await();
}catch (Exception e) {
System.out.println( "Happen Exception: " + e);
}

info(name + " 开始游戏");

}
}
}

从测试结果可以看出,当用户 开始选择角色 或 开始游戏时,各线程是同时开始的。至此也可以看出其与CountDownLatch的显著区别,后者是一次性的,而前者CyclicBarrier则可以重复使用

figure 1.jpeg

基本原理

通过上面的代码示例,可以看到CyclicBarrier与CountDownLatch相比功能很类似。只不过前者可以重复使用,而后者则是一次性的。但二者在实现上却大相径庭,CountDownLatch是直接基于AQS实现的。而CyclicBarrier则是利用ReentrantLock、Condition进行实现的。具体地,当线程调用CyclicBarrier的await方法时,如果未达到指定数量时,则是通过Condition条件变量的await方法进行阻塞的;如果是最后一个线程则会通过Condition条件变量的signalAll方法来唤醒所有被阻塞的线程

与此同时,由于CyclicBarrier是可重复使用的。故每一轮结束后,其内部会通过nextGeneration方法生成所谓的下一代CyclicBarrier。本质上相当于重新实例化了一次CyclicBarrier

Note

  1. 在实际使用CyclicBarrier过程中,需要非常小心处理BrokenBarrierException异常。本文示例代码为了简便,故省略了异常处理过程。因为发生该异常说明栅栏被损坏了。推荐的处理措施有:一方面,调用CyclicBarrier的reset方法,来唤醒其他由于调用await方法而被阻塞的线程以避免一直被阻塞,同时将CyclicBarrier实例恢复至初始化状态;另一方面,推荐使用具有超时机制的await方法,以避免线程被永久性阻塞

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
0%