一、读写锁
有这样一种场景:
- 1、如果对一个共享资源的写操作没有读操作那么频繁,这个时候可以允许多个线程同时读取共享资源;
- 2、但是如果有一个线程想去写这些共享资源,那么其他线程此刻就不应该对这些资源进行读和写操作了。
Java中的ReentrantReadWriteLock正是为这种场景提供的锁。该类里面包括了读锁和写锁。
1.1、可获取读锁的情况
- 没有其他线程正在持有写锁;
- 尝试获取读锁的线程同时持有写锁。
1.2、可获取写锁的情况
- 没有其他线程正在持有读锁;
- 没有其他线程正在持有写锁。
1.3、读写锁特点
- 允许并发读:只要没有线程正在更新数据,那么多个线程就可以同时读取数据;
- 只能独占写:只要有一个线程正在写数据,那么就会导致其他线程的读或者写均被阻塞;但写的线程可以获取读锁,并通过释放写锁,让锁降级为读锁;(不能由读锁升级为写锁)
- 只要有一个线程正在读数据,那么其他线程的写入就会阻塞,直到读锁被释放;
- 公平性:支持非公平锁和公平锁,非公平锁吞吐量较高;
- 可重入:无论是读锁还是写锁都是支持可重入的。
读写锁可以增加更新不频繁而读取频繁的共享数据结构的吞吐量。
二、ReentrantReadWriteLock读写锁
ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
-
1、支持公平和非公平的获取锁的方式;
-
2、支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
-
3、还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
-
4、读取锁和写入锁都支持锁获取期间的中断;
-
5、Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
三、ReentrantReadWriteLock使用
3.1 更新缓存
public class CachedData { private Map<String,String> cacheData = new HashMap<>(); private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public String queryCachedData(String key) { //获取读锁 lock.readLock().lock(); try{ //如果缓存有效, 直接使用data String data = cacheData.get(key); if(!StringUtils.isEmpty(data)){ return data; } }finally { //释放读锁 lock.readLock().unlock(); } //获取写锁 lock.writeLock().lock(); try{ //如果缓存无效,更新cache; String data = loadCachedData(key); cacheData.put(key,data); return data; }finally { //释放写锁 lock.writeLock().unlock(); } } }3.2 支持并发读写的ArrayList
public class ReadWriteList<E> { private List<E> list = new ArrayList<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); //读锁 private final Lock writeLock = lock.writeLock(); //写锁 public ReadWriteList(E... initialElements) { list.addAll(Arrays.asList(initialElements)); } public void add(E element) { writeLock.lock(); try { list.add(element); } finally { writeLock.unlock(); } } public E get(int index) { readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public int size() { readLock.lock(); try { return list.size(); } finally { readLock.unlock(); } } }四、实现原理
ReentrantReadWriteLock是可重入读写锁的实现。我们先来看看涉及到的类:
可以看到,ReentrantReadWriteLock中也具有非公平锁NonfairSync和公平锁FairSync的实现。同时ReentrantReadWriteLock组合了两把锁:写锁WriteLock和读锁ReadLock。
看看具体的构造函数:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } abstract static class Sync extends AbstractQueuedSynchronizer { } }可以发现,ReentrantReadWriteLock默认是非公平锁,可以通过参数fair控制是创建非公平锁还是公平锁。同时ReentrantReadWriteLock持有了写锁和读锁。
而本质上,读锁和写锁都是通过持有ReentrantReadWriteLock.sync来进行加锁和释放锁的,用的是同一个AQS,Sync类提供
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { // 引用的是ReentrantReadWriteLock的sync实例 sync = lock.sync; } } public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { // 引用的是ReentrantReadWriteLock的sync实例 sync = lock.sync; } } }基于对AQS原理的理解,知道sync是读写锁实现的关键,而aqs中核心是state字段和双端等待队列。下面我们来看看具体的实现。
4.1 提前了解的内容
在查看ReentrantReadWriteLock之前,您需要了解以下内容:
4.1.1、Sync.HoldCounter类
读锁计数器类,为每个获取读锁的线程进行计数。Sync类中有一个cachedHoldCounter字段,该字段主要是缓存上一个线程的读锁计数器,节省ThreadLocal查找次数。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter { // 某个读线程的重入次数 int count = 0; // Use id, not reference, to avoid garbage retention // 某个线程的tid字段 final long tid = getThreadId(Thread.currentThread()); } } }4.1.2、Sync.ThreadLocalHoldCounter类
当前线程持有的可重入读锁的数量,当数量下降到0的时候进行删除。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } } }4.1.3、Sync类的属性
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { // 高16位为读锁,低16位为写锁 static final int SHARED_SHIFT = 16; // 读锁单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 读锁最大数量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 写锁最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 本地线程计数器 private transient ThreadLocalHoldCounter readHolds; // 缓存的计数器 private transient HoldCounter cachedHoldCounter; // 第一个读线程 private transient Thread firstReader = null; // 第一个读线程的计数 private transient int firstReaderHoldCount; } }该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。
4.1.4、Sync类的构造函数
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { Sync() { // 本地线程计数器 readHolds = new ThreadLocalHoldCounter(); // 设置AQS的状态 setState(getState()); // ensures visibility of readHolds } } }在Sync的构造函数中设置了本地线程计数器和AQS的状态state。
4.1.5、读写锁中AQS的state状态设计
AQS中的state为了能够同时记录读锁和写锁的状态,把32位变量分为了两部分:
如上图,高16位存储读状态,读锁是共享锁,这里记录持有读锁的线程数;低16位是写状态,写锁是排他锁,这里0表示没有线程持有,大于0表示持有线程对锁的重入次数。
假设当前同步状态值为S,get和set的操作如下:
-
1)获取写状态:S&0x0000FFFF:将高16位全部抹去
-
2)获取读状态:S>>>16:无符号补0,右移16位
-
3)写状态加1:S+1
-
4)读状态加1:S+(1<<16)即S + 0x00010000
在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
4.1.6、关于读写锁的数据结构
虽然读写锁看起来有两把锁,但是底层用的都是同一个state,同一个等待队列。只不过是通过ReadLock和WriteLock分别提供了读锁和写锁的API,底层还是用同一个AQS。如下图:
-
由于读写锁是互斥的,所以线程1获取写锁,线程2获取读锁,并发执行的时候,一定有一个会失败;
-
如果是已经获取了读锁的线程尝试获取写锁,则会获取成功;
-
公平模式下,先进入等待队列的线程先被处理;非公平模式下,如果尝试获取写锁的线程节点在头节点后面,尝试获取读锁的线程要让步,进入等待队列;
-
线程节点获取到读锁之后,会判断下一个节点是否处于共享模式,如果是则会一直传播并唤醒后续共享模式节点;
-
如果有其他线程获取了写锁,那么获取写锁就会被阻塞。
公平和非公平是针对等待队列中的线程节点的处理来说的:
- 公平模式一般都是从队列头开始处理,并且如果等待队列还有待处理节点,新的线程全部都入等待队列;
- 非公平模式一般不管等待队列里面有没有待处理节点,都会先尝试竞争获取锁;特殊情况:如果等待队列中有写锁线程,那么新来的读锁线程必须排队让写锁线程先进行处理。
其实关于读写锁的原理就差不多是这么多了。
4.2 ReadLock实现原理
4.2.1、ReadLock.lock()
查看ReadLock的lock相关方法,调用的是AQS的acquireShared方法,该方法会以共享模式获取锁:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; public void lock() { sync.acquireShared(1); } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireShared(int arg) { // 尝试获取锁 if (tryAcquireShared(arg) < 0) // 如果获取锁失败了,那么会进入ASQ的等待队列,等待被唤醒后重新尝试获取锁 doAcquireShared(arg); } }下面看看关键获取锁的tryAcquireShared方法,该方法主要处理逻辑:
- 1、因为读写是互斥的,如果另一个线程持有写锁,则失败;
- 2、否则,此线程具备锁定write状态的条件,因此判断是否应该进入阻塞。 如果不是,请尝试CAS获取读锁许可并更新读锁计数。 请注意,该步骤不检查重入,这将推迟到最后fullTryAcquireShared方法;
- 3、如果第2步失败,或者由于线程不符合锁定条件或者CAS失败或读锁计数饱和,将会使用fullTryAcquireShared进一步重试。
让我们接着看fullTryAcquireShared方法,这个方法可知,只有其他线程持有写锁,或者使用的是公平锁并且头节点后面还有其他等待的线程,或者头节点后面的节点不是共享模式,或者读锁计数器达到了上限,则阻塞,否则一直会循环尝试获取锁:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 如果存在写锁,并且写锁不是当前线程,则返回false if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 不存在写锁,继续判断是否应该阻塞:如果是公平锁并且头节点后有其他等待的线程,则阻塞, // 如果是非公平锁,判断头节点后面的节点是否共享模式,如果不是则阻塞 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly // 如果当前线程是firstReader,说明是重入 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 进入该分支,说明没有读写锁冲突,并且不是重入,当前线程也不是firstReader if (rh == null) { rh = cachedHoldCounter; // 判断上一个获取到锁的线程是否当前线程,不是则进入AQS等待队列 // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // rh.count == 0 表示rh是刚新获取到的,直接返回,进入等待队列 if (rh.count == 0) return -1; } } // 读锁数量为最大值,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 比较并且设置成功 if (compareAndSetState(c, c + SHARED_UNIT)) { // 读线程数量为0 if (sharedCount(c) == 0) { // 设置第一个读线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } } }最后我们来看看doAcquireShared方法:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doAcquireShared(int arg) { // 添加一个共享等待节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 判断新增的节点的前一个节点是否头节点 final Node p = node.predecessor(); if (p == head) { // 是头节点,那么在此尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { // 获取成功,把当前节点变为新的head节点, //并且检查后续节点是否可以在共享模式下等待, //并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 阻塞节点 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 取消获取锁 cancelAcquire(node); } } }4.2.2、ReadLock.unlock()
接下来我们看看释放锁的代码。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; public void unlock() { sync.releaseShared(1); } } }AbstractQueuedSynchronizer.releaseShared()
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } }主要处理方法是tryReleaseShared,该方法主要是清理ThreadLocal中的锁计数器,然后CAS修改读锁个数减1:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryReleaseShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 当前线程为第一个读线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; // 读线程占用的资源数为1 if (firstReaderHoldCount == 1) firstReader = null; else // 减少占用的资源 firstReaderHoldCount--; } else {// 当前线程不为第一个读线程 // 获取缓存的计数器 HoldCounter rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程对应的计数器 rh = readHolds.get(); // 获取计数 int count = rh.count; if (count <= 1) {// 计数小于等于1 // 移除 readHolds.remove(); if (count <= 0) // 计数小于等于0,抛出异常 throw unmatchedUnlockException(); } // 减少计数 --rh.count; } //自旋CAS,减去1<<16 for (;;) {// 无限循环 // 获取状态 int c = getState(); // 获取状态 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc))// 比较并进行设置 // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } } } }4.3 WriteLock实现原理
4.3.1、WriteLock.lock()
查看WriteLock的lock锁相关方法,调用的是sync.acquire方法,该方法直接继承了ASQ的acquire()方法的实现:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; public void lock() { sync.acquire(1); } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } }与ReentrantLock的实现区别在具体的tryAcquire()方法的实现,我们来看看ReentrantReadWriteLock.Sync中该方法的实现,主要做了以下事情:
- 如果读锁数量>0,或者写锁数量>0,并且不是重入的,那么直接失败了;
- 如果锁数量为0,那么该线程有资格获取到写锁,进而尝试获取。
4.3.2、WriteLock.unlock()
查看WriteLock的unlock相关方法,调用的是sync.release方法,该方法直接继承了AQS的release实现
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; public void unlock() { sync.release(1); } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { // 释放锁成功,则唤醒队列中头节点后的一个线程 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } }- 释放锁的逻辑主要在tryRelease方法,下面是详细代码:
写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。
说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。其方法流程图如下。
总结
相比于ReentrantLock,读写锁的实现复杂一些,里面有很多的点很巧妙,比如下面几点:
- 将state拆分,高16位表示读锁状态,低16位表示写锁状态。
- 使用ThreadLocal封装HoldCounter对象,保证每个线程记录自己的重入锁数量。
- 使用锁降级,提高效率。
- 读锁不互斥,读锁和写锁互斥。
- 将首个持有读锁的线程单独保存,而不是放入ThreadLocal中,这样在只有一个读线程的场景中提高效率。
- 保证写锁优先,如果当前读锁正持有锁,在新的线程获取读锁的时候,先看一下阻塞队列第二个节点是不是写锁线程,如果是就阻塞,防止写锁饥饿。
- 公平性,无论写锁还是读锁都支持公平锁。
参考: https://www.itzhai.com/articles/introduction-and-use-of-reentrantreadwritelock.html
https://www.cnblogs.com/xiaoxi/p/9140541.html
https://www.cnblogs.com/zaizhoumo/p/7782941.html
https://www.cnblogs.com/gunduzi/p/13635002.html