这里就JUC包中的ReentrantReadWriteLock读写锁做相关介绍
概述 前面介绍了ReentrantLock可重入锁,但其存在明显的弊端。对于读场景而言,实际完全可以允许多个线程同时访问,而不必使用独占锁来进行并发保护。故ReentrantReadWriteLock读写锁应运而生。其内部维护了两个锁——读锁、写锁。前者为共享锁,后者则为互斥锁。具体而言,读锁可以被多个线程同时获取,而写锁只能被一个线程获取;同时读锁、写锁之间也是互斥的,即一旦某个线程获取到了读锁,则其他线程不可以同时获得写锁。反之同理。具体地,读写锁支持公平、非公平锁两种实现方式,默认为非公平锁。在锁的获取方面,其与ReentrantLock可重入锁类似。即支持lock()、lockInterruptibly()阻塞式获取,也支持tryLock()、tryLock(long timeout, TimeUnit unit)实现非阻塞式获取。但tryLock()方法会破坏公平性,即使是一个公平的读写锁实例。故为了保证公平性,可使用支持超时的tryLock方法,同时将超时时间设为0即可——tryLock(0, TimeUnit.SECONDS)。而在条件变量Condition方面,仅写锁支持
实践 读写测试 现在分别就读读、写写、读写场景进行实践验证。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ReentrantReadWriteLockTest1 { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS" ); private static ExecutorService threadPool = Executors.newFixedThreadPool(10 ); private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); @Test public void test1 () { System.out.println("\n---------------------- Test 1 ----------------------" ); for (int i=0 ; i<3 ; i++) { Runnable task = new ReadTask ( lock, "读任务 #" +i ); threadPool.execute( task ); } try { Thread.sleep( 10 *1000 ); } catch (Exception e) {} } @Test public void test2 () { System.out.println("\n---------------------- Test 2 ----------------------" ); for (int i=0 ; i<3 ; i++) { Runnable task = new WriteTask ( lock, "写任务 #" +i ); threadPool.execute( task ); } try { Thread.sleep( 10 *1000 ); } catch (Exception e) {} } @Test public void test3 () { System.out.println("\n---------------------- Test 3 ----------------------" ); for (int i=0 ; i<8 ; i++) { Runnable task = null ; Boolean isReadTask = RandomUtils.nextBoolean(); if ( isReadTask ) { task = new ReadTask ( lock, "读任务 #" +i ); } else { task = new WriteTask ( lock, "写任务 #" +i ); } threadPool.execute( task ); } try { Thread.sleep( 50 *1000 ); } catch (Exception e) {} } public static void info (String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "[" +time+"] " + " <" + thread +"> " + msg; System.out.println(log); } @AllArgsConstructor private static class ReadTask implements Runnable { private ReentrantReadWriteLock lock; private String name; @Override public void run () { ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); readLock.lock(); info(name+ ": 成功获取读锁" ); try { Thread.sleep(RandomUtils.nextLong(1000 , 3000 )); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { info(name+ ": 释放读锁" ); readLock.unlock(); } } } @AllArgsConstructor private static class WriteTask implements Runnable { private ReentrantReadWriteLock lock; private String name; @Override public void run () { ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); writeLock.lock(); info(name+ ": 成功获取写锁" ); try { Thread.sleep(RandomUtils.nextLong(1000 , 3000 )); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { info(name+ ": 释放写锁" ); writeLock.unlock(); } } } }
从Test1、Test2的测试结果可以证明读锁是共享锁、写锁是互斥锁
从Test3的测试结果可以看出读写之间是互斥的
可重入性 ReentrantReadWriteLock同样是可重入的。当一个线程获取到读锁(或写锁)后,可以继续获取相应类型的锁。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ReentrantReadWriteLockTest2 { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS" ); private static ExecutorService threadPool = Executors.newFixedThreadPool(10 ); private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); @Test public void test1 () { System.out.println("\n---------------------- Test 1 ----------------------\n" ); for (int i=0 ; i<2 ; i++) { Runnable runnable = new Task ("Task" +i, lock.readLock(), lock.readLock()); threadPool.execute( runnable ); } try { Thread.sleep( 120 *1000 ); } catch (Exception e) {} } @Test public void test2 () { System.out.println("\n---------------------- Test 2 ----------------------\n" ); for (int i=0 ; i<2 ; i++) { Runnable runnable = new Task ("Task" +i, lock.writeLock(), lock.writeLock()); threadPool.execute( runnable ); } try { Thread.sleep( 120 *1000 ); } catch (Exception e) {} } public static void info (String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "[" +time+"] " + msg; System.out.println(log); } @AllArgsConstructor private static class Task implements Runnable { private String name; private Lock firstLock; private Lock secondLock; @Override public void run () { firstLock.lock(); info(name+ ": 成功获取锁 firstLock" ); try { Thread.sleep(RandomUtils.nextLong(1000 , 3000 )); methodA(); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { info(name+ ": 释放锁 firstLock" ); firstLock.unlock(); } } private void methodA () { secondLock.lock(); info(name+ ": 成功获取锁 secondLock" ); try { Thread.sleep(RandomUtils.nextLong(1000 , 3000 )); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { info(name+ ": 释放锁 secondLock" ); secondLock.unlock(); } } } }
测试结果,如下所示
锁升级、降级 所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然ReentrantReadWriteLock是不支持的。理由也很简单,读锁是可以多个线程同时持有的。若其中的一个线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他线程依然持有读锁
反之,ReentrantReadWriteLock是支持锁降级的,即写锁降级为读锁。当一个线程在获得写锁后,依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的场景也很常见,假设存在一个先写后读的方法。共计耗时5s。其中前1秒用于写操作、后4秒用于读操作。最简单的思路是对该方法从开始到结束全部使用写锁进行保护。但其实该方法后4秒的读操作完全没有必要使用写锁进行保护。因为这样会阻塞其他线程读锁的获取,效率较低。而如果通过写锁、读锁分别对前1秒、后4秒的操作进行控制,即先获取写锁、再释放写锁,然后获取读锁、再释放读锁的方案。则有可能导致并发问题,具体表现在执行该方法过程中,刚释放写锁、准备获取读锁时,其他线程恰好获取到了写锁并对数据进行了更新。而锁降级则为此场景提供了新的解决思路及方案。其一方面保证了安全,读锁在写锁释放前获取,另一方面保证了高效,因为读锁是共享的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ReentrantReadWriteLockTest2 { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS" ); private static ExecutorService threadPool = Executors.newFixedThreadPool(10 ); private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); @Test public void test3 () { System.out.println("\n---------------------- Test 3 ----------------------\n" ); Runnable runnable1 = new Task ("Task" , lock.readLock(), lock.writeLock()); threadPool.execute( runnable1 ); try { Thread.sleep( 120 *1000 ); } catch (Exception e) {} } @Test public void test4 () { System.out.println("\n---------------------- test4 ----------------------\n" ); Runnable runnable = () -> { try { lock.writeLock().lock(); info("成功获取写锁" ); lock.readLock().lock(); info("成功获取读锁" ); } catch (Exception e) { System.out.println( "Happen Exception: " + e.getMessage()); } finally { lock.writeLock().unlock(); info("释放写锁" ); } }; threadPool.execute( runnable ); try { Thread.sleep( 10 *1000 ); } catch (Exception e) {} for (int i=0 ; i<2 ; i++) { Runnable runnable2 = new Task ("Task" +i, lock.readLock(), lock.readLock()); threadPool.execute( runnable2 ); } try { Thread.sleep( 120 *1000 ); } catch (Exception e) {} } }
从Test 3的测试结果可以看出。由于不支持锁升级,故其在持有读锁的条件下尝试获取写锁会被一直阻塞下去
从Test 4的测试结果可以看出锁降级是可行的。这里为了便于演示,故runnable一直未释放其持有的读锁。实际应用中需要将其释放掉
实现原理 基本结构 ReentrantReadWriteLock读写锁的实现过程同样依赖于AQS,其是对AQS中共享锁、互斥锁的应用。在构建ReentrantReadWriteLock读写锁实例过程中,一方面,其会创建AQS实现类Sync的实例,其中Sync根据公平性与否又可细分为NonfairSync、FairSync这两个子类。这两个子类通过实现Sync中的readerShouldBlock、writerShouldBlock抽象方法来保障公平与否这一特性;另一方面,还会相应地创建ReadLock、WriteLock实例,并通过持有Sync实例来进行对AQS的调用。而且在ReentrantReadWriteLock读写锁的实现中,其将AQS的state字段分为两部分来使用。具体地,state字段的高16位表示获取到读锁的次数;state字段的低16位表示获取到写锁的次数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public class ReentrantReadWriteLock implements ReadWriteLock , java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock.WriteLock writeLock () { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock () { return readerLock; } public ReentrantReadWriteLock () { this (false ); } public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); } abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } abstract boolean readerShouldBlock () ; abstract boolean writerShouldBlock () ; } static final class NonfairSync extends Sync { final boolean writerShouldBlock () { ... } final boolean readerShouldBlock () { ... } } static final class FairSync extends Sync { final boolean writerShouldBlock () { ... } final boolean readerShouldBlock () { ... } } public static class ReadLock implements Lock , java.io.Serializable { private final Sync sync; protected ReadLock (ReentrantReadWriteLock lock) { sync = lock.sync; } } public static class WriteLock implements Lock , java.io.Serializable { private final Sync sync; protected WriteLock (ReentrantReadWriteLock lock) { sync = lock.sync; } } }
写锁 ReentrantReadWriteLock中的写锁是对AQS中互斥锁的使用。其使用方式是通过writeLock()获取写锁实例,然后分别通过写锁的lock()、unlock()方法进行加锁、解锁操作
在调用加锁lock()方法时,其首先会调用AQS的acquire()方法。而在Sync类中则提供了tryAcquire方法的实现。如果其返回true则加锁操作结束,否则其将会进入AQS的阻塞队列。同时为了支持公平、非公平两种实现版本,Sync类中定义了writerShouldBlock抽象方法,用于判断当前线程是否可以直接通过CAS获取锁。然后通过Sync的子类NonfairSync、FairSync来实现该方法。具体地,在NonfairSync类中,writerShouldBlock方法会直接返回false。即直接利用CAS获取锁;而在FairSync类中,writerShouldBlock方法需要调用AQS的hasQueuedPredecessors方法,即如果AQS阻塞队列中如果没有其他线程在排队才可以通过CAS获取锁
类似地,在调用解锁unlock()方法时,其首先会调用AQS的release()方法。而在Sync类中则提供了tryRelease方法的实现。如果返回true则说明锁已经完全被释放了,需要AQS唤醒队列中的其他线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 public class ReentrantReadWriteLock implements ReadWriteLock , java.io.Serializable { private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock.WriteLock writeLock () { return writerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { abstract boolean writerShouldBlock () ; protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; } } static final class NonfairSync extends Sync { final boolean writerShouldBlock () { return false ; } } static final class FairSync extends Sync { final boolean writerShouldBlock () { return hasQueuedPredecessors(); } } public static class WriteLock implements Lock , java.io.Serializable { private final Sync sync; protected WriteLock (ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock () { sync.acquire(1 ); } public void unlock () { sync.release(1 ); } } } ... public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException (); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException (); } }
读锁 ReentrantReadWriteLock中的读锁是对AQS中共享锁的使用。其使用方式是通过readLock()获取读锁实例,然后分别通过读锁的lock()、unlock()方法进行加锁、解锁操作
在调用加锁lock()方法时,其首先会调用AQS的acquireShared()方法。而在Sync类中则提供了tryAcquireShared方法的实现。如果返回值小于0则进入AQS的阻塞队列,否则加锁操作结束。同时为了支持公平、非公平两种实现版本,Sync类中定义了readerShouldBlock抽象方法,用于判断当前线程是否可以直接通过CAS获取锁。然后通过Sync的子类NonfairSync、FairSync来实现该方法。具体地,在NonfairSync类中,readerShouldBlock方法会调用AQS的apparentlyFirstQueuedIsExclusive方法来判断AQS阻塞队列中排队的第一个节点是不是获取写锁的,如果是则放弃本次CAS操作;而在FairSync类中,readerShouldBlock方法同样需要调用AQS的hasQueuedPredecessors方法,即如果AQS阻塞队列中如果没有其他线程在排队,本次才尝试通过CAS获取锁
类似地,在调用解锁unlock()方法时,其首先会调用AQS的releaseShared()方法。而在Sync类中则提供了tryReleaseShared方法的实现。如果返回true则说明锁已经完全被释放了,需要AQS进一步唤醒队列中的其他线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 public class ReentrantReadWriteLock implements ReadWriteLock , java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; final Sync sync; public ReentrantReadWriteLock.ReadLock readLock () { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { abstract boolean readerShouldBlock () ; protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); } protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } } static final class NonfairSync extends Sync { final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } } static final class FairSync extends Sync { final boolean readerShouldBlock () { return hasQueuedPredecessors(); } } public static class ReadLock implements Lock , java.io.Serializable { private final Sync sync; protected ReadLock (ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock () { sync.acquireShared(1 ); } public void unlock () { sync.releaseShared(1 ); } } } ... public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException (); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException (); } }
参考文献
Java并发编程之美 翟陆续、薛宾田著