Java中AQS的原理与实现

河曲智叟  金牌会员 | 2023-7-8 22:26:31 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

目录

1:什么是AQS?
2:AQS都有那些用途?
3:我们如何使用AQS
4:AQS的实现原理
5:对AQS的设计与实现的一些思考
1:什么是AQS

​                随着计算机的算力越来越强大,各种各样的并行编程模型也随即踊跃而来,但当我们要在并行计算中使用共享资源的时候,就需要利用一种手段控制对共享资源的访问和修改来保证我们程序的正确的运行。而Java中除了在语言级别实现的synchronized锁之外,还有另一种对共享资源访问控制的实现,而这些实现都依赖于一个叫做抽象队列同步器(AbstractQueuedSynchronizer)的模板框架类,简称AQS。所以我们想要更深刻的理解Java中对共享资源的访问控制,就不可避免的要对AQS深入的了解和探讨。
​                我们首先看一下官方对于AQS的描述:提供一个依赖先进先出(FIFO)等待队列的挂起锁和相关同步器(信号量、事件等)框架。此类被设计为大多数类型的同步器的基类,这些同步器依赖于单个原子int值来表示状态。子类必须定义更改该状态的受保护方法,以及定义该状态在获取或释放该对象方面的含义。考虑到这些,类中的其他方法实现排队和挂起机制。子类可以维护状态字段,但只有使用方法getState、setState和compareAndSetState方法才可以更改状态。
2:AQS有哪些用途

​                AQS的用途有很多,几乎Java中所有的共享资源控制的实现都离不开它,比如我们常用的锁ReentrantLock、是基于AQS实现了一套可重入的公平和非公平互斥锁的实现。上图中我列举了我们常用的一些对于共享资源访问控制的一些工具,也都是基于AQS实现的。
3:如何使用AQS

​                我们要是用AQS实现我们自己的锁,都离不开AQS中一个叫做int类型state的变量,而这个变量的具体意义是由使用者自已定义的,比如我们要基于AQS实现一个不可重入的互斥锁,我们可以定义state为1代表有线程已经获取了锁,state为0为空闲状态。
下面是AQS文档中的一段使用AQS自定义互斥锁的一段示例代码,我把它放到此处,方便大家查阅。
  1. class Mutex implements Lock, java.io.Serializable {
  2.     // Our internal helper class
  3.     private static class Sync extends AbstractQueuedSynchronizer {
  4.         //Reports whether in locked state
  5.         protected boolean isHeldExclusively() {
  6.             return getState() == 1;
  7.         }
  8.         // Acquires the lock if state is zero
  9.         public boolean tryAcquire(int acquires) {
  10.             assert acquires == 1; // Otherwise unused
  11.             if (compareAndSetState(0, 1)) {
  12.                 setExclusiveOwnerThread(Thread.currentThread());
  13.                 return true;
  14.             }
  15.             return false;
  16.         }
  17.         // Releases the lock by setting state to zero
  18.         protected boolean tryRelease(int releases) {
  19.             assert releases == 1; // Otherwise unused
  20.             if (getState() == 0) throw new IllegalMonitorStateException();
  21.             setExclusiveOwnerThread(null);
  22.             setState(0);
  23.             return true;
  24.         }
  25.         // Provides a Condition
  26.         Condition newCondition() {
  27.             return new ConditionObject();
  28.         }
  29.         // Deserializes properly
  30.         private void readObject(ObjectInputStream s)
  31.                 throws IOException, ClassNotFoundException {
  32.             s.defaultReadObject();
  33.             setState(0); // reset to unlocked state
  34.         }
  35.     }
  36.     // The sync object does all the hard work. We just forward to it.
  37.     private final Sync sync = new Sync();
  38.     public void lock() {
  39.         sync.acquire(1);
  40.     }
  41.     public boolean tryLock() {
  42.         return sync.tryAcquire(1);
  43.     }
  44.     public void unlock() {
  45.         sync.release(1);
  46.     }
  47.     public Condition newCondition() {
  48.         return sync.newCondition();
  49.     }
  50.     public boolean isLocked() {
  51.         return sync.isHeldExclusively();
  52.     }
  53.     public boolean hasQueuedThreads() {
  54.         return sync.hasQueuedThreads();
  55.     }
  56.     public void lockInterruptibly() throws InterruptedException {
  57.         sync.acquireInterruptibly(1);
  58.     }
  59.     public boolean tryLock(long timeout, TimeUnit unit)
  60.             throws InterruptedException {
  61.         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  62.     }
  63. }
复制代码
4:AQS的实现原理

​                AQS实现的核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要将此线程放入一个叫做CLH(三个人名Craig、Landin and Hagersten)的等待队列中,然后通过挂起和唤醒机制来保证锁的分配。而将资源设置为锁定状态主要是通过说到的一个叫做int类型的state的变量来控制的,队列的FIFO操作则是利用CLH队列来实现。
等待队列是“CLH”(Craig、Landin和Hagersten)锁队列的变体。我们使用它们来作为同步器,在其节点中存储有关线程的一些控制信息。每个节点中的status字段跟踪线程是否应该挂起。当节点的前驱节点释放时,节点会发出信号。队列的每个节点在其他方面都充当一个特定的通知样式监视器,其中包含一个等待线程。要进入CLH锁的队列,可以将其原子性地拼接在队列的尾部。要退出队列,只需将其放在队列头部,也就是将此节点设置为head字段,原理图如下。
4.1:AQS的数据结构

首先我们先看一下AQS中最基本的数据结构,也就是CLH队列中的节点,是一个名为Node的静态内部类
  1. static final class Node {
  2.         /** 标记此节点是一个以共享模式等待的锁 */
  3.         static final Node SHARED = new Node();
  4.         /** 标记此节点是一个以互斥模式等待的锁 */
  5.         static final Node EXCLUSIVE = null;
  6.         /** 表示线程获取锁的线程已经取消了*/
  7.         static final int CANCELLED =  1;
  8.         /** 原文注释:waitStatus value to indicate successor's thread needs unparking
  9.                 表示此线程的后继线程需要通过park()方法挂起。
  10.                 我的理解是此线程的后继节点需要被挂起,
  11.                 但当前节点必须是释放锁或者取消获取锁,后继线程等待被唤醒获取锁,后续可以通过源码解释。
  12.          */
  13.         static final int SIGNAL    = -1;
  14.         /** 表示节点在等待队列中,节点线程等待唤醒,在使用Conditional的时候会有此状态 */
  15.         static final int CONDITION = -2;
  16.         /**
  17.          * 当前线程处在SHARED情况下,该字段才会使用
  18.          */
  19.         static final int PROPAGATE = -3;
  20.         /**
  21.                      这些值以数字形式排列,以简化使用。非负值表示节点不需要信号。因此,大多数代码不需要检查特定的值,仅用检查符号就可以                         了。对于正常同步节点,该字段初始化为0,并且条件节点的CONDITION。使用CAS进行修改。
  22.          */
  23.         volatile int waitStatus;
  24.         /**
  25.                         前驱界定
  26.          */
  27.         volatile Node prev;
  28.         /**
  29.                     后继节点
  30.          */
  31.         volatile Node next;
  32.         /**
  33.          * 此节点的线程
  34.          */
  35.         volatile Thread thread;
  36.         /**
  37.           指向下一个处于CONDITION状态的节点,使用Conditional模式才会用到此节点,
  38.           篇幅原因,本片不讲述Condition Queue队列,故对此字段不多作介绍。
  39.          */
  40.         Node nextWaiter;
  41.         /**
  42.          * 是否是共享模式
  43.          */
  44.         final boolean isShared() {
  45.             return nextWaiter == SHARED;
  46.         }
  47.         /**
  48.          * 返回当前节点的前驱节点,前驱节点为null,则抛出NPE
  49.          */
  50.         final Node predecessor() throws NullPointerException {
  51.             Node p = prev;
  52.             if (p == null)
  53.                 throw new NullPointerException();
  54.             else
  55.                 return p;
  56.         }
  57.         Node() {    // Used to establish initial head or SHARED marker
  58.         }
  59.         Node(Thread thread, Node mode) {     // Used by addWaiter
  60.             this.nextWaiter = mode;
  61.             this.thread = thread;
  62.         }
  63.         Node(Thread thread, int waitStatus) { // Used by Condition
  64.             this.waitStatus = waitStatus;
  65.             this.thread = thread;
  66.         }
  67.     }
复制代码
AQS中其他重要字段
  1.     /**
  2.      * 队列的头节点(虚节点,不存储数据),懒初始化。除了初始化之外,仅能通过setHead方法修改。
  3.      * 注:假如头节点存在,一定会保证waitStatus变量的值不是CANCELLED
  4.      */
  5.     private transient volatile Node head;
  6.     /**
  7.      * 队列的尾节点(虚节点,不存储数据),懒初始化,仅仅可以通过enq方法初始化
  8.      */
  9.     private transient volatile Node tail;
  10.     /**
  11.      * 同步状态
  12.      */
  13.     private volatile int state;
复制代码
由静态内部类Node中的prev和next字段我们可以知道CLH变体队列是一个由Node组成的双向队列,由字段head和tail可以知道AQS中保存了头节点和尾节点。state字段则是用来作为同步的重要字段,AQS中提供了三个final类型的方法来访问该字段。
  1.     protected final int getState() {
  2.         return state;
  3.     }
  4.     protected final void setState(int newState) {
  5.         state = newState;
  6.     }
  7.     protected final boolean compareAndSetState(int expect, int update) {
  8.         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  9.     }
复制代码
4.2:模板方法
  1.     /**
  2.      尝试以独占模式获取锁,此方法此方法应查询对象的状态是否允许以独占模式获取该对象,若允许的话则可以获取它。
  3.          此方法总是由执行acquire方法的线程调用。如果此方法返回false且线程尚未排队,则acquire方法可以对线程进行排队,直到某                     个其他线程发出释放信号,则该线程停止挂起。
  4.      */
  5.     protected boolean tryAcquire(int arg) {
  6.         throw new UnsupportedOperationException();
  7.     }
  8.     /**
  9.      尝试以独占模型释放锁
  10.      */
  11.     protected boolean tryRelease(int arg) {
  12.         throw new UnsupportedOperationException();
  13.     }
  14.     /**
  15.      尝试以共享模式获取锁,此方法此方法应查询对象的状态是否允许以独占模式获取该对象,若允许的话则可以获取它。
  16.          此方法总是由执行acquire方法的线程调用。如果此方法返回false且线程尚未排队,则acquire方法可以对线程进行排队,直到某                     个其他线程发出释放信号,则该线程停止挂起。
  17.      */
  18.     protected int tryAcquireShared(int arg) {
  19.         throw new UnsupportedOperationException();
  20.     }
  21.     /**
  22.     尝试以共享模式释放锁
  23.      */
  24.     protected boolean tryReleaseShared(int arg) {
  25.         throw new UnsupportedOperationException();
  26.     }
  27.     /**
  28.                 返回是否以独占模式持有锁
  29.      */
  30.     protected boolean isHeldExclusively() {
  31.         throw new UnsupportedOperationException();
  32.     }
复制代码
思考:AQS作为一个模板框架类,为什么 tryLock和tryRelease等方法,那么为什么这些方法不定义成abstract方法,而要定义成普通方法,且在方法中抛出异常呢?我的理解是AQS作为一个模板框架提供了加锁和释放锁的通用逻辑,但是又不仅仅是提供了独占锁或共享锁的逻辑,而是作为一个底层的通用执行模板类,如何定义成抽象的模板方法,那么所有的子类都要实现所有的模板方法,但是有些子类仅仅需要独占锁,比如ReentrantLock,那么就会要多先实现共享锁的逻辑(即使是空方法也要实现),所以我猜想是为了子类不必要实现多余的方法,所以才定义成普通的方法并在方法内部抛出异常。
4.3:获取锁源码及流程分析

由于篇幅原因,本文将以独占且忽略中断的模式的方法未入口分析,首先看一下获取锁的方法,获取锁的总体流程图如下:

获取锁的入口方法
  1.   public final void acquire(int arg) {
  2.         if (!tryAcquire(arg) &&
  3.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.             //acquireQueued方法会返回线程在挂起过程中是否被中断,然后返回线程中断的状态
  5.             selfInterrupt();
  6.     }
复制代码
可以看到获取锁的方法是一个由final修饰的不可被子类重写的方法,首先调用了上面的模板方法(必须由子类重写获取逻辑)获取锁
如果获取成功则获取锁流程执行结束。否则执行addWaiter方法执行入队逻辑。
线程入队方法
  1.     private Node addWaiter(Node mode) {
  2.         //mode线程获取锁的模式(独占或者共享)
  3.         Node node = new Node(Thread.currentThread(), mode);
  4.         //尝试快速入队,失败则执行完整入队逻辑
  5.         Node pred = tail;
  6.         if (pred != null) {
  7.             node.prev = pred;
  8.             //如果尾节点不为null,则以原子方式把当前节点设置为尾节点并返回
  9.             if (compareAndSetTail(pred, node)) {
  10.                 pred.next = node;
  11.                 return node;
  12.             }
  13.         }
  14.         //如果尾节点不存(说明头节点和尾节点未初始化)在或者由于竞争导致一次设置尾节点失败,
  15.         //则执行完整入队逻辑(会进行头节点和尾节点初始化的工作)
  16.         enq(node);
  17.         return node;
  18.     }
复制代码
addWaiter方法会先进行快速入队操作,如果快速入队失败(由于竞争或者头、尾节点未初始化),则进行完整入队操作(如果头、尾节点未初始化的话会先进行初始化操作)
完整入队逻辑
  1.     private Node enq(final Node node) {
  2.         //自旋把当前节点链接到尾节点之后
  3.         for (;;) {
  4.             Node t = tail;
  5.             if (t == null) {
  6.                 //尾节为空,说明队列为空,则需要进行头节点和尾节点的初始化
  7.                 //这里直接new Node(),一个虚节点作为头节点,然后将头节点和尾节点相同
  8.                 //这里说明头节点和尾节点不存储数据
  9.                 if (compareAndSetHead(new Node()))
  10.                     tail = head;
  11.             } else {
  12.                 //尾节点不为空,使用cas把当前节点设置为尾节点
  13.                 node.prev = t;
  14.                 if (compareAndSetTail(t, node)) {
  15.                     t.next = node;
  16.                     return t;
  17.                 }
  18.             }
  19.         }
  20.     }
复制代码
enq方法会利用自旋先检查头节点和尾节点是否初始化,如果未初始化的话则先进行初始化。初始化完成之后以原子的方式插入node到队尾并且插入成功之后返回此节点。
挂起线程并等待获取锁
  1. final boolean acquireQueued(final Node node, int arg) {
  2.             //是否失败,此线程被中断可能失败
  3.         boolean failed = true;
  4.         try {
  5.             //线程是否被中断
  6.             boolean interrupted = false;
  7.             //自旋一直获取锁
  8.             for (;;) {
  9.                 //获取当前节点的前驱节点
  10.                 final Node p = node.predecessor();
  11.                 //如果当前节点的前驱节点是头节点(因为头节点是虚节点,所以当前节点可以获取锁),
  12.                 //且当前节点获取所成功
  13.                 if (p == head && tryAcquire(arg)) {
  14.                     //设置node为头节点,因为当前节点已经获取锁成功了,当前节点需要作为头节点
  15.                     setHead(node);
  16.                     p.next = null; // help GC
  17.                     //设置失败标志为false
  18.                     failed = false;
  19.                     //返回中断状态
  20.                     return interrupted;
  21.                 }
  22.                 //shouldParkAfterFailedAcquire方法检查并更新获取失败的节点的状态,如果线程应该挂起则返回true
  23.                 //parkAndCheckInterrupt则挂起线程并返回是否中断
  24.                 if (shouldParkAfterFailedAcquire(p, node) &&
  25.                     parkAndCheckInterrupt())
  26.                     interrupted = true;
  27.             }
  28.         } finally {
  29.             //失败,则取消获取锁
  30.             if (failed)
  31.                 cancelAcquire(node);
  32.         }
  33.     }
复制代码
​                通过上面流程分析可知,获取锁失败,会调用addWaiter方法把当前节点放到队尾,那么线程入队之后什么时候挂起线程,什么时候出队,我们一点一点分析acquireQueued方法这些问题就会逐渐显露出来。
​                首先该方法会一直自旋获取锁(中间可能会被挂起,防止无效自旋),判断当前节点的前驱节点是否是头节点来判断当前是否有获取锁的资格,如果有的话则设置当前节点为头节点并返回中断状态。否则调用shouldParkAfterFailedAcquire判断获取锁失败后是否可以挂起,可以的话则调用parkAndCheckInterrupt进行线程挂起操作。
设置头节点
  1.     private void setHead(Node node) {
  2.         head = node;
  3.         node.thread = null;
  4.         node.prev = null;
  5.     }
复制代码
注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为其他地方要用到。
检查并更新获取失败的节点的状态
  1.     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2.         int ws = pred.waitStatus;
  3.         if (ws == Node.SIGNAL)
  4.             /*
  5.              * node的状态已经是SIGNAL可以安全的挂起,直接返回true
  6.              */
  7.             return true;
  8.         if (ws > 0) {
  9.             /*
  10.              *        由之前的waitStatus变量枚举值可知,waitStatus大于0为取消状态,直接跳过此节点
  11.              */
  12.             do {
  13.                 //重新链接prev指针,至于为什么没有操作next指针后面会通过代码解释
  14.                 node.prev = pred = pred.prev;
  15.             } while (pred.waitStatus > 0);
  16.             pred.next = node;
  17.         } else {
  18.             /*
  19.              * 原子方式设置waitStatus的值为SIGNAL
  20.              */
  21.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  22.         }
  23.         return false;
  24.     }
复制代码
挂起并检查线程的中断状态
  1.     private final boolean parkAndCheckInterrupt() {
  2.             //使用LockSupport挂起此线程
  3.         LockSupport.park(this);
  4.         //返回并清除中断状态
  5.         return Thread.interrupted();
  6.     }
复制代码
取消获取锁
[code]private void cancelAcquire(Node node) {        // 忽略不存在的节点        if (node == null)            return;                //设置当前节点不持有线程        node.thread = null;        // 跳过取消的前驱节点        Node pred = node.prev;            //waitStatus>0未已取消的节点        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;        // 未取消的节点的后继节点        Node predNext = pred.next;        //设置状态未取消状态        node.waitStatus = Node.CANCELLED;        // 如果node为尾节点,设置pred为尾节点,然后设置尾节点的下一个节点为null        if (node == tail && compareAndSetTail(node, pred)) {            compareAndSetNext(pred, predNext, null);        } else {            int ws;                // 如果当前节点不是head的后继节点,            //1:判断当前节点前驱节点的是否为SIGNAL,            //2:如果不是,则把前驱节点设置为SINGAL看是否成功                    // 如果1和2中有一个为true,再判断当前节点的线程是否为null                            // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点            if (pred != head &&                ((ws = pred.waitStatus) == Node.SIGNAL ||                 (ws  0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

河曲智叟

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表