这里就JUC包中的ReentrantLock可重入锁做相关介绍
概述
Java在语言层面提供了synchronized锁,其在经历了一系列的锁优化过程后。目前来看性能已经是很优秀的了。那ReentrantLock作为synchronized锁的替代实现,是否就完全没有必要了呢?显然不是,因为其提供了比synchronized锁更灵活的控制方式及手段。这里首先说明ReentrantLock是一个可重入的互斥锁,其常用方法如下所示。可以看到,一方面,相比于synchronized锁的非公平性而言,ReentrantLock支持公平、非公平两种实现,默认为非公平锁;另一方面,ReentrantLock的加锁、解锁需要显式调用方法操作,进一步提高了控制的灵活性。实践过程中,推荐将unlock释放锁操作放在finally块中,以避免锁未被正确释放。值得一提的是,对于tryLock()方法而言,其是非阻塞的。当此时锁未被其他线程持有,则会直接分配给它。不论是否存在其他正在等待该锁的线程。即使当前这个可重入锁实例是公平的。换言之tryLock()方法会破坏公平的可重入锁的公平性。如果既期望使用非阻塞方式,又期望不破坏公平锁的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryLock(0, TimeUnit.SECONDS)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public ReentrantLock();
public ReentrantLock(boolean fair);
public void lock();
public void lockInterruptibly() throws InterruptedException;
public boolean tryLock();
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
public void unlock();
public Condition newCondition();
|
不仅如此, ReentrantLock还支持基于条件变量Condition的控制方式。具体地,可通过其newCondition方法获取一个Condition实例。对于Condition而言,其常见的方法如下所示。可以看到,其与synchronized中的wait/notify/notifyAll机制是类似的。只不过ReentrantLock支持同时操作多个条件变量Condition,实现对线程间协作进行更精细化的控制。需要注意的是,一方面,某线程通过条件变量A而进入Wait状态,则唤醒它也必须是通过条件变量A,而不能通过其他条件变量进行唤醒;另一方面,调用signal/signalAll方法只会唤醒在调用该方法前已经进入Wait状态的线程,而在这之后进入Wait状态的线程则不会被唤醒
1 2 3 4 5 6 7 8
| void await() throws InterruptedException;
void signal();
void signalAll();
|
实践
可重入性
顾名思义,ReentrantLock锁是可重入的。现在我们验证下,并通过这个例子来对其基本用法进行实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class ReentrantLockTest1 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
@Test public void test1() { ReentrantLock reentrantLock = new ReentrantLock();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Task task = new Task(reentrantLock); for(int i=1; i<=3; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {} System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
@AllArgsConstructor private static class Task implements Runnable{
private final ReentrantLock reentrantLock;
@Override public void run() { reentrantLock.lock(); info("成功获取锁 #1"); try{ Thread.sleep(RandomUtils.nextLong(100, 500)); methodA(); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { info("释放锁 #1\n"); reentrantLock.unlock(); }
}
private void methodA() { reentrantLock.lock(); info("成功获取锁 #2");
int count = reentrantLock.getHoldCount(); info("count: " + count);
try{ Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } finally { info("释放锁 #2"); reentrantLock.unlock(); } } } }
|
测试结果如下所示,符合预期。可以看到其是一方面具有可重入性,另一方面也具有互斥性
Condition条件变量
这里通过生产者-消费者模型来展示如何通过Condition条件变量进行更好的控制。在这个例子中,我们有两个生产者、两个消费者。前者用于添加数据,后者则进行数据消费,具体地,分别是奇数、偶数的消费者。生产者每次生产完成后,根据队列头部元素的奇偶性通过相应的条件变量通知唤醒对应的消费者进行消费。而消费者每次消费后会通知所有生产者。实现如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
| public class ReentrantLockTest2 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
@Test public void test1() { MyQueue myQueue = new MyQueue();
Thread producer1 = new Thread( ()->{ for(int i=0; i<2; i++) { int random = RandomUtils.nextInt(1,100); myQueue.add( random ); } }, "生产者1" );
Thread producer2 = new Thread( ()->{ for(int i=0; i<2; i++) { int random = RandomUtils.nextInt(1,100); myQueue.add( random ); } }, "生产者2" );
Thread consumer1 = new Thread( ()->{ while (true) { int result = myQueue.getEven(); info("result: " + result); } }, "奇数消费者" );
Thread consumer2 = new Thread( ()->{ while (true) { int result = myQueue.getOdd(); info("result: " + result); } }, "偶数消费者" );
producer1.start(); producer2.start(); consumer1.start(); consumer2.start();
try{ Thread.sleep( 60*1000 ); } catch (Exception e) {} System.out.println("\n---------------------- 系统下线 ----------------------"); }
public static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
}
class MyQueue {
private ReentrantLock reentrantLock = new ReentrantLock();
private Condition nonfullCondition = reentrantLock.newCondition();
private Condition evenNumCondition = reentrantLock.newCondition();
private Condition oddNumCondition = reentrantLock.newCondition();
private Queue<Integer> queue = new LinkedList<>();
private Integer maxSize = 2;
private Integer minSize = 0;
public void add(Integer element) { Integer head = null; reentrantLock.lock(); ReentrantLockTest2.info("获取到锁"); try{ while ( queue.size() >= maxSize ) { ReentrantLockTest2.info("队列已满无法添加, 被阻塞, 释放锁"); nonfullCondition.await(); } queue.offer(element); head = queue.peek(); Boolean isEvenNumber = head%2==1; if( isEvenNumber ) { evenNumCondition.signal(); } else { oddNumCondition.signal(); } } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { ReentrantLockTest2.info("添加操作结束, 释放锁, Head: " +head +", Add: "+element + " , Queue: " + queue); reentrantLock.unlock(); }
}
public Integer getEven() { Integer element = null; reentrantLock.lock(); ReentrantLockTest2.info("获取到锁"); try{ while( !isEven() ) { ReentrantLockTest2.info("队头元素非奇数, 被阻塞, 释放锁"); evenNumCondition.await(); } element = queue.poll(); nonfullCondition.signalAll(); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { ReentrantLockTest2.info("获取奇数操作结束, 释放锁, Element: "+element+", Queue: "+queue); reentrantLock.unlock(); } return element; }
public Integer getOdd() { Integer element = null; reentrantLock.lock(); ReentrantLockTest2.info("获取到锁"); try{ while( !isOdd() ) { ReentrantLockTest2.info("队头元素非偶数, 被阻塞, 释放锁"); oddNumCondition.await(); } element = queue.poll(); nonfullCondition.signalAll(); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { ReentrantLockTest2.info("获取偶数操作结束, 释放锁, Element: "+element+", Queue: "+queue); reentrantLock.unlock(); }
return element; }
private boolean isEven() { if( queue.size() <= minSize ) { return false; } Integer num = queue.peek(); if(num%2 == 1) { return true; } else { return false; } }
private boolean isOdd() { if( queue.size() <= minSize ) { return false; } Integer num = queue.peek(); if(num%2 == 0) { return true; } else { return false; } }
}
|
测试结果如下所示,符合预期
实现原理
构造器
ReentrantLock可重入锁的实现过程同样依赖于AQS,具体地,其是对AQS中互斥锁的使用。在构建ReentrantLock实例过程中,其通过sync变量持有AQS的实现类Sync。进一步地,按公平性与否可细分为NonfairSync、FairSync两种实现方式。后面我们还会看到,其通过AQS的state字段来记录当前线程获取锁的次数。例如当一个线程连续调用两次lock方法,则state字段即为2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { ... }
static final class NonfairSync extends Sync { ... }
static final class FairSync extends Sync { ... }
public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }
|
lock方法
对于lock()方法,我们先来看下其在非公平版本下的实现。其首先会调用NonfairSync类的lock()方法,在该方法中,由于是非公平性的实现,故其会直接使用CAS尝试获取锁。如果失败,则进一步调用AQS的acquire方法。tryAcquire方法的返回值决定了当前线程是否需要进入AQS阻塞队列,如果返回true则说明当前线程获取锁成功,直接结束;反之则说明该线程需要被放入AQS阻塞队列当中。可以看到NonfairSync类实现了tryAcquire方法,具体则是通过调用Sync的nonfairTryAcquire方法完成。可以看到nonfairTryAcquire方法中,当前线程根据state是否为0、是否为锁重入等场景进行了加锁尝试,如果成功则直接返回true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() { sync.lock(); }
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock();
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } }
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } }
|
而在公平版本的lock()实现就比较简单了。其首先调用FairSync类的lock方法,然后进一步调用AQS的acquire方法。类似地,FairSync类实现了AQS的tryAcquire方法。值得一提的是为了保障公平性,其在通过CAS方式尝试获取锁前,需要先调用hasQueuedPredecessors方法,该方法用于判断AQS队列中有无其他线程在排队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() { sync.lock(); }
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); }
static final class FairSync extends Sync { final void lock() { acquire(1); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } }
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } }
|
unlock方法
对于unlock()方法来说,基本原理类似。其首先调用AQS的release方法,并进一步调用tryRelease()方法。该方法子类进行实现,其返回值如果为true,则表示锁已经完全被释放,需要将AQS阻塞队列的线程唤醒。具体地,Sync类实现了tryRelease方法,其内部逻辑很简单,如果state字段减为0则返回true;反之,则返回false。因为ReentrantLock是可重入锁,线程可能需要调用多次unlock()方法才会将锁完全释放掉
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class ReentrantLock implements Lock, java.io.Serializable {
public void unlock() { sync.release(1); }
abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } }
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } }
|
Condition 条件变量
ReentrantLock可重入锁特别地还提供了对Condition条件变量的支持。具体地,则是通过AQS的内部类ConditionObject来实现的。每一个Condition实例都会关联一个条件队列,其是一个单向链表。ConditionObject中包含两个Node类型的指针,分别用于指向条件队列的队头、队尾。而内部类Node用于对线程进行包装,其nextWaiter字段在这里的用途是作为条件队列中当前Node节点指向后继Node节点的指针
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class ReentrantLock implements Lock, java.io.Serializable {
public Condition newCondition() { return sync.newCondition(); }
abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() { return new ConditionObject(); } } }
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter;
public ConditionObject() {} }
static final class Node { Node nextWaiter; } }
|
这里以await()来介绍如何实现线程的挂起阻塞,前面提到Condition条件变量实例关联了一个条件队列。故通过addConditionWaiter方法将当前线程包装为Node实例添加到条件队列的尾部。我们清楚线程调用await()方法必然是持有锁的,故该线程在被阻塞挂起前,需要完全释放掉其持有的锁。故调用AQS的fullyRelease方法将state置为0。当然还需要通过savedState来保存、记录下线程此前持有锁的次数,以便线程被唤醒后可以正确地进行加锁。此时由于isOnSyncQueue方法返回false,故其进行while循环。并进一步地利用LockSupport.park()方法实现将当前线程挂起阻塞。后续当其他线程将该Node从条件队列转移到AQS阻塞队列并唤醒后,由于isOnSyncQueue()将返回true,即会退出while循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0;
while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } } final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } }
|
这里以signal()方法说明如何实现唤醒线程。其内部是通过调用doSignal()来实现的。从条件队列的头部移出一个Node,并通过transferForSignal()方法将该Node从条件队列转移到AQS阻塞队列并唤醒该Node。如果transferForSignal()方法成功则本次唤醒结束。如果失败了则继续从条件队列中移出下一个Node并重复上述操作,直到条件队列为空为止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
} }
|
参考文献
- Java并发编程之美 翟陆续、薛宾田著