这里就JUC包中的CyclicBarrier类做相关介绍
概述
JUC中的CyclicBarrier类是一个并发控制工具。其可以使线程在栅栏处进行等待。当指定数量的线程全部到达栅栏处后栅栏才会打开,从而使各线程结束阻塞继续向下执行。其主要方法如下所示,可以看到在线程全部到达栅栏时,还可以通过barrierAction参数设置准备打开栅栏前需执行的任务。其中,该任务由最后一个到达栅栏的线程负责执行。具体地,线程调用await方法实现告诉CyclicBarrier自己已经到达栅栏处,并阻塞等待栅栏打开
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public CyclicBarrier(int parties);
public CyclicBarrier(int parties, Runnable barrierAction);
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
public void reset();
|
基本实践
下面即是一个CyclicBarrier的基本实践示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| public class CyclicBarrierTest1 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
@Test public void test1() throws InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(10);
Runnable initTask = () -> { info("---------------------------------"); };
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, initTask);
Stream.of("张三","李四","王二") .map( name -> new PlayGame(name, cyclicBarrier) ) .forEach(playGame -> { threadPool.execute(playGame); } );
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {} info("Game Over"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String threadName = Thread.currentThread().getName(); String log = "["+time+"] "+ msg+" <"+threadName+">"; System.out.println(log); }
private static void doSomeWork() { try{ Integer second = RandomUtils.nextInt(3,20); System.out.println("second: " + second); Thread.sleep( second * 1000 ); }catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } }
@AllArgsConstructor private static class PlayGame implements Runnable{
private String name;
private CyclicBarrier cyclicBarrier;
@Override public void run() { doSomeWork(); info(name + " 上线"); try{ cyclicBarrier.await(); }catch (Exception e) { System.out.println( "Happen Exception: " + e); }
info(name + " 选择角色 开始"); doSomeWork(); info(name + " 选择角色 结束"); try{ cyclicBarrier.await(); }catch (Exception e) { System.out.println( "Happen Exception: " + e); }
info(name + " 开始游戏");
} } }
|
从测试结果可以看出,当用户 开始选择角色 或 开始游戏时,各线程是同时开始的。至此也可以看出其与CountDownLatch的显著区别,后者是一次性的,而前者CyclicBarrier则可以重复使用
基本原理
通过上面的代码示例,可以看到CyclicBarrier与CountDownLatch相比功能很类似。只不过前者可以重复使用,而后者则是一次性的。但二者在实现上却大相径庭,CountDownLatch是直接基于AQS实现的。而CyclicBarrier则是利用ReentrantLock、Condition进行实现的。具体地,当线程调用CyclicBarrier的await方法时,如果未达到指定数量时,则是通过Condition条件变量的await方法进行阻塞的;如果是最后一个线程则会通过Condition条件变量的signalAll方法来唤醒所有被阻塞的线程
与此同时,由于CyclicBarrier是可重复使用的。故每一轮结束后,其内部会通过nextGeneration方法生成所谓的下一代CyclicBarrier。本质上相当于重新实例化了一次CyclicBarrier
Note
- 在实际使用CyclicBarrier过程中,需要非常小心处理BrokenBarrierException异常。本文示例代码为了简便,故省略了异常处理过程。因为发生该异常说明栅栏被损坏了。推荐的处理措施有:一方面,调用CyclicBarrier的reset方法,来唤醒其他由于调用await方法而被阻塞的线程以避免一直被阻塞,同时将CyclicBarrier实例恢复至初始化状态;另一方面,推荐使用具有超时机制的await方法,以避免线程被永久性阻塞
参考文献
- Java并发编程之美 翟陆续、薛宾田著