AQS深度探索:以ReentrantLock看Java并发编程的高效实现 [复制链接]
发表于 2026-2-9 07:40:26 | 显示全部楼层 |阅读模式
概述

AQS ( Abstract Queued Synchronizer )是一个抽象的队列同步器,通过维护一个共享资源状态( Volatile Int State )来体现同步状态 和一个先辈先出( FIFO )的线程等待队列来完成资源获取的列队工作,通过CAS完成对State值的修改。
AQS团体框架如下:


当有自界说同步器接入时,只需重写第一层所须要的部分方法即可,不须要关注底层详细的实现流程。当自界说同步器举行加锁大概解锁操纵时,先颠末第一层的API进入AQS内部方法,然后颠末第二层举行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处置惩罚,而这些处置惩罚方式均依靠于第五层的底子数据提供层
原理

AQS 为每个共享资源都设置一个共享资源锁,线程在须要访问共享资源时起首须要获取共享资源锁,假如获取到了共享资源锁,便可以在当火线程中利用该共享资源,假如获取不到,则将该线程放入线程等待队列,等待下一次资源调理,流程图如下所示:

Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。
底层结构

state:状态

Abstract Queued Synchronizer 维护了 volatile int 范例的变量,用于体现当前的同步状态。volatile固然不能包管操纵的原子性,但是能包管当前变量state的可见性。
state的访问方式有三种: getState()、setState()和 compareAndSetState(),均是原子操纵,此中,compareAndSetState的实现依靠于 Unsafe的compareAndSwaplnt()
  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private volatile int state;
  3. protected final int getState() {
  4.     return state;
  5. }
  6. protected final void setState(int newState) {
  7.     state = newState;
  8. }
  9. protected final boolean compareAndSetState(int expect, int update) {
  10.     // See below for intrinsics setup to support this
  11.     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  12. }
复制代码


CLH队列

Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的假造双向队列(FIFO),AQS是通过将每条哀求共享资源的线程封装成一个节点来实现锁的分配。

AQS利用一个Volatile的int范例的成员变量来体现同步状态,通过内置的FIFO队列来完成资源获取的列队工作,通过CAS完成对State值的修改。
AQS的独占式和共享式


  • 独占式:只有一个线程能实行,详细的 Java 实现有 ReentrantLock。
  • 共享式:多个线程可同时实行,详细的 Java 实现有 Semaphore和CountDownLatch。
AQS只是一个框架 ,只界说了一个接口,详细资源的获取、开释都由自界说同步器去实现。差别的自界说同步器争用共享资源的方式也差别,自界说同步器在实现时只需实现共享资源state的获取与开释方式即可,至于详细线程等待队列的维护,如获取资源失败入队、叫醒出队等, AQS 已经在顶层实现好(就是模板方法模式),不须要详细的同步器再做处置惩罚。自界说同步器实现时告急实现以下几种方法:


  • 以ReentrantLock为例,ReentrantLock中的state初始值为0体现无锁状态。在线程实行 tryAcquire()获取该锁后ReentrantLock中的state+1,这时该线程独占ReentrantLock锁,其他线程在通过tryAcquire() 获取锁时均会失败,直到该线程开释锁后state再次为0,其他线程才有机遇获取该锁。该线程在开释锁之前可以重复获取此锁,每获取一次便会实行一次state+1, 因此ReentrantLock也属于可重入锁。 但获取多少次锁就要开释多少次锁,如许才气包管state终极为0。假如获取锁的次数多于开释锁的次数,则会出现该线程不绝持有该锁的情况;假如获取锁的次数少于开释锁的次数,则运行中的步调会报锁非常。
  • 以CountDownLatch以例,任务分为N个子线程去实行,state也初始化为N(注意N要与线程个数同等)。这N个子线程是并行实行的,每个子线程实行完后countDown()一次,state会CAS减1。比及全部子线程都实行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继承背面的动作。
  • 以Semaphore为例,state则代表可以同时访问的线程数量,也大概明白为访问的答应证(permit)数量。每个线程访问(acquire)时须要拿到对应的答应证,否则举行壅闭,访问竣事则返还(release)答应证。state只能在Semaphore的构造方法中举行初始化,后续不能举行修改。
一样平常来说,自界说同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自界说同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
Node节点

Node即为上面CLH变体队列中的节点。

Node结点是每一个等待获取资源的线程的封装,其包罗了须要同步的线程本身及其等待状态waitStatus
Node中几个方法和属性值的寄义:

  • waitStatus:当前节点在队列中的状态
  • thread:体现处于该节点的线程
  • prev:前驱指针
  • predecessor:返回前驱节点,没有的话抛出npe
  • nextWaiter:指向下一个处于CONDITION状态的节点(由于本篇文章不陈诉Condition Queue队列,这个指针不多先容)
  • next:后继指针
等待状态waitStatus

waitStatus有下面几个摆列值:如是否被壅闭、是否等待叫醒、是否已经被取消等。共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):体现当前结点已取消调理,不再想去获取资源了。当timeout或被制止(相应制止的情况下),会触发变更为此状态,进入该状态后的结点将不会再厘革。
  • SIGNAL(-1):体现后继结点在等待当前结点叫醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):体现结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同队伍列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不但会叫醒厥后继结点,同时也大概会叫醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意,负值体现结点处于有效等待状态,而正值体现结点已被取消。以是源码中许多地方用>0、0是取消状态        if (ws > 0) {            do {                // 循环向前查找取消节点,把取消节点从队列中剔除                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            // 设置前任节点等待状态为SIGNAL            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;}[/code]parkAndCheckInterrupt告急用于挂起当火线程,壅闭调用栈,返回当火线程的制止状态。
  1. public final void acquire(int arg) {
  2.      if (!tryAcquire(arg) &&
  3.          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.          selfInterrupt();
  5. }
复制代码
详细挂起流程用流程图体现如下(shouldParkAfterFailedAcquire流程):

整个流程中,假如前驱结点的状态不是SIGNAL,那么本身就不能安心去苏息,须要去找个安心的苏息点,同时可以再实行下看有没有机遇轮到本身拿号。
park()会让当火线程进入waiting状态。在此状态下,有两种途径可以叫醒该线程:1)被unpark();2)被interrupt()。须要注意的是,Thread.interrupted()会打扫当火线程的制止标志位。
那么shouldParkAfterFailedAcquire中取消节点是怎么天生的呢?什么时间会把一个节点的waitStatus设置为-1?
是在什么时间开释节点关照到被挂起的线程呢?
CANCELLED状态节点天生

回看acquireQueued方法中的Finally代码
  1. protected boolean tryAcquire(int arg) {
  2.      throw new UnsupportedOperationException();
  3. }
复制代码
显然,当failed为true时才会实行方法cancelAcquire,那什么情况下failed为true呢?try代码段实行过程中出现非常。
这里不知道那边会出现非常?假设tryAcquire出现的非常,那么acquire方法就已经不会今后实行,也就不会实行到acquireQueued
通过cancelAcquire方法,将Node的状态标志为CANCELLED。
  1. protected final boolean tryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     if (c == 0) {
  5.            if (!hasQueuedPredecessors() &&  //公平锁加锁时判断等待队列中是否存在有效节点的方法
  6.                 compareAndSetState(0, acquires)) {
  7.                 setExclusiveOwnerThread(current);
  8.                 return true;
  9.            }
  10.      }
  11.      else if (current == getExclusiveOwnerThread()) {
  12.            int nextc = c + acquires;
  13.            if (nextc < 0)
  14.                 throw new Error("Maximum lock count exceeded");
  15.            setState(nextc);
  16.            return true;
  17.      }
  18.      return false;
  19. }
复制代码
releaseShared()

此方法是共享模式下线程开释共享资源的顶层入口。它会开释指定量的资源,假如乐成开释且答应叫醒等待线程,它会叫醒等待队列里的其他线程来获取资源。
  1. protected final boolean tryAcquire(int acquires) {
  2.     return nonfairTryAcquire(acquires);
  3. }
  4. final boolean nonfairTryAcquire(int acquires) {
  5.      final Thread current = Thread.currentThread();
  6.      int c = getState();
  7.      if (c == 0) {
  8.            if (compareAndSetState(0, acquires)) {
  9.                setExclusiveOwnerThread(current);
  10.                return true;
  11.            }
  12.       }
  13.       else if (current == getExclusiveOwnerThread()) {
  14.            int nextc = c + acquires;
  15.            if (nextc < 0) // overflow
  16.                 throw new Error("Maximum lock count exceeded");
  17.            setState(nextc);
  18.            return true;
  19.       }
  20.       return false;
  21. }
复制代码
此方法的流程也比力简朴,一句话:开释掉资源后,叫醒后继。跟独占模式下的release()相似,但有一点轻微须要注意:独占模式下的tryRelease()在完全开释掉资源(state=0)后,才会返回true去叫醒其他线程,这告急是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制肯定量的线程并发实行,那么拥有资源的线程在开释掉部分资源时就可以叫醒后继等待结点。比方,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就须要等待。A在运行过程中开释掉2个资源量,然后tryReleaseShared(2)返回true叫醒C,C一看只有3个仍不敷继承等待;随后B又开释2个,tryReleaseShared(2)返回true叫醒C,C一看有5个够本身用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全开释掉资源(state=0)才返回true,以是自界说同步器可以根据须要决定tryReleaseShared()的返回值
doReleaseShared()

此方法告急用于叫醒后继
  1. public final boolean hasQueuedPredecessors() {
  2.         // The correctness of this depends on head being initialized
  3.         // before tail and on head.next being accurate if the current
  4.         // thread is first in queue.
  5.         Node t = tail; // Read fields in reverse initialization order
  6.         Node h = head;
  7.         Node s;
  8.         return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  9. }
复制代码
应用

Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0体现未锁定,1体现锁定。焦点源码:
  1. private Node addWaiter(Node mode) {
  2.     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     //尝试快速方式直接放到队尾。
  5.     Node pred = tail;
  6.     if (pred != null) {
  7.         node.prev = pred;
  8.         if (compareAndSetTail(pred, node)) {
  9.             pred.next = node;
  10.             return node;
  11.         }
  12.     }
  13.     //上一步失败则通过enq入队。
  14.     enq(node);
  15.     return node;
  16. }
复制代码
除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,差别的地方就在获取-开释资源的方式tryAcquire-tryRelelase。
ReentrantLock 的利用

ReentrantLock 的利用方式与 synchronized 关键字雷同,都是通过加锁和开释锁来实现同步的。我们来看看 ReentrantLock 的利用方式,以非公平锁为例:
  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. static {
  3.     try {
  4.         stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  5.         headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  6.         tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  7.         waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
  8.         nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
  9.     } catch (Exception ex) {
  10.     throw new Error(ex);
  11.   }
  12. }
复制代码
代码很简朴,两个线程分别对 count 变量举行 10000 次累加操纵,末了输出 count 的值。我们来看看运行结果:
  1. private Node enq(final Node node) {
  2.     //CAS"自旋",直到成功加入队尾
  3.     for (;;) {
  4.         Node t = tail;
  5.         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
  6.             if (compareAndSetHead(new Node()))
  7.                 tail = head;
  8.         } else {//正常流程,放入队尾
  9.             node.prev = t;
  10.             if (compareAndSetTail(t, node)) {
  11.                 t.next = node;
  12.                 return t;
  13.             }
  14.         }
  15.     }
  16. }
复制代码
可以看到,两个线程对 count 变量举行了 private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
} 次累加操纵,分析 ReentrantLock 是支持重入性的。再来看看公平锁的利用方式,只须要将 ReentrantLock 的构造方法改为公平锁即可:
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;//标记是否成功拿到资源
  3.     try {
  4.         boolean interrupted = false;//标记等待过程中是否被中断过
  5.         //CAS“自旋”!
  6.         for (;;) {
  7.             final Node p = node.predecessor();//拿到前驱
  8.             //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源,也就是当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)。
  9.             if (p == head && tryAcquire(arg)) {
  10.                 setHead(node);// 获取锁成功,头指针移动到当前node
  11.                 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
  12.                 failed = false; // 成功获取资源
  13.                 return interrupted;//返回等待过程中是否被中断过
  14.             }
  15.             // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者 是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
  16.             if (shouldParkAfterFailedAcquire(p, node) &&
  17.                 parkAndCheckInterrupt())
  18.                 interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
  19.         }
  20.     } finally {
  21.         if (failed) //说明发生了意料之外的异常,将节点移除,避免影响到其他节点
  22.             cancelAcquire(node);
  23.     }
  24. }
复制代码
运行结果为:
  1. private Node enq(final Node node) {
  2.     //CAS"自旋",直到成功加入队尾
  3.     for (;;) {
  4.         Node t = tail;
  5.         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
  6.             if (compareAndSetHead(new Node()))
  7.                 tail = head;
  8.         } else {//正常流程,放入队尾
  9.             node.prev = t;
  10.             if (compareAndSetTail(t, node)) {
  11.                 t.next = node;
  12.                 return t;
  13.             }
  14.         }
  15.     }
  16. }
复制代码
可以看到,公平锁的运行结果与非公平锁的运行结果同等,这是由于公平锁的实现方式与非公平锁的实现方式根本同等,只是在获取锁时增长了判定当前节点是否有前驱节点的逻辑判定。

  • 公平锁: 按照线程哀求锁的次序获取锁,即先到先得。
  • 非公平锁: 线程获取锁的次序大概与哀求锁的次序差别,大概导致某些线程获取锁的速率较快。
须要注意的是,利用 ReentrantLock 时,锁必须在 try 代码块开始之前获取,而且加锁之前不能有非常抛出,否则在 finally 块中就无法开释锁(ReentrantLock 的锁必须在 finally 中手动开释)。
错误示例:
  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. // 靠前驱节点判断当前线程是否应该被阻塞
  3. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  4.         // 获取头结点的节点状态
  5.         int ws = pred.waitStatus;
  6.         // 说明头结点处于唤醒状态
  7.         if (ws == Node.SIGNAL)
  8.             return true;
  9.         // 通过枚举值我们知道waitStatus>0是取消状态
  10.         if (ws > 0) {
  11.             do {
  12.                 // 循环向前查找取消节点,把取消节点从队列中剔除
  13.                 node.prev = pred = pred.prev;
  14.             } while (pred.waitStatus > 0);
  15.             pred.next = node;
  16.         } else {
  17.             // 设置前任节点等待状态为SIGNAL
  18.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  19.         }
  20.         return false;
  21. }
复制代码
准确示例:
  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private final boolean parkAndCheckInterrupt() {
  3.     LockSupport.park(this);//调用park()使线程进入waiting状态
  4.     return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
  5. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金.

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表