这里就JUC包中的CountDownLatch类做相关介绍
概述
JUC包中的CountDownLatch类是一个同步工具类,可实现线程间的通信。其典型方法如下所示
1 | // 创建一个指定计数器值的CountDownLatch实例 |
基本使用方法也很简单。首先创建一个指定计数器值的CountDownLatch实例,每当其他线程完成任务时就通过countDown方法将计数器值减1。这样当计数器的值为0时,之前由于调用await方法而被阻塞的线程就会结束等待,恢复执行
实践
CountDownLatch的典型应用场景,大体可分为两类:结束信号、开始信号
结束信号
主线程创建、启动N个异步任务,我们期望当这N个任务全部执行完毕结束后,主线程才可以继续往下执行。即将CountDownLatch作为任务的结束信号来使用。示例代码如下所示
1 | public class CountDownLatchTest1 { |
测试结果如下所示,符合预期
开始信号
主线程创建N个异步任务,但这N个任务不能立即开始执行。而需要等待某个共同的前置任务(比如初始化任务)完成后,才允许这N个任务开始执行。即将CountDownLatch作为任务的开始信号来使用。示例代码如下所示
1 | public class CountDownLatchTest2 { |
测试结果如下所示,符合预期
基本原理
构造器
CountDownLatch类实现过程同样依赖于AQS。在构建CountDownLatch实例过程时,一方面,通过sync变量持有AQS的实现类Sync;另一方面,通过AQS的state字段来存储计数器值
1 | public class CountDownLatch { |
await方法
首先来看CountDownLatch的await方法。其委托sync调用AQS的acquireSharedInterruptibly方法,从方法名也可以看到其是对AQS中共享锁的使用。并根据当前计数器的值是否为0,来判断该线程是继续执行还是应该被阻塞。可以看到事实上AQS只是定义了是否需要阻塞线程的tryAcquireShared方法,具体的规则需要CountDownLatch类来进行实现
1 | public class CountDownLatch { |
当tryAcquireShared方法结果小于0时,即当前计数器不为0时,AQS如何通过doAcquireSharedInterruptibly方法实现阻塞呢?结合相关源码可以看到,首先通过addWaiter方法将当前线程包装为一个node实例,并将其加入AQS队列。在入队过程中需要注意,如果队列为空则其并不是直接将该node实例加入队列。而是先构造一个哨兵节点来入队,然后在enq方法下一轮for循环才将该node实例加入队列
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
然后通过shouldParkAfterFailedAcquire方法修改前驱节点的waitStatus。如果前驱节点的waitStatus字段是初始值0的话,需在第一轮for循环中进入shouldParkAfterFailedAcquire方法时,通过compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法将前驱节点的waitStatus字段修改为Node.SIGNAL(即-1)。这样在开始下一轮for循环时,shouldParkAfterFailedAcquire方法即会返回true。进而执行parkAndCheckInterrupt方法,利用LockSupport.park完成线程阻塞
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
countDown方法
CountDownLatch的countDown方法类似。其同样是委托sync调用AQS的releaseShared方法。然后AQS执行tryReleaseShared方法,CountDownLatch类负责实现具体的规则逻辑。如果自减后当前计数器为0,则说明需要唤醒之前通过await方法而被阻塞的线程。然后通过AQS的doReleaseShared方法实现唤醒。具体地,其是从头节点的后继节点开始唤醒。因为前面已经说过,AQS队列的第一个节点(即头节点)只是一个哨兵节点
1 | public class CountDownLatch { |
这里补充说明下,当上文由于调用await方法而被阻塞的线程唤醒后,其会在doAcquireSharedInterruptibly方法的for循环中恢复执行。此时由于tryAcquireShared方法的返回值r大于0满足条件,故其进入setHeadAndPropagate方法。在该方法中,其将自身重新设置为AQS的头节点。并通过doReleaseShared方法继续唤醒它的后继节点。从而实现将AQS队列被阻塞的线程全部唤醒
1 | private void setHeadAndPropagate(Node node, int propagate) { |
Note
CountDownLatch的计数器值只能在创建实例时进行设置,之后不可以对其进行重新设置。换言之,CountDownLatch是一次性的,当其使用完毕后将无法再次利用
参考文献
- Java并发编程之美 翟陆续、薛宾田著