ReentrantReadWriteLock使用场景
ReentrantReadWriteLock 是 Java 的一种读写锁,它允许多个读线程同时访问,但只允许一个写线程访问(会阻塞所有的读写线程)。这种锁的设计可以提高性能,特别是在读操作的数量远远超过写操作的环境下。
在并发场景中,为了办理线程安全题目,我们通常会使用关键字 synchronized 或者 JUC 包中实现了 Lock 接口的 ReentrantLock。但它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。
而在一些业务场景中,大部分只是读数据,写数据很少,如果仅仅是读数据的话并不会影响数据正确性,而如果在这种业务场景下,依然使用独占锁的话,很显然会出现性能瓶颈。针对这种读多写少的环境,Java 提供了别的一个实现 Lock 接口的 ReentrantReadWriteLock——读写锁。
ReentrantReadWriteLock其实就是 读读并发、读写互斥、写写互斥。如果一个对象并发读的场景大于并发写的场景,那就可以使用 ReentrantReadWriteLock来达到保证线程安全的前提下提高并发效率。起首,我们先了解一下Doug Lea为我们准备的两个demo。
CachedData
一个缓存对象的使用案例,缓存对象在使用时,一样平常并发读的场景远远大于并发写的场景,所以缓存对象是非常得当使用ReentrantReadWriteLock来做控制的- class CachedData {
- //被缓存的具体对象
- Object data;
- //当前对象是否可用,使用volatile来保证可见性
- volatile boolean cacheValid;
- //ReentrantReadWriteLock
- final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- //业务处理逻辑
- void processCachedData() {
- //要读取数据时,先加读锁,如果加成功,说明此时没有人在并发写
- rwl.readLock().lock();
- //拿到读锁后,判断当前对象是否有效
- if (!cacheValid) {
- // Must release read lock before acquiring write lock
- //这里的处理非常经典,当你持有读锁之后,不能直接获取写锁,
- //因为写锁是独占锁,如果直接获取写锁,那代码就在这里死锁了
- //所以必须要先释放读锁,然后手动获取写锁
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- try {
- // Recheck state because another thread might have
- // acquired write lock and changed state before we did.
- //经典处理之二,在独占锁内部要处理数据时,一定要做二次校验
- //因为可能同时有多个线程全都在获取写锁,
- //当时线程1释放写锁之后,线程2马上获取到写锁,此时如果不做二次校验那可能就导致某些操作做了多次
- if (!cacheValid) {
- data = ...
- //当缓存对象更新成功后,重置标记为true
- cacheValid = true;
- }
- // Downgrade by acquiring read lock before releasing write lock
- //这里有一个非常神奇的锁降级操作,所谓降级是说当你持有写锁后,可以再次获取读锁
- //这里之所以要获取一次写锁是为了防止当前线程释放写锁之后,其他线程马上获取到写锁,改变缓存对象
- //因为读写互斥,所以有了这个读锁之后,在读锁释放之前,别的线程是无法修改缓存对象的
- rwl.readLock().lock();
- } finally {
- rwl.writeLock().unlock(); // Unlock write, still hold read
- }
- }
- try {
- use(data);
- } finally {
- rwl.readLock().unlock();
- }
- }
- }
复制代码 RWDictionary
Doug Lea给出的第二个demo,一个并发容器的demo。并发容器我们一样平常都是直接使用ConcurrentHashMap的,但是我们可以使用非并发安全的容器+ReentrantReadWriteLock来组合出一个并发容器。如果这个并发容器的读的频率>写的频率,那这个效率照旧不错的- class RWDictionary {
- //原来非并发安全的容器
- private final Map<String, Data> m = new TreeMap<String, Data>();
- private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock r = rwl.readLock();
- private final Lock w = rwl.writeLock();
- public Data get(String key) {
- //读数据,上读锁
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- public String[] allKeys() {
- //读数据,上读锁
- r.lock();
- try { return m.keySet().toArray(); }
- finally { r.unlock(); }
- }
- public Data put(String key, Data value) {
- //写数据,上写锁
- w.lock();
- try { return m.put(key, value); }
- finally { w.unlock(); }
- }
- public void clear() {
- //写数据,上写锁
- w.lock();
- try { m.clear(); }
- finally { w.unlock(); }
- }
- }
复制代码 ReentrantReadWriteLock的特性
读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
在分析 WirteLock 和 ReadLock 的互斥性时,我们可以按照 WriteLock 与 WriteLock,WriteLock 与 ReadLock 以及 ReadLock 与 ReadLock 进行对比分析。
这里总结一下读写锁的特性:
- 公平性选择:支持非公平性(默认)和公平的锁获取方式,非公平的吞吐量优于公平;
- 重入性:支持重入,读锁获取后能再次获取,写锁获取之后能够再次获取写锁,同时也能够获取读锁;
- 锁降级:写锁降级是一种允许写锁转换为读锁的过程。通常的次序是:
- 获取写锁:线程起首获取写锁,确保在修改数据时排它访问。
- 获取读锁:在写锁保持的同时,线程可以再次获取读锁。
- 开释写锁:线程保持读锁的同时开释写锁。
- 开释读锁:最后线程开释读锁。
这样,写锁就降级为读锁,允许其他线程进行并发读取,但仍然排除其他线程的写操作。
接下来额外说一下锁降级
锁降级指的是写锁降级成为读锁。如果当火线程拥有写锁,然后将其开释,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后开释(先前拥有的)写锁的过程。
接下来看一个锁降级的示例。因为数据不常变革,所以多个线程可以并发地进行数据处理,当数据变更后,如果当火线程感知到数据变革,则进行数据的准备工作,同时其他处理线程被阻塞,直到当火线程完成数据的准备工作,如代码如下所示:- public void processData() {
- readLock.lock();
- if (!update) {
- // 必须先释放读锁
- readLock.unlock();
- // 锁降级从写锁获取到开始
- writeLock.lock();
- try {
- if (!update) {
- // 准备数据的流程(略)
- update = true;
- }
- readLock.lock();
- } finally {
- writeLock.unlock();
- }
- // 锁降级完成,写锁降级为读锁
- }
- try {
- // 使用数据的流程(略)
- } finally {
- readLock.unlock();
- }
- }
复制代码 上述示例中,当数据发生变更后,update变量(布尔范例且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变革,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当火线程获取写锁完成数据准备之后,再获取读锁,随后开释写锁,完成锁降级。
锁降级中读锁的获取是否须要呢? 答案是须要的。主要是为了保证数据的可见性,如果当火线程不获取读锁而是直接开释写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当火线程无法感知线程T的数据更新。如果当火线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当火线程使用数据并开释读锁之后,线程T才气获取写锁进行数据更新。
RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后开释读锁的过程)。目标也是保证数据可见性,如果读锁已被多个线程获取,此中恣意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
ReentrantReadWriteLock源码分析
类的继承关系
- public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
复制代码 说明: 可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口界说了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了本身的序列化逻辑。
类的内部类
ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。内部类的关系如下图所示。
说明: 如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类;ReadLock实现了Lock接口、WriteLock也实现了Lock接口。
内部类 -类Sync
- abstract static class Sync extends AbstractQueuedSynchronizer {}
复制代码 说明: Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持。
Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,此中HoldCounter主要与读锁配套使用,此中,HoldCounter源码如下。- // 计数器
- static final class HoldCounter {
- // 计数
- int count = 0;
- // Use id, not reference, to avoid garbage retention
- // 获取当前线程的TID属性的值
- final long tid = getThreadId(Thread.currentThread());
- }
复制代码 说明: HoldCounter主要有两个属性,count和tid,此中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下- // 本地线程计数器
- static final class ThreadLocalHoldCounter
- extends ThreadLocal<HoldCounter> {
- // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
- public HoldCounter initialValue() {
- return new HoldCounter();
- }
- }
复制代码 说明: ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的环境下,get到的均是initialValue方法里面生成的谁人HolderCounter对象。
- abstract static class Sync extends AbstractQueuedSynchronizer {
- // 版本序列号
- private static final long serialVersionUID = 6317671515068378041L;
- // 高16位为读锁,低16位为写锁
- static final int SHARED_SHIFT = 16;
- // 读锁单位
- static final int SHARED_UNIT = (1 << SHARED_SHIFT);
- // 读锁最大数量
- static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
- // 写锁最大数量
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
- // 本地线程计数器
- private transient ThreadLocalHoldCounter readHolds;
- // 缓存的计数器
- private transient HoldCounter cachedHoldCounter;
- // 第一个读线程
- private transient Thread firstReader = null;
- // 第一个读线程的计数
- private transient int firstReaderHoldCount;
- }
复制代码 说明::直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的低十六位表示写锁数量。
表示占有写锁的线程数量,源码如下- // 构造函数
- Sync() {
- // 本地线程计数器
- readHolds = new ThreadLocalHoldCounter();
- // 设置AQS的状态
- setState(getState()); // ensures visibility of readHolds
- }
复制代码 说明:
EXCLUSIVE_MASK为:- public class ReentrantReadWriteLock
- implements ReadWriteLock, java.io.Serializable {
- // 版本序列号
- private static final long serialVersionUID = -6992448646407690164L;
- // 读锁
- private final ReentrantReadWriteLock.ReadLock readerLock;
- // 写锁
- private final ReentrantReadWriteLock.WriteLock writerLock;
- // 同步队列
- final Sync sync;
-
- private static final sun.misc.Unsafe UNSAFE;
- // 线程ID的偏移地址
- private static final long TID_OFFSET;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> tk = Thread.class;
- // 获取线程的tid字段的内存地址
- TID_OFFSET = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("tid"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
复制代码 说明: 此函数用于获取写锁:起首会获取state,判定是否为0;
1. 若为0,表示此时没有读锁线程,再判定写线程是否应该被阻塞,而在非公平策略下总是不会被阻塞,在公平策略下会进行判定(判定同队伍列中是否有等待时间更长的线程;若存在,则需要被阻塞,否则,无需阻塞),之后在设置状态state,然后返回true。
2. 若state不为0,则表示此时存在读锁或写锁线程,若写锁线程数量为0或者当火线程为独占锁线程,则返回false,表示不成功,否则,判定写锁线程的重入次数是否大于了最大值,若是,则抛出非常,否则,设置状态state,返回true,表示成功。其函数流程图如下
其主要逻辑为:当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增长写状态。
写锁的开释
写锁开释通过重写 AQS 的 tryRelease 方法,源码为:
- public ReentrantReadWriteLock() {
- this(false);
- }
复制代码 说明: 此函数用于开释写锁资源,起首会判定该线程是否为独占线程,若不为独占线程,则抛出非常,否则,计算开释资源后的写锁的数量,若为0,表示成功开释,资源不将被占用,否则,表示资源还被占用。其函数流程图如下。
读锁的获取
看完了写锁,再来看看读锁,读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取,也就是一种共享式锁。按照之前对 AQS 的介绍,实现共享式同步组件的同步语义需要通过重写 AQS 的 tryAcquireShared 方法和 tryReleaseShared 方法。读锁的获取实现方法为:
- public ReentrantReadWriteLock(boolean fair) {
- // 公平策略或者是非公平策略
- sync = fair ? new FairSync() : new NonfairSync();
- // 读锁
- readerLock = new ReadLock(this);
- // 写锁
- writerLock = new WriteLock(this);
- }
复制代码 说明: 此函数表示读锁线程获取读锁。起首判定写锁是否为0并且当火线程不占有独占锁,直接返回;否则,判定读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;若当火线程线程为第一个读线程,则增长firstReaderHoldCount;否则,将设置当火线程对应的HoldCounter对象的值。流程图如下。
当写锁被其他线程获取后,读锁获取失败,否则获取成功,会使用 CAS 更新同步状态。
别的,当前同步状态需要加上 SHARED_UNIT((1 |