分布式锁(二):基于ZooKeeper Curator的分布式锁实践

本文基于ZooKeeper Curator进行分布式锁的实践

abstract.png

搭建ZooKeeper环境

这里基于Docker搭建ZooKeeper环境

1
2
3
4
5
6
7
# 拉取 ZooKeeper 镜像
docker pull zookeeper:3.4

# 创建 ZooKeeper 容器
docker run -p 2181:2181 -d \
--name ZooKeeper-Service-2 \
zookeeper:3.4

POM依赖

Curator,作为Netflix开源的ZooKeeper客户端框架,大大简化了我们操作、使用ZooKeeper的难度,并且提供了非常丰富的基于链式调用的API。故这里首先在POM中引入Curator依赖,其中我们需要在Curator依赖中排除ZooKeeper依赖,然后单独引入与服务端版本一致的ZooKeeper依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependencies>

<!-- ZooKeeper Client: Curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<!--排除自带ZooKeeper依赖-->
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Zookeeper依赖版本需与服务端保持一致 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>

</dependencies>

Curator基本实践

创建节点

众所周知,在ZooKeeper中节点支持两种类型:持久/临时、有序/无序。即两两组合则共计四种节点。其中临时节点会在客户端连接断开后自动被删除,而持久节点则不会;有序节点在创建过程中则会被分配一个唯一的单调递增的序号,并将序号追加在节点名称中,而无序节点则不会。下面即是一个基于Curator创建节点的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* ZooKeeper Curator 基本实践
* @author Aaron Zhu
* @date 2022-03-28
*/
public class Basic {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");

/******************************* 创建节点 *******************************/
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath("/AaronTest/nodeA", "Hello, Node A".getBytes());

zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT_SEQUENTIAL ) // 节点类型: 持久有序节点
.forPath("/AaronTest/nodeB", "Hello, Node B".getBytes());

zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( EPHEMERAL ) // 节点类型: 临时节点
.forPath("/AaronTest/nodeC", "Hello, Node C".getBytes());

zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( EPHEMERAL_SEQUENTIAL ) // 节点类型: 临时有序节点
.forPath("/AaronTest/nodeD", "Hello, Node D".getBytes());

List<String> childrenNodeNameList = zkClient.getChildren()
.forPath("/AaronTest");

for(String childrenNodeName : childrenNodeNameList) {
// 获取节点的状态信息、数据信息
Stat stat = new Stat();
byte[] bytes = zkClient.getData()
.storingStatIn(stat)
.forPath( "/AaronTest/" + childrenNodeName );
String data = new String( bytes );

System.out.println("--------------------------------");
System.out.println("childrenNodeName: " + childrenNodeName);
System.out.println("data: " + data);
System.out.println("stat: " + stat);
}

// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
}

测试结果如下所示,符合预期

figure 1.jpeg

Watcher机制

ZooKeeper通过引入Watched机制实现发布/订阅功能,但原生的Watcher机制一旦触发一次后就会失效。如果期望一直监听,则必须每次重复注册Watcher,使用起来较为繁琐。为此Curator对其进行了优化,实现了自动注册,以便进行重复监听。具体地,Curator中提供了三种监听器:NodeCache、PathChildrenCache、TreeCache。其中,NodeCache只可监听指定路径所在节点的创建、修改、删除;PathChildrenCache只可监听指定路径下的第一级子节点的创建、修改、删除,无法监听指定路径所在节点的事件,无法监听指定路径的子节点的子节点的事件;TreeCache可监听指定路径所在节点的创建、修改、删除,可监听指定路径下的所有各级子节点的创建、修改、删除

NodeCache实践

下面即是一个基于Curator实践NodeCache的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* NodeCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class NodeCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");

// 只可监听指定路径所在节点的创建、修改、删除
String node = "/Aaron/Bob";
NodeCache nodeCache = new NodeCache(zkClient, node);
nodeCache.start();
nodeCache.getListenable()
.addListener( () -> {
System.out.print("监听当前节点的事件");
ChildData currentNode = nodeCache.getCurrentData();
if( currentNode==null ) {
System.out.println(": 当前节点已被删除");
} else {
getNodeInfo( currentNode );
}
});

System.out.println("Test 1: 创建当前节点");
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( node, "Good Morning".getBytes() );

Thread.sleep(2000);

System.out.println("Test 2: 修改当前节点的数据");
zkClient.setData()
.forPath( node, "Good Night".getBytes() );
Thread.sleep(2000);

System.out.println("Test 3: 删除当前节点");
zkClient.delete()
.forPath(node);
Thread.sleep(2000);

// 主线程等待执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

private static void getNodeInfo(ChildData currentNode) {
String info = ", Current Data Info: \n"
+ "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}

测试结果如下所示,符合预期

figure 2.jpeg

PathChildrenCache实践

下面即是一个基于Curator实践PathChildrenCache的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* PathChildrenCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class PathChildrenCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");

// 只可监听指定路径下的第一级子节点的创建、修改、删除
// 无法监听指定路径所在节点的事件
// 无法监听指定路径的子节点的子节点的事件
String parentPath = "/Aaron/Tony";
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, parentPath, true);
pathChildrenCache.start();
pathChildrenCache.getListenable()
.addListener( (client, event) -> {
PathChildrenCacheEvent.Type eventType = event.getType();
ChildData currentNode = event.getData();
if( PathChildrenCacheEvent.Type.CHILD_ADDED.equals( eventType ) ) {
System.out.println("添加 子节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( PathChildrenCacheEvent.Type.CHILD_UPDATED.equals( eventType ) ) {
System.out.println("修改 子节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( PathChildrenCacheEvent.Type.CHILD_REMOVED.equals( eventType ) ) {
System.out.println("删除 子节点, Current Data Info:");
getNodeInfo(currentNode);
}
});

System.out.println("Test 1: 创建子节点");
String childNode1 = parentPath + "/Lucy";
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT_SEQUENTIAL ) // 节点类型: 持久有序节点
.forPath( childNode1, "I'm a Dog".getBytes() );
Thread.sleep(2000);

System.out.println("Test 2: 创建子节点");
String childNode2 = parentPath + "/Tony";
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode2, "Good Morning".getBytes() );
Thread.sleep(2000);

System.out.println("Test 3: 修改子节点的数据");
zkClient.setData()
.forPath( childNode2, "Good Night".getBytes() );
Thread.sleep(2000);

System.out.println("Test 4: 删除子节点");
zkClient.delete()
.forPath(childNode2);
Thread.sleep(2000);

// 主线程等待执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

private static void getNodeInfo(ChildData currentNode) {
String info = "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}

测试结果如下所示,符合预期

figure 3.jpeg

TreeCache实践

下面即是一个基于Curator实践TreeCache的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* TreeCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class TreeCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");

// 可监听指定路径所在节点的创建、修改、删除
// 可监听指定路径下的所有各级子节点的创建、修改、删除
String parentPath = "/Aaron/Luca";
TreeCache treeCache = new TreeCache(zkClient, parentPath);
treeCache.start();
treeCache.getListenable()
.addListener( (client, event) -> {
TreeCacheEvent.Type eventType = event.getType();
ChildData currentNode = event.getData();
if( TreeCacheEvent.Type.NODE_ADDED.equals( eventType ) ) {
System.out.println("添加 节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( TreeCacheEvent.Type.NODE_UPDATED.equals( eventType ) ) {
System.out.println("修改 节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( TreeCacheEvent.Type.NODE_REMOVED.equals( eventType ) ) {
System.out.println("删除 节点, Current Data Info:");
getNodeInfo(currentNode);
}
});

System.out.println("Test 1: 创建指定路径所在节点");
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( parentPath, "I'm a Dog".getBytes() );
Thread.sleep(2000);

System.out.println("Test 2: 修改指定路径所在节点");
zkClient.setData()
.forPath( parentPath, "Good Night".getBytes() );
Thread.sleep(2000);

System.out.println("Test 3: 创建一级子节点");
String childNode1 = parentPath + "/Cat";
zkClient.create()
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode1, "I'm a Cat".getBytes() );
Thread.sleep(2000);

System.out.println("Test 4: 创建二级子节点");
String childNode2 = childNode1 + "/David";
zkClient.create()
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode2, "My Name is David".getBytes() );
Thread.sleep(2000);

System.out.println("Test 5: 修改二级子节点");
zkClient.setData()
.forPath( childNode2, "I'm Sorry".getBytes() );
Thread.sleep(2000);

System.out.println("Test 6: 删除二级子节点");
zkClient.delete()
.forPath(childNode2);
Thread.sleep(2000);

System.out.println("Test 7: 删除一级子节点");
zkClient.delete()
.forPath(childNode1);
Thread.sleep(2000);

System.out.println("Test 8: 删除指定路径所在节点");
zkClient.delete()
.forPath(parentPath);
Thread.sleep(2000);

// 主线程等待执行完毕
try{ Thread.sleep( 60*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

private static void getNodeInfo(ChildData currentNode) {
String info = "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}

测试结果如下所示,符合预期

figure 4.jpeg

分布式锁

Curator还进一步地提供了非常丰富的分布式锁特性,具体包括:

  • InterProcessMutex 分布式可重入互斥锁
  • InterProcessReadWriteLock 分布式可重入读写锁
  • InterProcessSemaphoreMutex 分布式不可重入互斥锁
  • InterProcessSemaphoreV2 分布式信号量

InterProcessMutex分布式可重入互斥锁

InterProcessMutex是一个分布式的可重入的互斥锁,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* InterProcessMutex Demo: 分布式可重入互斥锁
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessMutexDemo {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static String zkLockPath = "/Aaron/Lock1";

public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);

// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");

for(int i=1; i<=3; i++) {
String taskName = "任务#"+i;
Task task = new Task(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

private static class Task implements Runnable {
private String taskName;

private InterProcessMutex lock;

public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
this.lock = new InterProcessMutex(zkClient, zkLockPath);
}

@Override
public void run() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
methodA();
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #1\n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void methodA() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #2");

// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println(taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #2");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

测试结果如下所示,符合预期

figure 5.jpeg

InterProcessReadWriteLock分布式可重入读写锁

读写互斥

InterProcessReadWriteLock是一个分布式可重入读写锁,其中读锁为共享锁、写锁为互斥锁。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 读锁为共享锁、写锁为互斥锁
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo1 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static String zkLockPath = "/Aaron/Lock2";

private static CuratorFramework zkClient;

@BeforeClass
public static void init() {
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
}

/**
* 测试: 读锁为共享锁
*/
@Test
public void test1Read() {
System.out.println("\n---------------------- Test 1 : Read ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "读任务#"+i;
Runnable task = new ReadTask(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}

/**
* 测试: 写锁为互斥锁
*/
@Test
public void test2Write() {
System.out.println("\n---------------------- Test 2 : Write ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "写任务#"+i;
Runnable task = new WriteTask(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 30*1000 ); } catch (Exception e) {}
}

/**
* 测试: 读写互斥
*/
@Test
public void test2ReadWrite() {
System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
for(int i=1; i<=8; i++) {
Runnable task = null;
Boolean isReadTask = RandomUtils.nextBoolean();
if( isReadTask ) {
task = new ReadTask( "读任务#"+i, zkClient, zkLockPath );
} else {
task = new WriteTask( "写任务#"+i, zkClient, zkLockPath );
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
}

@AfterClass
public static void close() {
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

/**
* 读任务
*/
private static class ReadTask implements Runnable {
private String taskName;

private InterProcessMutex readLock;

public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.readLock = interProcessReadWriteLock.readLock();
}

@Override
public void run() {
try{
readLock.acquire();
info(taskName + ": 成功获取读锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放读锁 #1");
try {
readLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

/**
* 写任务
*/
private static class WriteTask implements Runnable {
private String taskName;

private InterProcessMutex writeLock;

public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.writeLock = interProcessReadWriteLock.writeLock();
}

@Override
public void run() {
try{
writeLock.acquire();
info(taskName + ": 成功获取写锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放写锁 #1\n");
try {
writeLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

测试结果如下所示,符合预期

figure 6.jpeg

figure 7.jpeg

可重入性

由于读锁、写锁分别是基于InterProcessMutex实现的,故这二者自然也是支持可重入的。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 可重入性测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo2 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static String zkLockPath = "/Aaron/Lock3";

private static CuratorFramework zkClient;

@BeforeClass
public static void init() {
System.out.println("---------------------- 系统上线 ----------------------");

// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
}

/**
* 测试: 读锁具有可重入性
*/
@Test
public void test1Read() {
System.out.println("\n---------------------- Test 1 : Read ----------------------");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();

Runnable task = new Task("读任务", readLock, readLock);
threadPool.execute( task );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
}

/**
* 测试: 写锁具有可重入性
*/
@Test
public void test2Write() {
System.out.println("\n---------------------- Test 2 : Write ----------------------");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

Runnable task = new Task("写任务", writeLock, writeLock);
threadPool.execute( task );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
}

@AfterClass
public static void close() {
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

/**
* 任务
*/
private static class Task implements Runnable {
private String taskName;

private InterProcessMutex firstLock;

private InterProcessMutex secondLock;

public Task(String taskName, InterProcessMutex firstLock, InterProcessMutex secondLock) {
this.taskName = taskName;
this.firstLock = firstLock;
this.secondLock = secondLock;
}

@Override
public void run() {
try{
firstLock.acquire();
info(taskName + ": 成功获取锁 firstLock");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
methodB();
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 firstLock\n");
try {
firstLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

public void methodB() {
try{
secondLock.acquire();
info(taskName + ": 成功获取锁 secondLock");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 secondLock");
try {
secondLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

测试结果如下所示,符合预期

figure 8.jpeg

锁升级、锁降级

所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。

锁升级示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo3 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static String zkLockPath = "/Aaron/Lock4";

private static CuratorFramework zkClient;

/**
* 测试: 锁升级
*/
@Test
public void test1Read2Write() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();

System.out.println("---------------------- Test 1 : Read -> Write ----------------------\n");

InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

try {
readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.release();
info("成功释放读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.release();
info("成功释放写锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}

zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞

figure 9.jpeg

锁降级示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo3 {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static String zkLockPath = "/Aaron/Lock4";

private static CuratorFramework zkClient;

/**
* 测试: 锁降级
*/
@Test
public void test2Write2Read() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();

System.out.println("---------------------- Test 2 : Write -> Read ----------------------\n");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

try {
writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

writeLock.release();
info("成功释放写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

readLock.release();
info("成功释放读锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}

zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,符合预期

figure 10.jpeg

InterProcessSemaphoreMutex分布式不可重入互斥锁

InterProcessSemaphoreMutex则是一个分布式不可重入互斥锁,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* InterProcessSemaphoreMutex Demo : 分布式不可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-03
*/
public class InterProcessSemaphoreMutexDemo {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static String zkLockPath = "/Aaron/Lock5";

private static CuratorFramework zkClient;

@BeforeClass
public static void init() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
}

/**
* InterProcessSemaphoreMutex 是互斥锁
*/
@Test
public void test1() {
System.out.println("\n---------------------- Test 1 ----------------------");
Runnable task = () -> {
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);

try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #1\n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};

for(int i=1; i<=3; i++) {
threadPool.execute( task );
}

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}

/**
* InterProcessSemaphoreMutex 是不可重入锁
*/
@Test
public void test2() {
System.out.println("\n---------------------- Test 2 ----------------------");
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);

try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

lock.acquire();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));

lock.release();
info("释放锁 #1\n");

lock.release();
info("释放锁 #1\n");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}

测试结果如下所示,符合预期。Test 1结果证明其是一个互斥锁,而Test 2则在第二次获取锁时被阻塞,证明其不可重入

figure 11.jpeg

InterProcessSemaphoreV2分布式信号量

InterProcessSemaphoreV2是一个的分布式信号量,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/**
* InterProcessSemaphoreV2 Demo : 分布式信号量
* @author Aaron Zhu
* @date 2022-04-03
*/
public class InterProcessSemaphoreV2Demo {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static String zkLockPath = "/Aaron/Lock6";

private static CuratorFramework zkClient;

@Test
public void test1() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();

// 系统最大并发处理量
int maxLimit = 2;

IntStream.rangeClosed(1,5)
.mapToObj( num -> new UserReq("用户#"+num, zkClient, zkLockPath, maxLimit) )
.forEach( threadPool::execute );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 20*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}

/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}

private static class UserReq implements Runnable {

private String name;

private InterProcessSemaphoreV2 interProcessSemaphoreV2;

private Integer maxLimit;

public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
this.name = name;
this.maxLimit = maxLimit;
this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
}

@Override
public void run() {
try {
// 模拟用户不定时发起请求
Thread.sleep(RandomUtils.nextLong(500, 2000));
String msg = name + ": 发起请求";
info(msg);

// 阻塞等待,直到获取许可
Lease lease = interProcessSemaphoreV2.acquire();

info(name + ": 系统开始处理请求");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextInt(5, 20)*1000);

// 用户请求处理完毕,释放许可
interProcessSemaphoreV2.returnLease( lease );
info(name + ": 系统处理完毕");
}catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}
}

测试结果如下所示,符合预期。其每次同时处理的用户请求数最大只有2个

figure 12.jpeg

0%