分布式锁(三):基于Redisson的分布式锁实践

Redisson是基于Redis的Java驻内存数据网格(In-Memory Data Grid),底层使用Netty进行实现。其提供了相应的分布式锁实现

abstract.png

RedissonLock 分布式非公平可重入互斥锁

RedissonLock是一个分布式非公平可重入互斥锁,其在获取锁的过程中,支持lock阻塞式、tryLock非阻塞式两种形式。其中,这两个方法还有多个重载版本,以支持设置锁的最大持有时间、设置获取锁的最大等待时间。具体方法如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 阻塞式获取锁
void lock();

# 阻塞式获取锁
# 支持通过leaseTime参数设置锁的最大持有时间
void lock(long leaseTime, TimeUnit unit);

# 非阻塞式获取锁
boolean tryLock();

# 非阻塞式获取锁
# 支持通过time参数设置获取锁的最大等待时间
boolean tryLock(long time, TimeUnit unit);

# 非阻塞式获取锁
# 支持通过waitTime参数设置获取锁的最大等待时间
# 支持通过leaseTime参数设置锁的最大持有时间
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

可重入性

现在我们验证下RedissonLock是一个互斥锁并且支持可重入。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static RedissonClient redissonClient;

@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}

/**
* 测试: 阻塞式获取锁、可重入性
*/
@Test
public void testLock1() {
final String lockName = "sellPc";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();

info("释放锁 #1\n");
lock.unlock();
}
};

RLock tempLock = redissonClient.getLock(lockName);
if( tempLock instanceof RedissonLock) {
System.out.println("锁类型: RedissonLock");
}
try{ Thread.sleep( 1*1000 ); } catch (Exception e) {}

for (int i=1; i<=3; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}


@AfterClass
public static void close() {
redissonClient.shutdown();
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,符合预期

figure 1.jpeg

显式指定锁的最大持有时间

前面提到支持通过leaseTime参数显式设置锁的最大持有时间,当业务持锁时间超过leaseTime参数值,则其持有的锁会被自动释放。但需要注意的是某个线程的锁一旦被自动释放后,此时再调用unlock方法来释放锁时,即会抛出IllegalMonitorStateException异常。原因也很简单,因为此时线程实际上并未持有锁,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...

/**
* 测试: 指定锁的最大持有时间
*/
@Test
public void testLock2() {
final String lockName = "sellBooK";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
// 阻塞式获取锁, 指定锁的最大持有时间为1秒
lock.lock(1, TimeUnit.SECONDS);
info("成功获取锁");
// 模拟业务耗时: 10s
Thread.sleep( 10 * 1000 );
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
} finally {
info("释放锁");
try {
lock.unlock();
} catch (IllegalMonitorStateException e) {
info("Happen Exception: " + e.getMessage());
}
}
};

for (int i=1; i<=5; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}

...
}

测试结果如下所示,符合预期。可以看到每隔1秒后,由于线程持锁时间到期了。锁被自动释放了,进而使得下一个任务拿到了锁。并且由于每个任务的锁都是自动释放的,故每次调用unlock方法均会抛出异常

figure 2.jpeg

非阻塞式获取锁

下面展示如果通过tryLock方法进行非阻塞式获取锁,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...

/**
* 测试: 非阻塞式获取锁
*/
@Test
public void testTryLock() {
final String lockName = "sellPhone";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
boolean flag = false;
try{
// 非阻塞式获取锁
flag = lock.tryLock();
if( flag ) {
info("成功获取锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} else {
info("未获取到锁\n");
}
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
if( flag ) {
info("释放锁\n");
lock.unlock();
}
}
};

for (int i=1; i<=5; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}

...
}

测试结果如下所示,符合预期

figure 3.jpeg

Watch Dog看门狗机制

在介绍Redisson的Watch Dog看门狗机制之前,我们先来做个测试。如果某个线程一直持有锁执行业务逻辑,则锁是否会被自动释放呢?示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...

/**
* 测试: 看门狗机制
*/
@Test
public void testLock3() {
final String lockName = "sellPig";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
info("尝试获取锁");
// 阻塞式获取锁
lock.lock();
info("成功获取锁");
// 未显式指定锁的持有时间, 则看门狗会在断开连接前一直进行续期
while (true) {
}
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
} finally {
info("成功释放锁");
lock.unlock();
}
};

for (int i=1; i<5; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
info("---------------------- 系统下线 ----------------------");
}

...
}

测试结果如下所示,该锁被持有后,一直未被释放。其他任务都被阻塞住了

figure 4.jpeg

当我们调用不含leaseTime参数版本的lock()方法时,即未显式设置最大持锁时间。则其在RedissonLock类内部会将 特殊值-1 传给leaseTime参数。然后在tryAcquireAsync方法中会通过RedissonLock类的internalLockLeaseTime字段设置一个默认的最大持锁时间。最后通过RedissonLock构造器我们不难发现 internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段。其中lockWatchdogTimeout字段的默认值为30秒。换言之即使我们调用lock方法时,未显式设置最大持锁时间。但RedissonLock内部也会通过lockWatchdogTimeout字段给该锁设置一个最大持有时间,默认值为30秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class RedissonLock extends RedissonBaseLock {

protected long internalLockLeaseTime;

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

/**
* 未显式设置最大持锁时间的lock方法
*/
@Override
public void lock() {
try {
// 则其会将 特殊值-1 传给leaseTime参数
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
...
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
...
}

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 未显式设置最大持锁时间 则会 通过 internalLockLeaseTime 字段设置一个默认的最大持锁时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
}

...

public class Config {

// 时间: 30秒
private long lockWatchdogTimeout = 30 * 1000;

public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}

...
}

那问题来了,为啥在我们刚刚的测试代码中即使持锁时间超过了30秒,锁也没有被自动释放呢?原因就在于Redisson的看门狗机制。在RedissonLock类的tryAcquireAsync方法中,未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期。即RedissonLock类的tryAcquireAsync方法中会调用scheduleExpirationRenewal()以启动一个定时任务用于进行自动续期。具体的续期逻辑在RedissonBaseLock类的renewExpiration方法中,其中自动续期定时任务的执行周期 是RedissonBaseLock类的internalLockLeaseTime字段值的1/3。然后通过renewExpirationAsync方法每次利用Lua脚本向Redis发送续期命令,具体地。每次续期时会将RedissonBaseLock类的internalLockLeaseTime字段值设置为新的最大持锁时间。同样地,RedissonBaseLock类的 internalLockLeaseTime 字段值也是来自于Config类的lockWatchdogTimeout字段,即默认为30秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {

...

protected long internalLockLeaseTime;

public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
// internalLockLeaseTime 参数的值来自于Config类的lockWatchdogTimeout变量
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}

protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 自动续期
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}

private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

// 通过Lua脚本向Redis发送续期命令
RFuture<Boolean> future = renewExpirationAsync(threadId);

future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}

if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// 定时任务的执行周期 是 internalLockLeaseTime字段值的 1/3
// 即, 定时任务的执行周期 是 lockWatchdogTimeout字段值的 1/3
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
// 每次续期时会将internalLockLeaseTime字段值作为新的最大持锁时间
Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));
}

...

}

至此我们应该比较清楚Redisson的看门狗机制了:

  1. Redisson的Watch Dog看门狗机制只会在未显式设置最大持锁时间才会生效。换言之,一旦调用lock方法时指定了leaseTime参数值,则该锁到期后即会自动释放。Redisson的Watch Dog看门狗不会对该锁进行自动续期
  2. 当我们未显式设置Config类的lockWatchdogTimeout字段值时,使用默认的30秒。此时如果加锁时未显式设置最大持锁时间,即Watch Dog看门狗机制会生效的场景中。该锁实际上一开始也会设置一个默认的最大持锁时间,即30秒。然后看门狗每隔10秒(30秒 * 1/3 = 10秒)会将该锁的最大持锁时间再次设置为30秒,以达到自动续期的目的。这样只要持锁线程的业务还未执行完,则该锁就一直有效、不会被自动释放。当然一旦持锁的服务实例发生宕机后,看门狗的定时任务自然也无法续期。这样锁到期后也就释放掉了,避免了死锁的发生

RedissonFairLock 分布式公平可重入互斥锁

由于RedissonLock是非公平的,故Redisson提供了一个分布式公平可重入互斥锁——RedissonFairLock。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* RedissonFairLock Demo : 分布式公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonFairLockDemo {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

/**
* 测试: 可重入性
*/
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );

final String lockName = "sellWatch";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getFairLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();

info("释放锁 #1\n");
lock.unlock();
}
};

RLock tempLock = redissonClient.getFairLock(lockName);
if( tempLock instanceof RedissonFairLock ) {
System.out.println("锁类型: RedissonFairLock");
}
try{ Thread.sleep( 4*1000 ); } catch (Exception e) {}

for (int i=1; i<=3; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,符合预期

figure 5.jpeg

RReadWriteLock 分布式读写锁

读写测试

RReadWriteLock是一个分布式可重入读写锁。其中读锁为可重入的共享锁、写锁为可重入的互斥锁,且读写互斥。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static RedissonClient redissonClient;

@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}

/**
* 测试: 读锁为共享锁, 读锁具有可重入性
*/
@Test
public void test1Read() {
System.out.println("\n---------------------- Test 1 : Read ----------------------");
String lockName = "sellCat";
for(int i=1; i<=3; i++) {
String taskName = "读任务#"+i;
Runnable task = new ReadTask(taskName, redissonClient, lockName);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}

/**
* 测试: 写锁为互斥锁, 写锁具有可重入性
*/
@Test
public void test2Write() {
System.out.println("\n---------------------- Test 2 : Write ----------------------");
String lockName = "sellDog";
for(int i=1; i<=3; i++) {
String taskName = "写任务#"+i;
Runnable task = new WriteTask(taskName, redissonClient, lockName);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}

/**
* 测试: 读写互斥
*/
@Test
public void test3ReadWrite() {
System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
String lockName = "sellLion";
for(int i=1; i<=5; i++) {
Runnable task = null;
Boolean isReadTask = RandomUtils.nextBoolean();
if( isReadTask ) {
task = new ReadTask( "读任务#"+i, redissonClient, lockName);
} else {
task = new WriteTask( "写任务#"+i, redissonClient, lockName);
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}

@AfterClass
public static void close() {
redissonClient.shutdown();
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

/**
* 读任务
*/
private static class ReadTask implements Runnable {
private String taskName;

private RedissonReadLock readLock;

public ReadTask(String taskName, RedissonClient redissonClient, String lockName) {
this.taskName = taskName;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
this.readLock = (RedissonReadLock) readWriteLock.readLock();
}

@Override
public void run() {
try{
readLock.lock();
info(taskName + ": 成功获取读锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.lock();
info(taskName + ": 成功获取读锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放读锁 #2");
readLock.unlock();

info(taskName + ": 释放读锁 #1");
readLock.unlock();
}
}
}

/**
* 写任务
*/
private static class WriteTask implements Runnable {
private String taskName;

private RedissonWriteLock writeLock;

public WriteTask(String taskName, RedissonClient redissonClient, String lockName) {
this.taskName = taskName;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
this.writeLock = (RedissonWriteLock) readWriteLock.writeLock();
}

@Override
public void run() {
try{
writeLock.lock();
info(taskName + ": 成功获取写锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.lock();
info(taskName + ": 成功获取写锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放写锁 #2");
writeLock.unlock();

info(taskName + ": 释放写锁 #1\n");
writeLock.unlock();
}
}
}
}

读锁测试结果如下所示,符合预期

figure 6.jpeg

写锁测试结果如下所示,符合预期

figure 7.jpeg

读写测试结果如下所示,符合预期

figure 8.jpeg

锁升级、锁降级

所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。

锁升级示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {

...

/**
* 测试: 锁升级
*/
@Test
public void test4Read2Write() {
System.out.println("---------------------- Test 4 : Read -> Write ----------------------\n");
String lockName = "sellTiger";
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();

try {
readLock.lock();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.lock();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.unlock();
info("成功释放读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.unlock();
info("成功释放写锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
}

测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞

锁降级示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {

...

/**
* 测试: 锁降级
*/
@Test
public void test5Write2Read() {
System.out.println("---------------------- Test 2 : Write -> Read ----------------------");
String lockName = "sellChicken";
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();

try {
writeLock.lock();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.lock();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.unlock();
info("成功释放写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.unlock();
info("成功释放读锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
}

测试结果如下所示,符合预期

figure 9.jpeg

RedissonSpinLock 分布式非公平可重入自旋互斥锁

RedissonSpinLock则是一个分布式非公平可重入自旋互斥锁。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* RedissonSpinLock Demo : 分布式非公平可重入自旋互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonSpinLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

/**
* 测试: 可重入性、互斥锁
*/
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );

final String lockName = "sellKeyword";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getSpinLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();

info("释放锁 #1\n");
lock.unlock();
}
};

RLock tempLock = redissonClient.getSpinLock(lockName);
if( tempLock instanceof RedissonSpinLock) {
System.out.println("锁类型: RedissonSpinLock");
}
try{ Thread.sleep( 2*1000 ); } catch (Exception e) {}

for (int i=1; i<=3; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,符合预期

figure 10.jpeg

RedissonCountDownLatch 分布式闩锁

RedissonCountDownLatch是一个分布式的CountDownLatch闩锁。比如我们的业务系统会依赖很多其他基础服务,这样在业务系统启动过程中,需要等待其他基础服务全部启动完毕。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* RedissonCountDownLatch Demo : 分布式闩锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonCountDownLatchDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static RedissonClient redissonClient;

@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}

@Test
public void test1() throws InterruptedException {
final String countDownLatchName = "systemInit";
int count = 5;

for (int i=1; i<=count; i++) {
String serviceName = "基础服务 #"+i;
BasicService basicService = new BasicService(serviceName, redissonClient, countDownLatchName, count);
threadPool.execute( basicService );
}

RedissonCountDownLatch countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
countDownLatch.trySetCount(count);
// 阻塞等待基础服务全部启动完成
countDownLatch.await();

info("系统初始化已完成, 业务系统启动 ...");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

/**
* 基础服务
*/
private static class BasicService implements Runnable {
private String serviceName;

private RedissonCountDownLatch countDownLatch;

public BasicService(String serviceName, RedissonClient redissonClient, String countDownLatchName, Integer count) {
this.serviceName = serviceName;
this.countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
this.countDownLatch.trySetCount( count );
}

@Override
public void run() {
try{
info(serviceName + ": 启动中");
// 模拟 基础服务启动 耗时
Thread.sleep( RandomUtils.nextLong(1, 5) * 1000 );
countDownLatch.countDown();
info(serviceName + ": 启动完成");
} catch (Exception e) {
System.out.println( serviceName + ": Happen Exception: " + e.getMessage());
}
}
}
}

测试结果如下所示,符合预期

figure 11.jpeg

RedissonSemaphore 分布式信号量

RedissonSemaphore是一个分布式的信号量,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* RedissonSemaphore Demo : 分布式信号量
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonSemaphoreDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );

final String lockName = "sellAnimal";
// 系统最大并发处理量
int maxLimit = 3;
IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户#"+num, redissonClient, lockName, maxLimit) )
.forEach( threadPool::execute );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

private static class UserReq implements Runnable {
private String name;

private RedissonSemaphore semaphore;

public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
this.name = name;
this.semaphore = (RedissonSemaphore) redissonClient.getSemaphore(lockName);
// 设置信号量的许可数
semaphore.trySetPermits( maxLimit );
}

@Override
public void run() {
try {
// 模拟用户不定时发起请求
Thread.sleep(RandomUtils.nextLong(500, 2000));
info( name + ": 发起请求" );

// 阻塞等待,直到获取许可
semaphore.acquire();
info(name + ": 系统开始处理请求");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextInt(5, 20)*1000);

// 用户请求处理完毕,释放许可
semaphore.release();
info(name + ": 系统处理完毕");
}catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}
}

测试结果如下所示,符合预期

figure 12.jpeg

RedissonPermitExpirableSemaphore 分布式支持有效期的信号量

相比较于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在获取许可的acquire方法中,增加了一个支持leaseTime参数的重载版本。以实现指定许可的最大持有时间。一旦业务持许可时间超过leaseTime参数值,则其持有的许可会被自动释放。但需要注意的是某个线程的许可一旦被自动释放后,此时再调用release方法来释放许可时,即会抛出异常。原因也很简单,因为此时线程实际上并未持有许可,示例代码如下所示。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* RedissonPermitExpirableSemaphore Demo : 分布式支持有效期的信号量
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonPermitExpirableSemaphoreDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );

final String lockName = "sellMilk";
// 系统最大并发处理量
int maxLimit = 3;
IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户 #"+num, redissonClient, lockName, maxLimit) )
.forEach( threadPool::execute );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

private static class UserReq implements Runnable {
private String name;

private RedissonPermitExpirableSemaphore semaphore;

public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
this.name = name;
this.semaphore = (RedissonPermitExpirableSemaphore) redissonClient.getPermitExpirableSemaphore(lockName);
// 设置信号量的许可数
semaphore.trySetPermits( maxLimit );
}

@Override
public void run() {
try {
// 模拟用户不定时发起请求
if( !name.equals("用户 #1") ) {
Thread.sleep( RandomUtils.nextLong(1000, 2000) );
}
info( name + ": 发起请求" );

// 阻塞等待直到获取许可, 指定信号量的最大持有时间为2秒
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
info(name + ": 系统开始处理请求");

// 模拟业务耗时
if( name.equals("用户 #1") ) {
Thread.sleep( 5 * 1000 );
} else {
Thread.sleep(RandomUtils.nextInt(500, 1000));
}

// 用户请求处理完毕,释放许可
semaphore.release(permitId);
info(name + ": 系统处理完毕");
}catch (Exception e) {
info( name + ": Happen Exception: " + e.getCause().getMessage());
}
}
}
}

测试结果如下所示,符合预期

figure 13.jpeg

0%