这里就JUC包中的CyclicBarrier类做相关介绍

概述
JUC中的CyclicBarrier类是一个并发控制工具。其可以使线程在栅栏处进行等待。当指定数量的线程全部到达栅栏处后栅栏才会打开,从而使各线程结束阻塞继续向下执行。其主要方法如下所示,可以看到在线程全部到达栅栏时,还可以通过barrierAction参数设置准备打开栅栏前需执行的任务。其中,该任务由最后一个到达栅栏的线程负责执行。具体地,线程调用await方法实现告诉CyclicBarrier自己已经到达栅栏处,并阻塞等待栅栏打开
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | public CyclicBarrier(int parties);
 
 
 public CyclicBarrier(int parties, Runnable barrierAction);
 
 
 public int await() throws InterruptedException, BrokenBarrierException;
 
 
 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
 
 
 public void reset();
 
 | 
基本实践
下面即是一个CyclicBarrier的基本实践示例
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 
 | public class CyclicBarrierTest1 {
 
 private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
 @Test
 public void test1() throws InterruptedException {
 ExecutorService threadPool = Executors.newFixedThreadPool(10);
 
 Runnable initTask = () -> {
 info("---------------------------------");
 };
 
 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, initTask);
 
 Stream.of("张三","李四","王二")
 .map( name -> new PlayGame(name, cyclicBarrier) )
 .forEach(playGame -> {
 threadPool.execute(playGame);
 } );
 
 
 try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
 info("Game Over");
 }
 
 
 
 
 
 private static void info(String msg) {
 String time = formatter.format(LocalTime.now());
 String threadName = Thread.currentThread().getName();
 String log = "["+time+"] "+ msg+" <"+threadName+">";
 System.out.println(log);
 }
 
 
 
 
 private static void doSomeWork() {
 try{
 Integer second = RandomUtils.nextInt(3,20);
 System.out.println("second: " + second);
 Thread.sleep( second * 1000 );
 }catch (Exception e) {
 System.out.println( "Happen Exception: " + e.getMessage());
 }
 }
 
 @AllArgsConstructor
 private static class PlayGame implements Runnable{
 
 private String name;
 
 private CyclicBarrier cyclicBarrier;
 
 @Override
 public void run() {
 
 doSomeWork();
 info(name + " 上线");
 
 try{
 cyclicBarrier.await();
 }catch (Exception e) {
 System.out.println( "Happen Exception: " + e);
 }
 
 info(name + " 选择角色 开始");
 
 doSomeWork();
 info(name + " 选择角色 结束");
 
 try{
 cyclicBarrier.await();
 }catch (Exception e) {
 System.out.println( "Happen Exception: " + e);
 }
 
 info(name + " 开始游戏");
 
 }
 }
 }
 
 | 
从测试结果可以看出,当用户 开始选择角色 或 开始游戏时,各线程是同时开始的。至此也可以看出其与CountDownLatch的显著区别,后者是一次性的,而前者CyclicBarrier则可以重复使用

基本原理
通过上面的代码示例,可以看到CyclicBarrier与CountDownLatch相比功能很类似。只不过前者可以重复使用,而后者则是一次性的。但二者在实现上却大相径庭,CountDownLatch是直接基于AQS实现的。而CyclicBarrier则是利用ReentrantLock、Condition进行实现的。具体地,当线程调用CyclicBarrier的await方法时,如果未达到指定数量时,则是通过Condition条件变量的await方法进行阻塞的;如果是最后一个线程则会通过Condition条件变量的signalAll方法来唤醒所有被阻塞的线程
与此同时,由于CyclicBarrier是可重复使用的。故每一轮结束后,其内部会通过nextGeneration方法生成所谓的下一代CyclicBarrier。本质上相当于重新实例化了一次CyclicBarrier
Note
- 在实际使用CyclicBarrier过程中,需要非常小心处理BrokenBarrierException异常。本文示例代码为了简便,故省略了异常处理过程。因为发生该异常说明栅栏被损坏了。推荐的处理措施有:一方面,调用CyclicBarrier的reset方法,来唤醒其他由于调用await方法而被阻塞的线程以避免一直被阻塞,同时将CyclicBarrier实例恢复至初始化状态;另一方面,推荐使用具有超时机制的await方法,以避免线程被永久性阻塞
参考文献
- Java并发编程之美 翟陆续、薛宾田著