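This article walks through distributed locks in practice using ZooKeeper and Curator.
Setting Up the ZooKeeper Environment
Here we set up the ZooKeeper environment with Docker.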
docker pull zookeeper:3.4
docker run -p 2181:2181 -d \
    --name ZooKeeper-Service-2 \
    zookeeper:3.4
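POM Dependencies
Curator, a ZooKeeper client framework originally open-sourced by Netflix, greatly simplifies working with ZooKeeper and offers a rich, fluent (chained-call) API. We therefore start by adding the Curator dependency to the POM. The ZooKeeper dependency that Curator pulls in transitively is excluded, and a ZooKeeper dependency matching the server version is declared separately.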
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.0</version>
    </dependency>
</dependencies>
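Curator Basics
Creating Nodes
ZooKeeper nodes are classified along two dimensions: persistent or ephemeral, and sequential or non-sequential. Combining the two gives four node types. An ephemeral node is deleted automatically once the session of the client that created it ends (for example, after the client disconnects), whereas a persistent node is not. A sequential node is assigned a unique, monotonically increasing sequence number at creation, which is appended to the node name, whereas a non-sequential node is not. Below is an example of creating nodes with Curator.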
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.data.Stat;

import java.util.List;

import static org.apache.zookeeper.CreateMode.*;

public class Basic {
    public static void main(String[] args) throws Exception {
        // Build and start the client
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))   // retry up to 3 times, 1s apart
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");

        // Persistent, non-sequential node
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT)
            .forPath("/AaronTest/nodeA", "Hello, Node A".getBytes());
        // Persistent, sequential node
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT_SEQUENTIAL)
            .forPath("/AaronTest/nodeB", "Hello, Node B".getBytes());
        // Ephemeral, non-sequential node
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(EPHEMERAL)
            .forPath("/AaronTest/nodeC", "Hello, Node C".getBytes());
        // Ephemeral, sequential node
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(EPHEMERAL_SEQUENTIAL)
            .forPath("/AaronTest/nodeD", "Hello, Node D".getBytes());

        // List the children and print each child's data and stat
        List<String> childrenNodeNameList = zkClient.getChildren()
            .forPath("/AaronTest");
        for (String childrenNodeName : childrenNodeNameList) {
            Stat stat = new Stat();
            byte[] bytes = zkClient.getData()
                .storingStatIn(stat)
                .forPath("/AaronTest/" + childrenNodeName);
            String data = new String(bytes);

            System.out.println("--------------------------------");
            System.out.println("childrenNodeName: " + childrenNodeName);
            System.out.println("data: " + data);
            System.out.println("stat: " + stat);
        }

        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }
}
The test results match expectations.
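As a side note on the sequence suffix of sequential nodes: ZooKeeper appends the number as a zero-padded, 10-digit decimal. The sketch below only imitates that naming rule locally. The class `SequentialNameSketch` and its simple counter are assumptions made for illustration; on a real server the counter is maintained per parent node, so the actual values you see may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Local imitation of how sequential node names are formed (no ZooKeeper involved). */
class SequentialNameSketch {
    // Hypothetical stand-in for the counter the server keeps for a parent node.
    private final AtomicInteger counter = new AtomicInteger();

    /** Sequential modes get a zero-padded 10-digit suffix; other modes keep the path unchanged. */
    String create(String path, boolean sequential) {
        if (!sequential) {
            return path;
        }
        return String.format("%s%010d", path, counter.getAndIncrement());
    }

    public static void main(String[] args) {
        SequentialNameSketch parent = new SequentialNameSketch();
        System.out.println(parent.create("/AaronTest/nodeA", false));
        System.out.println(parent.create("/AaronTest/nodeB", true));
        System.out.println(parent.create("/AaronTest/nodeD", true));
    }
}
```

Because the suffix has a fixed width, sorting sibling names lexicographically also sorts them by creation order, which is what lock recipes rely on.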
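The Watcher Mechanism
ZooKeeper implements publish/subscribe through its Watcher mechanism, but a native Watcher is one-shot: once triggered, it is no longer active. To keep listening, the Watcher must be registered again after every trigger, which is tedious. Curator improves on this by re-registering automatically so that events can be observed repeatedly. Specifically, Curator provides three listeners: NodeCache, PathChildrenCache, and TreeCache. NodeCache watches only the node at the given path for creation, modification, and deletion. PathChildrenCache watches only the first-level children of the given path for creation, modification, and deletion; it does not observe events on the node at the given path itself, nor on the children of those children. TreeCache watches the node at the given path, as well as its descendants at every level, for creation, modification, and deletion.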
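To make the one-shot behavior concrete, here is a small in-memory sketch. It does not use ZooKeeper or Curator; the `Node` class is a hypothetical stand-in whose watchers are dropped as soon as they fire, so a watcher keeps receiving events only if it registers itself again.

```java
import java.util.ArrayList;
import java.util.List;

class OneShotWatchSketch {
    /** Hypothetical in-memory node: watchers fire once and are then forgotten. */
    static class Node {
        private List<Runnable> watchers = new ArrayList<>();

        void watch(Runnable watcher) {
            watchers.add(watcher);
        }

        void setData(String newData) {
            List<Runnable> fired = watchers;
            watchers = new ArrayList<>();   // one-shot: clear registrations before firing
            fired.forEach(Runnable::run);
        }
    }

    /** Returns how many notifications arrive for three updates. */
    static int countNotifications(boolean reRegister) {
        Node node = new Node();
        int[] count = {0};
        Runnable[] watcher = new Runnable[1];
        watcher[0] = () -> {
            count[0]++;
            if (reRegister) {
                node.watch(watcher[0]);     // register again for the next change
            }
        };
        node.watch(watcher[0]);
        for (int i = 0; i < 3; i++) {
            node.setData("v" + i);
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("without re-registering: " + countNotifications(false)); // 1
        System.out.println("with re-registering:    " + countNotifications(true));  // 3
    }
}
```

The caches described above take care of this re-registration step for you.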
NodeCache in Practice
Below is an example of using NodeCache with Curator.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryNTimes;

import static org.apache.zookeeper.CreateMode.PERSISTENT;

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        // Build and start the client
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");

        // Watch a single node
        String node = "/Aaron/Bob";
        NodeCache nodeCache = new NodeCache(zkClient, node);
        nodeCache.start();
        nodeCache.getListenable().addListener(() -> {
            System.out.print("Event on the watched node");
            ChildData currentNode = nodeCache.getCurrentData();
            if (currentNode == null) {
                System.out.println(": the node has been deleted");
            } else {
                getNodeInfo(currentNode);
            }
        });

        System.out.println("Test 1: create the node");
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT)
            .forPath(node, "Good Morning".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 2: modify the node's data");
        zkClient.setData().forPath(node, "Good Night".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 3: delete the node");
        zkClient.delete().forPath(node);
        Thread.sleep(2000);

        // Keep the process alive for a while so that events can still be observed
        try { Thread.sleep(120 * 1000); } catch (Exception e) {}
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = ", Current Data Info: \n"
            + "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat=" + currentNode.getStat();
        System.out.println(info);
    }
}
The test results match expectations.
PathChildrenCache in Practice
Below is an example of using PathChildrenCache with Curator.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;

import static org.apache.zookeeper.CreateMode.PERSISTENT;
import static org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL;

public class PathChildrenCacheDemo {
    public static void main(String[] args) throws Exception {
        // Build and start the client
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");

        // Watch the first-level children of parentPath; true = also cache node data
        String parentPath = "/Aaron/Tony";
        PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, parentPath, true);
        pathChildrenCache.start();
        pathChildrenCache.getListenable().addListener((client, event) -> {
            PathChildrenCacheEvent.Type eventType = event.getType();
            ChildData currentNode = event.getData();
            if (PathChildrenCacheEvent.Type.CHILD_ADDED.equals(eventType)) {
                System.out.println("Child added, Current Data Info:");
                getNodeInfo(currentNode);
            } else if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(eventType)) {
                System.out.println("Child updated, Current Data Info:");
                getNodeInfo(currentNode);
            } else if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(eventType)) {
                System.out.println("Child removed, Current Data Info:");
                getNodeInfo(currentNode);
            }
        });

        System.out.println("Test 1: create a child node");
        String childNode1 = parentPath + "/Lucy";
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT_SEQUENTIAL)
            .forPath(childNode1, "I'm a Dog".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 2: create a child node");
        String childNode2 = parentPath + "/Tony";
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT)
            .forPath(childNode2, "Good Morning".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 3: modify the child node's data");
        zkClient.setData().forPath(childNode2, "Good Night".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 4: delete the child node");
        zkClient.delete().forPath(childNode2);
        Thread.sleep(2000);

        // Keep the process alive for a while so that events can still be observed
        try { Thread.sleep(120 * 1000); } catch (Exception e) {}
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat=" + currentNode.getStat();
        System.out.println(info);
    }
}
The test results match expectations.
TreeCache in Practice
Below is an example of using TreeCache with Curator.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.RetryNTimes;

import static org.apache.zookeeper.CreateMode.PERSISTENT;

public class TreeCacheDemo {
    public static void main(String[] args) throws Exception {
        // Build and start the client
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");

        // Watch parentPath itself and all of its descendants
        String parentPath = "/Aaron/Luca";
        TreeCache treeCache = new TreeCache(zkClient, parentPath);
        treeCache.start();
        treeCache.getListenable().addListener((client, event) -> {
            TreeCacheEvent.Type eventType = event.getType();
            ChildData currentNode = event.getData();
            if (TreeCacheEvent.Type.NODE_ADDED.equals(eventType)) {
                System.out.println("Node added, Current Data Info:");
                getNodeInfo(currentNode);
            } else if (TreeCacheEvent.Type.NODE_UPDATED.equals(eventType)) {
                System.out.println("Node updated, Current Data Info:");
                getNodeInfo(currentNode);
            } else if (TreeCacheEvent.Type.NODE_REMOVED.equals(eventType)) {
                System.out.println("Node removed, Current Data Info:");
                getNodeInfo(currentNode);
            }
        });

        System.out.println("Test 1: create the node at the given path");
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(PERSISTENT)
            .forPath(parentPath, "I'm a Dog".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 2: modify the node at the given path");
        zkClient.setData().forPath(parentPath, "Good Night".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 3: create a first-level child");
        String childNode1 = parentPath + "/Cat";
        zkClient.create()
            .withMode(PERSISTENT)
            .forPath(childNode1, "I'm a Cat".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 4: create a second-level child");
        String childNode2 = childNode1 + "/David";
        zkClient.create()
            .withMode(PERSISTENT)
            .forPath(childNode2, "My Name is David".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 5: modify the second-level child");
        zkClient.setData().forPath(childNode2, "I'm Sorry".getBytes());
        Thread.sleep(2000);

        System.out.println("Test 6: delete the second-level child");
        zkClient.delete().forPath(childNode2);
        Thread.sleep(2000);

        System.out.println("Test 7: delete the first-level child");
        zkClient.delete().forPath(childNode1);
        Thread.sleep(2000);

        System.out.println("Test 8: delete the node at the given path");
        zkClient.delete().forPath(parentPath);
        Thread.sleep(2000);

        // Keep the process alive for a while so that events can still be observed
        try { Thread.sleep(60 * 1000); } catch (Exception e) {}
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat=" + currentNode.getStat();
        System.out.println(info);
    }
}
The test results match expectations.
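To recap which changes each of the three caches reports, the following sketch encodes their scopes as plain path predicates. It is only a restatement of the rules described in this article, written without Curator; the class and method names are made up for illustration.

```java
/** Which changed paths fall within the scope of each cache type, as described above. */
class CacheScopeSketch {
    /** NodeCache: only the watched node itself. */
    static boolean nodeCacheSees(String watched, String changed) {
        return changed.equals(watched);
    }

    /** PathChildrenCache: only direct (first-level) children of the watched path. */
    static boolean pathChildrenCacheSees(String watched, String changed) {
        String prefix = watched + "/";
        if (!changed.startsWith(prefix) || changed.length() == prefix.length()) {
            return false;
        }
        // No further '/' after the prefix means it is a first-level child.
        return changed.indexOf('/', prefix.length()) < 0;
    }

    /** TreeCache: the watched node and every descendant. */
    static boolean treeCacheSees(String watched, String changed) {
        return changed.equals(watched) || changed.startsWith(watched + "/");
    }

    public static void main(String[] args) {
        String watched = "/Aaron/Luca";
        for (String changed : new String[] {watched, watched + "/Cat", watched + "/Cat/David"}) {
            System.out.println(changed
                + " node=" + nodeCacheSees(watched, changed)
                + " children=" + pathChildrenCacheSees(watched, changed)
                + " tree=" + treeCacheSees(watched, changed));
        }
    }
}
```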
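Distributed Locks
Curator also provides a rich set of distributed lock recipes, including:
- InterProcessMutex: a distributed reentrant mutex
- InterProcessReadWriteLock: a distributed reentrant read-write lock
- InterProcessSemaphoreMutex: a distributed non-reentrant mutex
- InterProcessSemaphoreV2: a distributed semaphore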
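As background, the standard ZooKeeper lock recipe builds on ephemeral sequential nodes: each contender creates one under the lock path, the contender with the lowest sequence number owns the lock, and every other contender watches only its immediate predecessor. The sketch below models just that ordering logic with plain Java collections. It does not talk to ZooKeeper, the node names are hypothetical, and Curator's real implementation involves more than is shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class LockOrderSketch {
    /** Parses the trailing 10-digit sequence number of a sequential node name. */
    static int sequenceOf(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingInt(LockOrderSketch::sequenceOf));
        return copy;
    }

    /** The contender whose node has the lowest sequence number holds the lock. */
    static boolean holdsLock(List<String> children, String me) {
        return sorted(children).get(0).equals(me);
    }

    /** The node to watch while waiting; null if `me` is the owner or is not in the list. */
    static String predecessorOf(List<String> children, String me) {
        List<String> ordered = sorted(children);
        int index = ordered.indexOf(me);
        return index <= 0 ? null : ordered.get(index - 1);
    }

    public static void main(String[] args) {
        // Hypothetical children of a lock path, in arbitrary order
        List<String> children = Arrays.asList(
            "lock-0000000012", "lock-0000000003", "lock-0000000007");
        System.out.println(holdsLock(children, "lock-0000000003"));     // true
        System.out.println(predecessorOf(children, "lock-0000000007")); // lock-0000000003
    }
}
```

Watching only the predecessor means that a release wakes up a single waiter instead of all of them.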
InterProcessMutex: Distributed Reentrant Mutex
InterProcessMutex is a distributed, reentrant mutual-exclusion lock. Example code follows.
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InterProcessMutexDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static String zkLockPath = "/Aaron/Lock1";

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");

        // Three tasks compete for the same lock path
        for (int i = 1; i <= 3; i++) {
            String taskName = "Task#" + i;
            Task task = new Task(taskName, zkClient, zkLockPath);
            threadPool.execute(task);
        }

        try { Thread.sleep(120 * 1000); } catch (Exception e) {}
        threadPool.shutdown();   // let the JVM exit once the tasks are done
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }

    private static class Task implements Runnable {
        private String taskName;
        private InterProcessMutex lock;

        public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            this.lock = new InterProcessMutex(zkClient, zkLockPath);
        }

        @Override
        public void run() {
            try {
                lock.acquire();
                info(taskName + ": acquired lock #1");
                Thread.sleep(RandomUtils.nextLong(100, 500));
                methodA();   // re-enters the same lock
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing lock #1\n");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void methodA() {
            try {
                lock.acquire();   // second acquire by the thread that already holds the lock
                info(taskName + ": acquired lock #2");
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing lock #2");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
The test results match expectations.
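Reentrancy means that the thread already holding the lock may acquire it again without blocking, and must release it once per acquire. For readers who want to see that behavior without a ZooKeeper server, the sketch below uses the JDK's in-process `ReentrantLock` as a stand-in. This is an analogy chosen for illustration, not Curator code.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancySketch {
    /** Acquires the same lock twice on one thread and reports the hold count at the inner level. */
    static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                 // first acquire
        try {
            lock.lock();             // same thread re-enters without blocking
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();       // one release per acquire
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count inside nested section: " + nestedHoldCount()); // 2
    }
}
```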
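InterProcessReadWriteLock: Distributed Reentrant Read-Write Lock
Read-Write Mutual Exclusion
InterProcessReadWriteLock is a distributed reentrant read-write lock, in which the read lock is shared and the write lock is exclusive. Example code follows.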
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InterProcessReadWriteLockDemo1 {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    private static String zkLockPath = "/Aaron/Lock2";
    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
        System.out.println("---------------------- System online ----------------------");
    }

    /** Readers only: read locks are shared. */
    @Test
    public void test1Read() {
        System.out.println("\n---------------------- Test 1 : Read ----------------------");
        for (int i = 1; i <= 3; i++) {
            String taskName = "ReadTask#" + i;
            Runnable task = new ReadTask(taskName, zkClient, zkLockPath);
            threadPool.execute(task);
        }
        try { Thread.sleep(10 * 1000); } catch (Exception e) {}
    }

    /** Writers only: write locks are exclusive. */
    @Test
    public void test2Write() {
        System.out.println("\n---------------------- Test 2 : Write ----------------------");
        for (int i = 1; i <= 3; i++) {
            String taskName = "WriteTask#" + i;
            Runnable task = new WriteTask(taskName, zkClient, zkLockPath);
            threadPool.execute(task);
        }
        try { Thread.sleep(30 * 1000); } catch (Exception e) {}
    }

    /** A random mix of readers and writers. */
    @Test
    public void test3ReadWrite() {
        System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
        for (int i = 1; i <= 8; i++) {
            Runnable task;
            boolean isReadTask = RandomUtils.nextBoolean();
            if (isReadTask) {
                task = new ReadTask("ReadTask#" + i, zkClient, zkLockPath);
            } else {
                task = new WriteTask("WriteTask#" + i, zkClient, zkLockPath);
            }
            threadPool.execute(task);
        }
        try { Thread.sleep(40 * 1000); } catch (Exception e) {}
    }

    @AfterClass
    public static void close() {
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }

    private static class ReadTask implements Runnable {
        private String taskName;
        private InterProcessMutex readLock;

        public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.readLock = interProcessReadWriteLock.readLock();
        }

        @Override
        public void run() {
            try {
                readLock.acquire();
                info(taskName + ": acquired read lock #1");
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing read lock #1");
                try {
                    readLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class WriteTask implements Runnable {
        private String taskName;
        private InterProcessMutex writeLock;

        public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.writeLock = interProcessReadWriteLock.writeLock();
        }

        @Override
        public void run() {
            try {
                writeLock.acquire();
                info(taskName + ": acquired write lock #1");
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing write lock #1\n");
                try {
                    writeLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
The test results match expectations.
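The shared-read, exclusive-write rule can also be observed locally. The sketch below uses the JDK's in-process `ReentrantReadWriteLock` as an analogy (it is not Curator code): while one thread holds the read lock, a second thread checks with the non-blocking `tryLock()` whether it could take the read lock or the write lock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteSharingSketch {
    /** While the caller holds the read lock, can another thread take the read (or write) lock? */
    static boolean otherThreadCanLock(boolean wantWrite) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            // supplyAsync runs the check on a different thread; join() waits for its answer
            return CompletableFuture.supplyAsync(() -> {
                Lock lock = wantWrite ? rw.writeLock() : rw.readLock();
                boolean acquired = lock.tryLock();   // non-blocking attempt
                if (acquired) {
                    lock.unlock();
                }
                return acquired;
            }).join();
        } finally {
            rw.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("second reader admitted: " + otherThreadCanLock(false)); // true
        System.out.println("writer admitted:        " + otherThreadCanLock(true));  // false
    }
}
```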
Reentrancy
Because the read lock and the write lock are each built on InterProcessMutex, both are naturally reentrant as well. Example code follows.
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InterProcessReadWriteLockDemo2 {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    private static String zkLockPath = "/Aaron/Lock3";
    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        System.out.println("---------------------- System online ----------------------");
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
    }

    /** Re-enter the read lock on the same thread. */
    @Test
    public void test1Read() {
        System.out.println("\n---------------------- Test 1 : Read ----------------------");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();

        Runnable task = new Task("ReadTask", readLock, readLock);
        threadPool.execute(task);

        try { Thread.sleep(5 * 1000); } catch (Exception e) {}
    }

    /** Re-enter the write lock on the same thread. */
    @Test
    public void test2Write() {
        System.out.println("\n---------------------- Test 2 : Write ----------------------");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        Runnable task = new Task("WriteTask", writeLock, writeLock);
        threadPool.execute(task);

        try { Thread.sleep(5 * 1000); } catch (Exception e) {}
    }

    @AfterClass
    public static void close() {
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }

    private static class Task implements Runnable {
        private String taskName;
        private InterProcessMutex firstLock;
        private InterProcessMutex secondLock;

        public Task(String taskName, InterProcessMutex firstLock, InterProcessMutex secondLock) {
            this.taskName = taskName;
            this.firstLock = firstLock;
            this.secondLock = secondLock;
        }

        @Override
        public void run() {
            try {
                firstLock.acquire();
                info(taskName + ": acquired firstLock");
                Thread.sleep(RandomUtils.nextLong(100, 500));
                methodB();   // acquires secondLock while firstLock is still held
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing firstLock\n");
                try {
                    firstLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void methodB() {
            try {
                secondLock.acquire();
                info(taskName + ": acquired secondLock");
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": releasing secondLock");
                try {
                    secondLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
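The test results match expectations.
Lock Upgrade and Lock Downgrade
Lock upgrade means promoting a read lock to a write lock: a thread that already holds the read lock then requests the write lock. This is not supported, and the reason is simple. A read lock can be held by multiple service instances at the same time. If a thread in one of those instances could upgrade and obtain the write lock, it would hold the write lock while other instances still held read locks, which contradicts the read-write mutual exclusion described earlier. The reverse, lock downgrade (write lock to read lock), is supported. After a thread in a service instance acquires the write lock, that same thread can still acquire the read lock. When it then releases the write lock, it holds only the read lock, which completes the downgrade. Lock downgrade is quite useful: it is safe, because the read lock is acquired before the write lock is released, and it is efficient, because the read lock is shared.
Example code for lock upgrade follows.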
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class InterProcessReadWriteLockDemo3 {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static String zkLockPath = "/Aaron/Lock4";
    private static CuratorFramework zkClient;

    @Test
    public void test1Read2Write() {
        System.out.println("---------------------- System online ----------------------");
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();

        System.out.println("---------------------- Test 1 : Read -> Write ----------------------\n");

        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        try {
            readLock.acquire();
            info("acquired read lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            // Upgrade attempt: this call blocks while the read lock is still held
            writeLock.acquire();
            info("acquired write lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            readLock.release();
            info("released read lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            writeLock.release();
            info("released write lock");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }

        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }
}
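As the test results show, attempting to acquire the write lock while still holding the read lock blocks indefinitely.
Example code for lock downgrade follows.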
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class InterProcessReadWriteLockDemo3 {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static String zkLockPath = "/Aaron/Lock4";
    private static CuratorFramework zkClient;

    @Test
    public void test2Write2Read() {
        System.out.println("---------------------- System online ----------------------");
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();

        System.out.println("---------------------- Test 2 : Write -> Read ----------------------\n");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        try {
            writeLock.acquire();
            info("acquired write lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            // Downgrade: take the read lock before giving up the write lock
            readLock.acquire();
            info("acquired read lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            writeLock.release();
            info("released write lock");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            readLock.release();
            info("released read lock");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }

        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }
}
The test results match expectations.
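The JDK's in-process `ReentrantReadWriteLock` follows the same upgrade and downgrade rules, so it offers a quick way to try them without a ZooKeeper server. The sketch below is an analogy rather than Curator code. It uses the non-blocking `tryLock()` so that the failed upgrade returns `false` instead of hanging.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class UpgradeDowngradeSketch {
    /** Read -> write on one thread: not permitted, so tryLock() reports failure. */
    static boolean tryUpgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean upgraded = rw.writeLock().tryLock();
            if (upgraded) {
                rw.writeLock().unlock();
            }
            return upgraded;
        } finally {
            rw.readLock().unlock();
        }
    }

    /** Write -> read on one thread: permitted; after releasing the write lock only the read lock remains. */
    static boolean tryDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        boolean gotRead = rw.readLock().tryLock();   // acquired before the write lock is released
        rw.writeLock().unlock();                     // downgrade completes here
        if (gotRead) {
            rw.readLock().unlock();
        }
        return gotRead;
    }

    public static void main(String[] args) {
        System.out.println("upgrade succeeded:   " + tryUpgrade());   // false
        System.out.println("downgrade succeeded: " + tryDowngrade()); // true
    }
}
```

With Curator itself, the timed overload `acquire(long time, TimeUnit unit)` plays a similar role: it returns `false` when the lock could not be obtained in time, rather than blocking indefinitely.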
InterProcessSemaphoreMutex: Distributed Non-Reentrant Mutex
InterProcessSemaphoreMutex is a distributed, non-reentrant mutual-exclusion lock. Example code follows.
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InterProcessSemaphoreMutexDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    private static String zkLockPath = "/Aaron/Lock5";
    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        System.out.println("---------------------- System online ----------------------");
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();
    }

    /** Mutual exclusion: three tasks compete for the same lock path. */
    @Test
    public void test1() {
        System.out.println("\n---------------------- Test 1 ----------------------");
        Runnable task = () -> {
            InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);
            try {
                lock.acquire();
                info("acquired lock #1");
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                info("releasing lock #1\n");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 1; i <= 3; i++) {
            threadPool.execute(task);
        }

        try { Thread.sleep(10 * 1000); } catch (Exception e) {}
    }

    /** Non-reentrancy: the second acquire on the same thread blocks. */
    @Test
    public void test2() {
        System.out.println("\n---------------------- Test 2 ----------------------");
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);

        try {
            lock.acquire();
            info("acquired lock #1");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            lock.acquire();   // blocks here: the lock is not reentrant
            info("acquired lock #2");
            Thread.sleep(RandomUtils.nextLong(100, 500));

            lock.release();
            info("released lock #1\n");

            lock.release();
            info("released lock #2\n");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }
}
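The test results match expectations. Test 1 shows that it is a mutual-exclusion lock, while in Test 2 the second acquire blocks, which shows that the lock is not reentrant.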
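A mutex built from a one-permit semaphore is non-reentrant because a semaphore counts permits and does not track which thread owns them. The sketch below shows that with the JDK's in-process `Semaphore`, used here as an analogy rather than Curator code: the second, non-blocking acquire on the same thread fails.

```java
import java.util.concurrent.Semaphore;

class NonReentrantSketch {
    /** Takes the only permit, then checks whether the same thread can take it again. */
    static boolean secondAcquireSucceeds() {
        Semaphore mutex = new Semaphore(1);      // one permit = mutual exclusion
        mutex.acquireUninterruptibly();
        try {
            boolean again = mutex.tryAcquire();  // non-blocking; a plain acquire() would hang here
            if (again) {
                mutex.release();
            }
            return again;
        } finally {
            mutex.release();
        }
    }

    public static void main(String[] args) {
        System.out.println("second acquire succeeded: " + secondAcquireSucceeds()); // false
    }
}
```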
InterProcessSemaphoreV2: Distributed Semaphore
InterProcessSemaphoreV2 is a distributed semaphore. Example code follows.
import org.apache.commons.lang3.RandomUtils;   // requires commons-lang3 on the classpath
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.RetryNTimes;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class InterProcessSemaphoreV2Demo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    private static String zkLockPath = "/Aaron/Lock6";
    private static CuratorFramework zkClient;

    @Test
    public void test1() {
        System.out.println("---------------------- System online ----------------------");
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        zkClient.start();

        // At most 2 requests may be processed at the same time
        int maxLimit = 2;

        // Five users send requests concurrently
        IntStream.rangeClosed(1, 5)
            .mapToObj(num -> new UserReq("User#" + num, zkClient, zkLockPath, maxLimit))
            .forEach(threadPool::execute);

        try { Thread.sleep(20 * 1000); } catch (Exception e) {}
        zkClient.close();
        System.out.println("---------------------- System offline ----------------------");
    }

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        System.out.println("[" + time + "]  <" + thread + "> " + msg);
    }

    private static class UserReq implements Runnable {
        private String name;
        private InterProcessSemaphoreV2 interProcessSemaphoreV2;
        private Integer maxLimit;

        public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
            this.name = name;
            this.maxLimit = maxLimit;
            this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RandomUtils.nextLong(500, 2000));
                String msg = name + ": request sent";
                info(msg);

                // Blocks until one of the maxLimit leases is available
                Lease lease = interProcessSemaphoreV2.acquire();

                info(name + ": system starts processing the request");
                Thread.sleep(RandomUtils.nextInt(5, 20) * 1000);

                // Return the lease so that another request can proceed
                interProcessSemaphoreV2.returnLease(lease);
                info(name + ": system finished processing");
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            }
        }
    }
}
The test results match expectations: at most 2 user requests are processed at the same time.
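The same bound can be checked locally with the JDK's in-process `Semaphore`. The sketch below is an analogy, not Curator code, and the helper name `maxConcurrent` is made up for illustration. It runs several tasks behind a semaphore and records the highest number of tasks that were ever inside the guarded section at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {
    /** Runs `tasks` workers behind a semaphore with `permits` permits; returns the peak concurrency seen. */
    static int maxConcurrent(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();                 // wait for a free permit
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(50);                // simulate handling a request
                    } finally {
                        running.decrementAndGet();
                        semaphore.release();             // hand the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency with 2 permits and 5 tasks: " + maxConcurrent(2, 5));
    }
}
```

The reported peak never exceeds the number of permits, mirroring the limit of 2 observed in the Curator test.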