0%

Java多线程之ReentrantLock可重入锁

这里就JUC包中的ReentrantLock可重入锁做相关介绍

abstract.jpeg

概述

Java在语言层面提供了synchronized锁,其在经历了一系列的锁优化过程后。目前来看性能已经是很优秀的了。那ReentrantLock作为synchronized锁的替代实现,是否就完全没有必要了呢?显然不是,因为其提供了比synchronized锁更灵活的控制方式及手段。这里首先说明ReentrantLock是一个可重入的互斥锁,其常用方法如下所示。可以看到,一方面,相比于synchronized锁的非公平性而言,ReentrantLock支持公平、非公平两种实现,默认为非公平锁;另一方面,ReentrantLock的加锁、解锁需要显式调用方法操作,进一步提高了控制的灵活性。实践过程中,推荐将unlock释放锁操作放在finally块中,以避免锁未被正确释放。值得一提的是,对于tryLock()方法而言,其是非阻塞的。当此时锁未被其他线程持有,则会直接分配给它。不论是否存在其他正在等待该锁的线程。即使当前这个可重入锁实例是公平的。换言之tryLock()方法会破坏公平的可重入锁的公平性。如果既期望使用非阻塞方式,又期望不破坏公平锁的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryLock(0, TimeUnit.SECONDS)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建一个非公平的可重入锁
public ReentrantLock();

// 创建一个非公平/公平的可重入锁
public ReentrantLock(boolean fair);

// 阻塞式获取锁
public void lock();

// 阻塞式获取锁
public void lockInterruptibly() throws InterruptedException;

// 非阻塞式获取锁, true: 获取锁成功; false: 获取锁失败
public boolean tryLock();

// 支持超时机制的非阻塞式获取锁, true: 获取锁成功; false: 获取锁失败
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;

// 释放锁
public void unlock();

// 获取一个Condition实例
public Condition newCondition();

不仅如此, ReentrantLock还支持基于条件变量Condition的控制方式。具体地,可通过其newCondition方法获取一个Condition实例。对于Condition而言,其常见的方法如下所示。可以看到,其与synchronized中的wait/notify/notifyAll机制是类似的。只不过ReentrantLock支持同时操作多个条件变量Condition,实现对线程间协作进行更精细化的控制。需要注意的是,一方面,某线程通过条件变量A而进入Wait状态,则唤醒它也必须是通过条件变量A,而不能通过其他条件变量进行唤醒;另一方面,调用signal/signalAll方法只会唤醒在调用该方法前已经进入Wait状态的线程,而在这之后进入Wait状态的线程则不会被唤醒

1
2
3
4
5
6
7
8
// 释放锁并进入Wait状态	
void await() throws InterruptedException;

// 随机唤醒一个通过该条件变量而进入Wait状态的线程
void signal();

// 唤醒全部通过该条件变量而进入Wait状态的线程
void signalAll();

实践

可重入性

顾名思义,ReentrantLock锁是可重入的。现在我们验证下,并通过这个例子来对其基本用法进行实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class ReentrantLockTest1 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

@Test
public void test1() {
ReentrantLock reentrantLock = new ReentrantLock();

ExecutorService threadPool = Executors.newFixedThreadPool(10);

Task task = new Task(reentrantLock);
for(int i=1; i<=3; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

@AllArgsConstructor
private static class Task implements Runnable{

private final ReentrantLock reentrantLock;

@Override
public void run() {
reentrantLock.lock();
info("成功获取锁 #1");
try{
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
methodA();
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #1\n");
reentrantLock.unlock();
}

}

private void methodA() {
reentrantLock.lock();
info("成功获取锁 #2");

// 获取锁的重入次数
int count = reentrantLock.getHoldCount();
info("count: " + count);

try{
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
reentrantLock.unlock();
}
}
}
}

测试结果如下所示,符合预期。可以看到其是一方面具有可重入性,另一方面也具有互斥性

figure 1.jpeg

Condition条件变量

这里通过生产者-消费者模型来展示如何通过Condition条件变量进行更好的控制。在这个例子中,我们有两个生产者、两个消费者。前者用于添加数据,后者则进行数据消费,具体地,分别是奇数、偶数的消费者。生产者每次生产完成后,根据队列头部元素的奇偶性通过相应的条件变量通知唤醒对应的消费者进行消费。而消费者每次消费后会通知所有生产者。实现如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
public class ReentrantLockTest2 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

@Test
public void test1() {
MyQueue myQueue = new MyQueue();

Thread producer1 = new Thread( ()->{
for(int i=0; i<2; i++) {
int random = RandomUtils.nextInt(1,100);
//info("准备添加数据: "+random);
myQueue.add( random );
//info("数据添加结束");
}
}, "生产者1" );

Thread producer2 = new Thread( ()->{
for(int i=0; i<2; i++) {
int random = RandomUtils.nextInt(1,100);
myQueue.add( random );
}
}, "生产者2" );

Thread consumer1 = new Thread( ()->{
while (true) {
int result = myQueue.getEven();
info("result: " + result);
}
}, "奇数消费者" );

Thread consumer2 = new Thread( ()->{
while (true) {
int result = myQueue.getOdd();
info("result: " + result);
}
}, "偶数消费者" );


producer1.start();
producer2.start();
consumer1.start();
consumer2.start();


// 主线程等待所有任务执行完毕
try{ Thread.sleep( 60*1000 ); } catch (Exception e) {}
System.out.println("\n---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
public static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

}

class MyQueue {

private ReentrantLock reentrantLock = new ReentrantLock();

/**
* 条件: 队列不为满, 用于生产者可以向队尾添加元素
*/
private Condition nonfullCondition = reentrantLock.newCondition();

/**
* 条件: 队头元素为奇数, 用于消费者可以从队头获取奇数
*/
private Condition evenNumCondition = reentrantLock.newCondition();

/**
* 条件: 队头元素为偶数, 用于消费者可以从队头获取偶数
*/
private Condition oddNumCondition = reentrantLock.newCondition();

private Queue<Integer> queue = new LinkedList<>();

/**
* 队列最大容量
*/
private Integer maxSize = 2;

/**
* 队列最小容量
*/
private Integer minSize = 0;

/**
* 向队列(尾部)添加数据
* @param element
*/
public void add(Integer element) {
Integer head = null;
reentrantLock.lock();
ReentrantLockTest2.info("获取到锁");
try{
while ( queue.size() >= maxSize ) {
ReentrantLockTest2.info("队列已满无法添加, 被阻塞, 释放锁");
// 队列为满, 进入Wait状态并释放锁
nonfullCondition.await();
}
// 生产,添加元素到队尾
queue.offer(element);
// 查看队头元素
head = queue.peek();
Boolean isEvenNumber = head%2==1;
if( isEvenNumber ) {
// 唤醒奇数消费者
evenNumCondition.signal();
} else {
// 唤醒偶数消费者
oddNumCondition.signal();
}
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
ReentrantLockTest2.info("添加操作结束, 释放锁, Head: " +head +", Add: "+element + " , Queue: " + queue);
reentrantLock.unlock();
}

}

/**
* 获取奇数
* @return
*/
public Integer getEven() {
Integer element = null;
reentrantLock.lock();
ReentrantLockTest2.info("获取到锁");
try{
while( !isEven() ) {
ReentrantLockTest2.info("队头元素非奇数, 被阻塞, 释放锁");
// 队头非奇数, 进入Wait状态并释放锁
evenNumCondition.await();
}
element = queue.poll();
// 唤醒所有生产者
nonfullCondition.signalAll();
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
ReentrantLockTest2.info("获取奇数操作结束, 释放锁, Element: "+element+", Queue: "+queue);
reentrantLock.unlock();
}
return element;
}

/**
* 获取偶数
* @return
*/
public Integer getOdd() {
Integer element = null;
reentrantLock.lock();
ReentrantLockTest2.info("获取到锁");
try{
while( !isOdd() ) {
ReentrantLockTest2.info("队头元素非偶数, 被阻塞, 释放锁");
// 队头非偶数, 进入Wait状态并释放锁
oddNumCondition.await();
}
element = queue.poll();
// 唤醒所有生产者
nonfullCondition.signalAll();
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
ReentrantLockTest2.info("获取偶数操作结束, 释放锁, Element: "+element+", Queue: "+queue);
reentrantLock.unlock();
}

return element;
}

/**
* 判断队头元素是否为奇数
* @return
*/
private boolean isEven() {
if( queue.size() <= minSize ) {
return false;
}
Integer num = queue.peek();
if(num%2 == 1) {
return true;
} else {
return false;
}
}

/**
* 判断队头元素是否为偶数
* @return
*/
private boolean isOdd() {
if( queue.size() <= minSize ) {
return false;
}
Integer num = queue.peek();
if(num%2 == 0) {
return true;
} else {
return false;
}
}

}

测试结果如下所示,符合预期

figure 2.jpeg

实现原理

构造器

ReentrantLock可重入锁的实现过程同样依赖于AQS,具体地,其是对AQS中互斥锁的使用。在构建ReentrantLock实例过程中,其通过sync变量持有AQS的实现类Sync。进一步地,按公平性与否可细分为NonfairSync、FairSync两种实现方式。后面我们还会看到,其通过AQS的state字段来记录当前线程获取锁的次数。例如当一个线程连续调用两次lock方法,则state字段即为2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReentrantLock implements Lock, java.io.Serializable {

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

static final class NonfairSync extends Sync {
...
}

static final class FairSync extends Sync {
...
}

public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}

lock方法

对于lock()方法,我们先来看下其在非公平版本下的实现。其首先会调用NonfairSync类的lock()方法,在该方法中,由于是非公平性的实现,故其会直接使用CAS尝试获取锁。如果失败,则进一步调用AQS的acquire方法。tryAcquire方法的返回值决定了当前线程是否需要进入AQS阻塞队列,如果返回true则说明当前线程获取锁成功,直接结束;反之则说明该线程需要被放入AQS阻塞队列当中。可以看到NonfairSync类实现了tryAcquire方法,具体则是通过调用Sync的nonfairTryAcquire方法完成。可以看到nonfairTryAcquire方法中,当前线程根据state是否为0、是否为锁重入等场景进行了加锁尝试,如果成功则直接返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class ReentrantLock implements Lock, java.io.Serializable {

public void lock() {
sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {

abstract void lock();

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// AQS的state为0, 说明没有线程获取锁, 故这里直接通过CAS方式尝试获取锁
if (compareAndSetState(0, acquires)) {
// 获取成功, 则设置锁持有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程即为锁持有线程, 即锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 则更新其获取锁的次数
setState(nextc);
return true;
}
return false;
}
}

static final class NonfairSync extends Sync {
final void lock() {
// 直接使用CAS尝试获取锁
if (compareAndSetState(0, 1))
// 获取成功, 则设置锁持有线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 获取失败, 则调用AQS的acquire方法
acquire(1);
}

// 非公平的尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 需要子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
}

而在公平版本的lock()实现就比较简单了。其首先调用FairSync类的lock方法,然后进一步调用AQS的acquire方法。类似地,FairSync类实现了AQS的tryAcquire方法。值得一提的是为了保障公平性,其在通过CAS方式尝试获取锁前,需要先调用hasQueuedPredecessors方法,该方法用于判断AQS队列中有无其他线程在排队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ReentrantLock implements Lock, java.io.Serializable {

public void lock() {
sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
}

static final class FairSync extends Sync {
final void lock() {
acquire(1);
}

// 公平的尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// AQS的state为0, 说明没有线程获取锁
if (c == 0) {
// AQS的阻塞队列中如果没有其他线程排队, 才会通过CAS方式尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 获取成功, 则设置锁持有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程即为锁持有线程, 即锁重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 则更新其获取锁的次数
setState(nextc);
return true;
}
return false;
}
}
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 需要子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
}

unlock方法

对于unlock()方法来说,基本原理类似。其首先调用AQS的release方法,并进一步调用tryRelease()方法。该方法子类进行实现,其返回值如果为true,则表示锁已经完全被释放,需要将AQS阻塞队列的线程唤醒。具体地,Sync类实现了tryRelease方法,其内部逻辑很简单,如果state字段减为0则返回true;反之,则返回false。因为ReentrantLock是可重入锁,线程可能需要调用多次unlock()方法才会将锁完全释放掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ReentrantLock implements Lock, java.io.Serializable {

public void unlock() {
sync.release(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// AQS的state为0, 说明该线程持有的锁完全释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// 需要子类去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}

Condition 条件变量

ReentrantLock可重入锁特别地还提供了对Condition条件变量的支持。具体地,则是通过AQS的内部类ConditionObject来实现的。每一个Condition实例都会关联一个条件队列,其是一个单向链表。ConditionObject中包含两个Node类型的指针,分别用于指向条件队列的队头、队尾。而内部类Node用于对线程进行包装,其nextWaiter字段在这里的用途是作为条件队列中当前Node节点指向后继Node节点的指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReentrantLock implements Lock, java.io.Serializable {

public Condition newCondition() {
return sync.newCondition();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的队头指针
private transient Node firstWaiter;

// 条件队列的队尾指针
private transient Node lastWaiter;

public ConditionObject() {}
}

static final class Node {
Node nextWaiter;
}
}

这里以await()来介绍如何实现线程的挂起阻塞,前面提到Condition条件变量实例关联了一个条件队列。故通过addConditionWaiter方法将当前线程包装为Node实例添加到条件队列的尾部。我们清楚线程调用await()方法必然是持有锁的,故该线程在被阻塞挂起前,需要完全释放掉其持有的锁。故调用AQS的fullyRelease方法将state置为0。当然还需要通过savedState来保存、记录下线程此前持有锁的次数,以便线程被唤醒后可以正确地进行加锁。此时由于isOnSyncQueue方法返回false,故其进行while循环。并进一步地利用LockSupport.park()方法实现将当前线程挂起阻塞。后续当其他线程将该Node从条件队列转移到AQS阻塞队列并唤醒后,由于isOnSyncQueue()将返回true,即会退出while循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程包装为Node后加入条件队列的尾部
Node node = addConditionWaiter();
// 获取该线程加锁次数, 然后释放当前线程持有锁
int savedState = fullyRelease(node);
int interruptMode = 0;

// 当线程被唤醒后, 由于其已经从条件队列转移到AQS阻塞队列
// 故isOnSyncQueue(node)将返回true, 即退出while循环

while (!isOnSyncQueue(node)) {
// 如果其不在AQS阻塞队列中, 则利用park方法将其进行阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// 被唤醒后将进入AQS的阻塞队列,等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}

final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取state值, 即持有锁线程的加锁次数
int savedState = getState();
// 完全释放锁, 使得state为0
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
}

这里以signal()方法说明如何实现唤醒线程。其内部是通过调用doSignal()来实现的。从条件队列的头部移出一个Node,并通过transferForSignal()方法将该Node从条件队列转移到AQS阻塞队列并唤醒该Node。如果transferForSignal()方法成功则本次唤醒结束。如果失败了则继续从条件队列中移出下一个Node并重复上述操作,直到条件队列为空为止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

public class ConditionObject implements Condition, java.io.Serializable {

public final void signal() {
// 保证调用signal()方法的线程必须是当前锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 条件队列不为空
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

}
}

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝