一文搞懂到底什么是 AQS

曹旭辉  金牌会员 | 2024-7-4 18:19:58 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 835|帖子 835|积分 2505

媒介

日常开辟中,我们经常利用锁大概其他同步器来控制并发,那么它们的基础框架是什么呢?如何实现的同步功能呢?本文将详细讲授构建锁和同步器的基础框架--AQS,并根据源码分析其原理。
一、什么是 AQS?

1. AQS 简介

AQS(Abstract Queued Synchronizer),抽象队列同步器,它是用来构建锁或其他同步器的基础框架。虽然大多数步伐员可能永远不会利用到它,但是知道 AQS 的原理有助于明白一些锁或同步器的是如何运行的。
那么有哪些同步器是基于 AQS 实现的呢?这里仅是简单介绍,详情后续会单独总结一篇文章。
同步器说明CountDownLatch递减的计数器,直至所有线程的使命都执行完毕,才继续执行后续使命。Semaphore信号量,控制同时访问某个资源的数目。CyclicBarrier递增的计数器,所有线程达到屏障时,才会继续执行后续使命。ReentrantLock防止多个线程同时访问共享资源,类似 synchronized 关键字。ReentrantReadWriteLock维护了读锁和写锁,读锁允许多线程访问,读锁阻塞所有线程。Condition提供类似 Object 监视器的方法,于 Lock 共同可以实现等候/关照模式。FutureTask当一个线程需要等候另一线程把某个使命执行完后才能继续执行,此时可以利用 FutureTask如果你明白了 AQS 的原理,也可以基于它去自定义一个同步组件,下文会介绍。
2. AQS 数据布局

AQS 焦点是通过对同步状态的管理,来完成线程同步,底层是依靠一个双端队列来完成同步状态的管理

  • 当火线程获取同步状态失败后,会构造成一个 Node 节点并参加队列末尾,同实阻塞线程。
  • 当同步状态释放时,会把头节点中的线程唤醒,让其再次尝试获取同步状态
如下图,这里只是简单绘制,详细流程见下面原理分析:


这里的每个 Node 节点都存储着当火线程、等候信息等。
3. 资源共享模式

我们在获取共享资源时,有两种模式:
模式说明示例独占模式Exclusive,资源同一时刻只能被一个线程获取ReentrantLock共享模式Share,资源可同时被多个线程获取Semaphore、CountDownLatch二、AQS 原理分析

先简单说下原理分析的流程:

  • 同步状态相关源码;
  • 须重写的方法;
  • Node 节点布局分析;
  • 独占模式下的同步状态的获取与释放;
  • 共享模式下的同步状态的获取与释放;
1. 同步状态相关

上面介绍到, AQS 焦点是通过对同步状态的管理,来完成线程同步,所以起首介绍管理同步状态的三个方法,在自定义同步组件时,需要通过它们获取和修改同步状态。
  1. //保证可见性
  2. private volatile int state
  3. //获取当前同步状态。
  4. protected final int getState() {
  5.     return state;
  6. }
  7. //设置当前同步状态。
  8. protected final void setState(int newState) {
  9.     state = newState;
  10. }
  11. //使用 CAS 设置当前状态,保证原子性。
  12. protected final boolean compareAndSetState(int expect, int update) {
  13.     // See below for intrinsics setup to support this
  14.     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  15. }
复制代码
2. 须重写的方法

AQS 是基于模板方法模式的,通过第一个 abstract 也可知道,AQS 是个抽象类,利用者需要继承 AQS 并重写指定方法。
以下这些方式是没有详细实现的,需要在利用 AQS 时在子类中去实现详细方法,比及介绍一些同步组件时,会详细说明如何重写。
  1. //独占式获取同步状态,实现该方法须查询并判断当前状态是否符合预期,然后再进行CAS设置状态。
  2. protected boolean tryAcquire (int arg)
  3. //独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
  4. protected boolean tryRelease (int arg)
  5. //共享式获取同步状态,返回大于0的值表示获取成功,反之获取失败。
  6. protected int tryAcquireShared (int arg)
  7. //共享式释放同步状态。
  8. protected boolean tryReleaseShared (int arg)
  9. //当前同步器是否再独占模式下被线程占用,一般用来表示是否被当前线程独占。
  10. protected boolean isHeldExclusively ()
复制代码
3. Node 源码

Node 是双端队列中的节点,是数据布局的告急部门,线程相关的信息都存在每一个 Node 中。

3.1 Node 布局源码

源码如下:
  1. static final class Node {
  2.     //标记当前节点的线程在共享模式下等待。
  3.     static final Node SHARED = new Node();
  4.    
  5.     //标记当前节点的线程在独占模式下等待。
  6.     static final Node EXCLUSIVE = null;
  7.    
  8.     //waitStatus的值,表示当前节点的线程已取消(等待超时或被中断)
  9.     static final int CANCELLED =  1;
  10.    
  11.     //waitStatus的值,表示后继节点的线程需要被唤醒
  12.     static final int SIGNAL    = -1;
  13.    
  14.     //waitStatus的值,表示当前节点在等待某个条件,正处于condition等待队列中
  15.     static final int CONDITION = -2;
  16.    
  17.     //waitStatus的值,表示在当前有资源可用,能够执行后续的acquireShared操作
  18.     static final int PROPAGATE = -3;
  19.     //等待状态,值如上,1、-1、-2、-3。
  20.     volatile int waitStatus;
  21.    
  22.     //前趋节点
  23.     volatile Node prev;
  24.     //后继节点
  25.     volatile Node next;
  26.    
  27.     //当前线程
  28.     volatile Thread thread;
  29.    
  30.     //等待队列中的后继节点,共享模式下值为SHARED常量
  31.     Node nextWaiter;
  32.    
  33.     //判断共享模式的方法
  34.     final boolean isShared() {
  35.         return nextWaiter == SHARED;
  36.     }
  37.    
  38.     //返回前趋节点,没有报NPE
  39.     final Node predecessor() throws NullPointerException {
  40.         Node p = prev;
  41.         if (p == null)
  42.             throw new NullPointerException();
  43.         else
  44.             return p;
  45.     }
  46.     //下面是三个构造方法
  47.     Node() {}    // Used to establish initial head or SHARED marke
  48.    
  49.     Node(Thread thread, Node mode) {     // Used by addWaiter
  50.         this.nextWaiter = mode;
  51.         this.thread = thread;
  52.     }
  53.     Node(Thread thread, int waitStatus) { // Used by Condition
  54.         this.waitStatus = waitStatus;
  55.         this.thread = thread;
  56.     }
  57. }
复制代码
3.2 设置头尾节点

Unsafe 类中,提供了一个基于 CAS 的设置头尾节点的方法,AQS 调用该方法进行设置头尾节点,包管并发编程中的线程安全。
  1. //CAS自旋设置头节点
  2. private final boolean compareAndSetHead(Node update) {
  3.     return unsafe.compareAndSwapObject(this, headOffset, null, update);
  4. }
  5. //CAS自旋设置尾节点,expect为当前线程“认为”的尾节点,update为当前节点
  6. private final boolean compareAndSetTail(Node expect, Node update) {
  7.     return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  8. }
  9.    
复制代码
4. 独占模式

资源同一时刻只能被一个线程获取,如 ReentrantLock。

4.1 获取同步状态

代码如下,调用 acquire 方法可以获取同步状态,底层就是调用须重写方法中的 tryAcquire。如果获取失败则进入同步队列中,即使后续对线程进行终端操作,线程也不会从同步队列中移除。
  1. public final void acquire(int arg) {
  2.     //调用须重写方法中的tryAcquire
  3.     if (!tryAcquire(arg) &&
  4.         //失败则进入同步队列中
  5.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6.         selfInterrupt();
  7. }
复制代码
获取失败会先调用 addWaiter 方法将当火线程封装成独占式模式的节点,添加到AQS的队列尾部,源码如下。
  1. private Node addWaiter(Node mode) {
  2.     //将当前线程封装成对应模式下的Node节点
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     Node pred = tail;//尾节点
  5.     if (pred != null) {
  6.         //双端队列需要两个指针指向
  7.         node.prev = pred;
  8.         //通过CAS方式
  9.         if (compareAndSetTail(pred, node)) {
  10.             //添加到队列尾部
  11.             pred.next = node;
  12.             return node;
  13.         }
  14.     }
  15.     //等待队列中没有节点,或者添加队列尾部失败则调用end方法
  16.     enq(node);
  17.     return node;
  18. }
  19. //Node节点通过CAS自旋的方式被添加到队列尾部,直到添加成功为止。
  20. private Node enq(final Node node) {
  21.     //死循环,类似 while(1)
  22.     for (;;) {
  23.         Node t = tail;
  24.         if (t == null) { // 须要初始化,代表队列的第一个元素
  25.             if (compareAndSetHead(new Node()))
  26.                 //头节点就是尾节点
  27.                 tail = head;
  28.         } else {
  29.             //双端队列需要两个指针指向
  30.             node.prev = t;
  31.             //通过自旋放入队列尾部
  32.             if (compareAndSetTail(t, node)) {
  33.                 t.next = node;
  34.                 return t;
  35.             }
  36.         }
  37.     }
  38. }
复制代码
此时,通过 addWaiter 已经将当火线程封装成独占模式的 Node 节点,并成功放入队列尾部。接下来会调用acquireQueued 方法在等候队列中排队。
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     //获取资源失败标识
  3.     boolean failed = true;
  4.     try {
  5.         //线程是否被中断标识
  6.         boolean interrupted = false;
  7.         //死循环,类似 while(1)
  8.         for (;;) {
  9.             //获取当前节点的前趋节点
  10.             final Node p = node.predecessor();
  11.             //前趋节点是head,即队列的第二个节点,可以尝试获取资源
  12.             if (p == head && tryAcquire(arg)) {
  13.                 //资源获取成功将当前节点设置为头节点
  14.                 setHead(node);
  15.                 p.next = null; // help GC,表示head节点出队列
  16.                 failed = false;
  17.                 return interrupted;
  18.             }
  19.             //判断当前线程是否可以进入waitting状态,详解见下方
  20.             if (shouldParkAfterFailedAcquire(p, node) &&
  21.                 parkAndCheckInterrupt())        //阻塞当前线程,详解见下方
  22.                 interrupted = true;
  23.         }
  24.     } finally {
  25.         if (failed)
  26.             //取消获取同步状态,源码见下方的取消获取同步状态章节
  27.             cancelAcquire(node);
  28.     }
  29. }
  30. //将当前节点设置为头节点
  31. private void setHead(Node node) {
  32.     head = node;
  33.     node.thread = null;
  34.     node.prev = null;
  35. }
  36. //判断当前线程是否可以进入waitting状态
  37. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  38.     //获取前趋节点的等待状态,含义见上方Node结构源码
  39.     int ws = pred.waitStatus;
  40.     if (ws == Node.SIGNAL)        //表示当前节点的线程需要被唤醒
  41.         return true;
  42.     if (ws > 0) {        //表示当前节点的线程被取消
  43.         //则当前节点一直向前移动,直到找到一个waitStatus状态小于或等于0的节点
  44.         do {
  45.             node.prev = pred = pred.prev;
  46.         } while (pred.waitStatus > 0);
  47.         //排在这个节点的后面
  48.         pred.next = node;
  49.     } else {
  50.         //通过CAS设置等待状态
  51.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  52.     }
  53.     return false;
  54. }
  55. //阻塞当前线程
  56. private final boolean parkAndCheckInterrupt() {
  57.     //底层调用的UnSafe类的方法 park:阻塞当前线程, unpark:使给定的线程停止阻塞
  58.     LockSupport.park(this);
  59.     //中断线程
  60.     return Thread.interrupted();
  61. }
复制代码
acquireQueued 方法中,只有当前驱节点等于 head 节点时,才能够尝试获取同步状态,这时为什么呢?
因为 head 节点是占有资源的节点,它释放后才会唤醒它的后继节点,所以需要检测。还有一个缘故原由是因为如果遇到了非 head 节点的其他节点出队或因停止而从等候中唤醒,这时种情况则需要判定前趋节点是否为 head 节点,是才允许获取同步状态。
获取同步状态的整体流程图如下:



4.2 释放同步状态

调用须重写方法中的 tryAcquire 进行同步状态的释放,成功则唤醒队列中最前面的线程,详细如下。
  1. public final boolean release(int arg) {
  2.     //调用须重写方法中的tryRelease
  3.     if (tryRelease(arg)) {
  4.         Node h = head;
  5.         if (h != null && h.waitStatus != 0)
  6.             //唤醒后继节点的线程,详情见下方
  7.             unparkSuccessor(h);
  8.         return true;
  9.     }
  10.     return false;
  11. }
  12. //唤醒后继节点的线程
  13. private void unparkSuccessor(Node node) {
  14.     //获取当前节点的等待状态
  15.     int ws = node.waitStatus;
  16.     if (ws < 0)
  17.         //小于0则,则尝试CAS设为0
  18.         compareAndSetWaitStatus(node, ws, 0);
  19.     //获取后继节点
  20.     Node s = node.next;
  21.     //后继节点为空或者等待状态大于0,代表被节点被取消
  22.     if (s == null || s.waitStatus > 0) {
  23.         s = null;
  24.         //将队列中的所有节点都向前移动
  25.         for (Node t = tail; t != null && t != node; t = t.prev)
  26.             if (t.waitStatus <= 0)
  27.                 s = t;
  28.     }
  29.     //不为空则进行唤醒操作
  30.     if (s != null)
  31.         //底层调用的UnSafe类的方法 park:阻塞当前线程, unpark:使给定的线程停止阻塞
  32.         LockSupport.unpark(s.thread);
  33. }
复制代码
5.2 释放同步状态

代码如下,调用 releaseShared 方法可以释放同步状态,底层就是先调用须重写方法中的 tryReleaseShared。
  1. public final void acquireInterruptibly(int arg)
  2.         throws InterruptedException {
  3.     if (Thread.interrupted())//中断则抛出异常
  4.         throw new InterruptedException();
  5.     if (!tryAcquire(arg))
  6.         doAcquireInterruptibly(arg);
  7. }
复制代码
6. 取消获取同步状态

无论是独占模式照旧共享模式,所有的程获取同步状态的过程中,如果发生异常或是超时唤醒等,都需要将当前的节点出队,源码如下。
[code]//一般在获取同步状态方法的finally块中private void cancelAcquire(Node node) {    if (node == null)        return;    node.thread = null;                //当火线程节点设为null    Node pred = node.prev;                //获取前驱节点    //前趋节点为取消状态,向前遍历找到非取消状态的节点    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;    Node predNext = pred.next;        //获取非取消节点的下一个节点    node.waitStatus = Node.CANCELLED;        //将当前节点的等候状态设为取消状态    //当前节点是尾节点,则自旋将尾节点设置为前一个非取消节点    if (node == tail && compareAndSetTail(node, pred)) {        //将尾节点设为前一个非取消的节点,并将其后继节点设为null,help GC        compareAndSetNext(pred, predNext, null);    } else {             int ws;//用于表示等候状态        //pred != head:前一个非取消的节点非头节点也非尾节点        //ws == Node.SIGNAL:当前等候状态为待唤醒        //若不是待唤醒则CAS设置为待唤醒状态        //前一个非取消的节点的线程不为null        if (pred != head &&            ((ws = pred.waitStatus) == Node.SIGNAL ||             (ws

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表