Redisson是基于Redis的Java驻内存数据网格(In-Memory Data Grid),底层使用Netty进行实现。其提供了相应的分布式锁实现
RedissonLock 分布式非公平可重入互斥锁
RedissonLock是一个分布式非公平可重入互斥锁,其在获取锁的过程中,支持lock阻塞式、tryLock非阻塞式两种形式。其中,这两个方法还有多个重载版本,以支持设置锁的最大持有时间、设置获取锁的最大等待时间。具体方法如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void lock();
void lock(long leaseTime, TimeUnit unit);
boolean tryLock();
boolean tryLock(long time, TimeUnit unit);
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
|
可重入性
现在我们验证下RedissonLock是一个互斥锁并且支持可重入。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
public class RedissonLockDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass public static void init() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); redissonClient = Redisson.create( config ); }
@Test public void testLock1() { final String lockName = "sellPc"; Runnable task = () -> { RLock lock = redissonClient.getLock(lockName); try{ lock.lock(); info("成功获取锁 #1"); Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock(); info("成功获取锁 #2"); Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } finally { info("释放锁 #2"); lock.unlock();
info("释放锁 #1\n"); lock.unlock(); } };
RLock tempLock = redissonClient.getLock(lockName); if( tempLock instanceof RedissonLock) { System.out.println("锁类型: RedissonLock"); } try{ Thread.sleep( 1*1000 ); } catch (Exception e) {} for (int i=1; i<=3; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {} System.out.println("---------------------- 系统下线 ----------------------"); }
@AfterClass public static void close() { redissonClient.shutdown(); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); } }
|
测试结果如下所示,符合预期
显式指定锁的最大持有时间
前面提到支持通过leaseTime参数显式设置锁的最大持有时间,当业务持锁时间超过leaseTime参数值,则其持有的锁会被自动释放。但需要注意的是某个线程的锁一旦被自动释放后,此时再调用unlock方法来释放锁时,即会抛出IllegalMonitorStateException异常。原因也很简单,因为此时线程实际上并未持有锁,示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
public class RedissonLockDemo { ...
@Test public void testLock2() { final String lockName = "sellBooK"; Runnable task = () -> { RLock lock = redissonClient.getLock(lockName); try{ lock.lock(1, TimeUnit.SECONDS); info("成功获取锁"); Thread.sleep( 10 * 1000 ); } catch (Exception e) { info("Happen Exception: " + e.getMessage()); } finally { info("释放锁"); try { lock.unlock(); } catch (IllegalMonitorStateException e) { info("Happen Exception: " + e.getMessage()); } } };
for (int i=1; i<=5; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {} System.out.println("---------------------- 系统下线 ----------------------"); }
... }
|
测试结果如下所示,符合预期。可以看到每隔1秒后,由于线程持锁时间到期了。锁被自动释放了,进而使得下一个任务拿到了锁。并且由于每个任务的锁都是自动释放的,故每次调用unlock方法均会抛出异常
非阻塞式获取锁
下面展示如果通过tryLock方法进行非阻塞式获取锁,示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
public class RedissonLockDemo { ...
@Test public void testTryLock() { final String lockName = "sellPhone"; Runnable task = () -> { RLock lock = redissonClient.getLock(lockName); boolean flag = false; try{ flag = lock.tryLock(); if( flag ) { info("成功获取锁"); Thread.sleep(RandomUtils.nextLong(100, 500)); } else { info("未获取到锁\n"); } } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } finally { if( flag ) { info("释放锁\n"); lock.unlock(); } } };
for (int i=1; i<=5; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {} System.out.println("---------------------- 系统下线 ----------------------"); }
... }
|
测试结果如下所示,符合预期
Watch Dog看门狗机制
在介绍Redisson的Watch Dog看门狗机制之前,我们先来做个测试。如果某个线程一直持有锁执行业务逻辑,则锁是否会被自动释放呢?示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public class RedissonLockDemo { ...
@Test public void testLock3() { final String lockName = "sellPig"; Runnable task = () -> { RLock lock = redissonClient.getLock(lockName); try{ info("尝试获取锁"); lock.lock(); info("成功获取锁"); while (true) { } } catch (Exception e) { info("Happen Exception: " + e.getMessage()); } finally { info("成功释放锁"); lock.unlock(); } };
for (int i=1; i<5; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {} info("---------------------- 系统下线 ----------------------"); }
... }
|
测试结果如下所示,该锁被持有后,一直未被释放。其他任务都被阻塞住了
当我们调用不含leaseTime参数版本的lock()方法时,即未显式设置最大持锁时间。则其在RedissonLock类内部会将 特殊值-1 传给leaseTime参数。然后在tryAcquireAsync方法中会通过RedissonLock类的internalLockLeaseTime字段设置一个默认的最大持锁时间。最后通过RedissonLock构造器我们不难发现 internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段。其中lockWatchdogTimeout字段的默认值为30秒。换言之即使我们调用lock方法时,未显式设置最大持锁时间。但RedissonLock内部也会通过lockWatchdogTimeout字段给该锁设置一个最大持有时间,默认值为30秒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { ... Long ttl = tryAcquire(-1, leaseTime, unit, threadId); ... } private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); } }
...
public class Config {
private long lockWatchdogTimeout = 30 * 1000;
public long getLockWatchdogTimeout() { return lockWatchdogTimeout; }
... }
|
那问题来了,为啥在我们刚刚的测试代码中即使持锁时间超过了30秒,锁也没有被自动释放呢?原因就在于Redisson的看门狗机制。在RedissonLock类的tryAcquireAsync方法中,未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期。即RedissonLock类的tryAcquireAsync方法中会调用scheduleExpirationRenewal()以启动一个定时任务用于进行自动续期。具体的续期逻辑在RedissonBaseLock类的renewExpiration方法中,其中自动续期定时任务的执行周期 是RedissonBaseLock类的internalLockLeaseTime字段值的1/3。然后通过renewExpirationAsync方法每次利用Lua脚本向Redis发送续期命令,具体地。每次续期时会将RedissonBaseLock类的internalLockLeaseTime字段值设置为新的最大持锁时间。同样地,RedissonBaseLock类的 internalLockLeaseTime 字段值也是来自于Config类的lockWatchdogTimeout字段,即默认为30秒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } } }
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
...
}
|
至此我们应该比较清楚Redisson的看门狗机制了:
- Redisson的Watch Dog看门狗机制只会在未显式设置最大持锁时间才会生效。换言之,一旦调用lock方法时指定了leaseTime参数值,则该锁到期后即会自动释放。Redisson的Watch Dog看门狗不会对该锁进行自动续期
- 当我们未显式设置Config类的lockWatchdogTimeout字段值时,使用默认的30秒。此时如果加锁时未显式设置最大持锁时间,即Watch Dog看门狗机制会生效的场景中。该锁实际上一开始也会设置一个默认的最大持锁时间,即30秒。然后看门狗每隔10秒(30秒 * 1/3 = 10秒)会将该锁的最大持锁时间再次设置为30秒,以达到自动续期的目的。这样只要持锁线程的业务还未执行完,则该锁就一直有效、不会被自动释放。当然一旦持锁的服务实例发生宕机后,看门狗的定时任务自然也无法续期。这样锁到期后也就释放掉了,避免了死锁的发生
RedissonFairLock 分布式公平可重入互斥锁
由于RedissonLock是非公平的,故Redisson提供了一个分布式公平可重入互斥锁——RedissonFairLock。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
public class RedissonFairLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test public void test1() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellWatch"; Runnable task = () -> { RLock lock = redissonClient.getFairLock(lockName); try{ lock.lock(); info("成功获取锁 #1"); Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock(); info("成功获取锁 #2"); Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } finally { info("释放锁 #2"); lock.unlock();
info("释放锁 #1\n"); lock.unlock(); } };
RLock tempLock = redissonClient.getFairLock(lockName); if( tempLock instanceof RedissonFairLock ) { System.out.println("锁类型: RedissonFairLock"); } try{ Thread.sleep( 4*1000 ); } catch (Exception e) {}
for (int i=1; i<=3; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {} redissonClient.shutdown(); System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); } }
|
测试结果如下所示,符合预期
RReadWriteLock 分布式读写锁
读写测试
RReadWriteLock是一个分布式可重入读写锁。其中读锁为可重入的共享锁、写锁为可重入的互斥锁,且读写互斥。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
public class RReadWriteLockDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass public static void init() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); redissonClient = Redisson.create( config ); }
@Test public void test1Read() { System.out.println("\n---------------------- Test 1 : Read ----------------------"); String lockName = "sellCat"; for(int i=1; i<=3; i++) { String taskName = "读任务#"+i; Runnable task = new ReadTask(taskName, redissonClient, lockName); threadPool.execute( task ); } try{ Thread.sleep( 10*1000 ); } catch (Exception e) {} }
@Test public void test2Write() { System.out.println("\n---------------------- Test 2 : Write ----------------------"); String lockName = "sellDog"; for(int i=1; i<=3; i++) { String taskName = "写任务#"+i; Runnable task = new WriteTask(taskName, redissonClient, lockName); threadPool.execute( task ); } try{ Thread.sleep( 10*1000 ); } catch (Exception e) {} }
@Test public void test3ReadWrite() { System.out.println("\n---------------------- Test 3 : Read Write ----------------------"); String lockName = "sellLion"; for(int i=1; i<=5; i++) { Runnable task = null; Boolean isReadTask = RandomUtils.nextBoolean(); if( isReadTask ) { task = new ReadTask( "读任务#"+i, redissonClient, lockName); } else { task = new WriteTask( "写任务#"+i, redissonClient, lockName); } threadPool.execute( task ); } try{ Thread.sleep( 10*1000 ); } catch (Exception e) {} }
@AfterClass public static void close() { redissonClient.shutdown(); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
private static class ReadTask implements Runnable { private String taskName;
private RedissonReadLock readLock;
public ReadTask(String taskName, RedissonClient redissonClient, String lockName) { this.taskName = taskName; RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName); this.readLock = (RedissonReadLock) readWriteLock.readLock(); }
@Override public void run() { try{ readLock.lock(); info(taskName + ": 成功获取读锁 #1"); Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.lock(); info(taskName + ": 成功获取读锁 #2"); Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println( taskName + ": Happen Exception: " + e.getMessage()); } finally { info(taskName + ": 释放读锁 #2"); readLock.unlock();
info(taskName + ": 释放读锁 #1"); readLock.unlock(); } } }
private static class WriteTask implements Runnable { private String taskName;
private RedissonWriteLock writeLock;
public WriteTask(String taskName, RedissonClient redissonClient, String lockName) { this.taskName = taskName; RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName); this.writeLock = (RedissonWriteLock) readWriteLock.writeLock(); }
@Override public void run() { try{ writeLock.lock(); info(taskName + ": 成功获取写锁 #1"); Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.lock(); info(taskName + ": 成功获取写锁 #2"); Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println( taskName + ": Happen Exception: " + e.getMessage()); } finally { info(taskName + ": 释放写锁 #2"); writeLock.unlock();
info(taskName + ": 释放写锁 #1\n"); writeLock.unlock(); } } } }
|
读锁测试结果如下所示,符合预期
写锁测试结果如下所示,符合预期
读写测试结果如下所示,符合预期
锁升级、锁降级
所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。
锁升级示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public class RReadWriteLockDemo { ...
@Test public void test4Read2Write() { System.out.println("---------------------- Test 4 : Read -> Write ----------------------\n"); String lockName = "sellTiger"; RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName); RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock(); RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();
try { readLock.lock(); info("成功获取读锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.lock(); info("成功获取写锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.unlock(); info("成功释放读锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.unlock(); info("成功释放写锁"); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } System.out.println("---------------------- 系统下线 ----------------------"); } }
|
测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞
锁降级示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public class RReadWriteLockDemo { ...
@Test public void test5Write2Read() { System.out.println("---------------------- Test 2 : Write -> Read ----------------------"); String lockName = "sellChicken"; RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName); RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock(); RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();
try { writeLock.lock(); info("成功获取写锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.lock(); info("成功获取读锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.unlock(); info("成功释放写锁"); Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.unlock(); info("成功释放读锁"); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } System.out.println("---------------------- 系统下线 ----------------------"); } }
|
测试结果如下所示,符合预期
RedissonSpinLock 分布式非公平可重入自旋互斥锁
RedissonSpinLock则是一个分布式非公平可重入自旋互斥锁。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
public class RedissonSpinLockDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test public void test1() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellKeyword"; Runnable task = () -> { RLock lock = redissonClient.getSpinLock(lockName); try{ lock.lock(); info("成功获取锁 #1"); Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock(); info("成功获取锁 #2"); Thread.sleep(RandomUtils.nextLong(100, 500)); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } finally { info("释放锁 #2"); lock.unlock();
info("释放锁 #1\n"); lock.unlock(); } };
RLock tempLock = redissonClient.getSpinLock(lockName); if( tempLock instanceof RedissonSpinLock) { System.out.println("锁类型: RedissonSpinLock"); } try{ Thread.sleep( 2*1000 ); } catch (Exception e) {}
for (int i=1; i<=3; i++) { threadPool.execute( task ); }
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {} redissonClient.shutdown(); System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); } }
|
测试结果如下所示,符合预期
RedissonCountDownLatch 分布式闩锁
RedissonCountDownLatch是一个分布式的CountDownLatch闩锁。比如我们的业务系统会依赖很多其他基础服务,这样在业务系统启动过程中,需要等待其他基础服务全部启动完毕。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
public class RedissonCountDownLatchDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass public static void init() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); redissonClient = Redisson.create( config ); }
@Test public void test1() throws InterruptedException { final String countDownLatchName = "systemInit"; int count = 5;
for (int i=1; i<=count; i++) { String serviceName = "基础服务 #"+i; BasicService basicService = new BasicService(serviceName, redissonClient, countDownLatchName, count); threadPool.execute( basicService ); }
RedissonCountDownLatch countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName); countDownLatch.trySetCount(count); countDownLatch.await();
info("系统初始化已完成, 业务系统启动 ..."); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
private static class BasicService implements Runnable { private String serviceName;
private RedissonCountDownLatch countDownLatch;
public BasicService(String serviceName, RedissonClient redissonClient, String countDownLatchName, Integer count) { this.serviceName = serviceName; this.countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName); this.countDownLatch.trySetCount( count ); }
@Override public void run() { try{ info(serviceName + ": 启动中"); Thread.sleep( RandomUtils.nextLong(1, 5) * 1000 ); countDownLatch.countDown(); info(serviceName + ": 启动完成"); } catch (Exception e) { System.out.println( serviceName + ": Happen Exception: " + e.getMessage()); } } } }
|
测试结果如下所示,符合预期
RedissonSemaphore 分布式信号量
RedissonSemaphore是一个分布式的信号量,示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
public class RedissonSemaphoreDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test public void test1() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellAnimal"; int maxLimit = 3; IntStream.rangeClosed(1,8) .mapToObj( num -> new UserReq("用户#"+num, redissonClient, lockName, maxLimit) ) .forEach( threadPool::execute );
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {} redissonClient.shutdown(); System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
private static class UserReq implements Runnable { private String name;
private RedissonSemaphore semaphore;
public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) { this.name = name; this.semaphore = (RedissonSemaphore) redissonClient.getSemaphore(lockName); semaphore.trySetPermits( maxLimit ); }
@Override public void run() { try { Thread.sleep(RandomUtils.nextLong(500, 2000)); info( name + ": 发起请求" );
semaphore.acquire(); info(name + ": 系统开始处理请求"); Thread.sleep(RandomUtils.nextInt(5, 20)*1000);
semaphore.release(); info(name + ": 系统处理完毕"); }catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } } } }
|
测试结果如下所示,符合预期
RedissonPermitExpirableSemaphore 分布式支持有效期的信号量
相比较于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在获取许可的acquire方法中,增加了一个支持leaseTime参数的重载版本。以实现指定许可的最大持有时间。一旦业务持许可时间超过leaseTime参数值,则其持有的许可会被自动释放。但需要注意的是某个线程的许可一旦被自动释放后,此时再调用release方法来释放许可时,即会抛出异常。原因也很简单,因为此时线程实际上并未持有许可,示例代码如下所示。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
public class RedissonPermitExpirableSemaphoreDemo { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test public void test1() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellMilk"; int maxLimit = 3; IntStream.rangeClosed(1,8) .mapToObj( num -> new UserReq("用户 #"+num, redissonClient, lockName, maxLimit) ) .forEach( threadPool::execute );
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {} redissonClient.shutdown(); System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "["+time+"] "+ " <"+ thread +"> " + msg; System.out.println(log); }
private static class UserReq implements Runnable { private String name;
private RedissonPermitExpirableSemaphore semaphore;
public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) { this.name = name; this.semaphore = (RedissonPermitExpirableSemaphore) redissonClient.getPermitExpirableSemaphore(lockName); semaphore.trySetPermits( maxLimit ); }
@Override public void run() { try { if( !name.equals("用户 #1") ) { Thread.sleep( RandomUtils.nextLong(1000, 2000) ); } info( name + ": 发起请求" );
String permitId = semaphore.acquire(2, TimeUnit.SECONDS); info(name + ": 系统开始处理请求");
if( name.equals("用户 #1") ) { Thread.sleep( 5 * 1000 ); } else { Thread.sleep(RandomUtils.nextInt(500, 1000)); }
semaphore.release(permitId); info(name + ": 系统处理完毕"); }catch (Exception e) { info( name + ": Happen Exception: " + e.getCause().getMessage()); } } } }
|
测试结果如下所示,符合预期