10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

打印 上一主题 下一主题

主题 901|帖子 901|积分 2703

  1. 10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)
复制代码
前言

上篇文章15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized有说到synchronized由object monitor实现的
object monitor中由cxq栈和entry list来实现阻塞队列,wait set实现等待队列,从而实现synchronized的等待/通知模式
而JDK中的JUC并发包也通过类似的阻塞队列和等待队列实现等待/通知模式
这篇文章就来讲讲JUC的基石AQS(AbstractQueuedSynchronizer)
需要了解的前置知识:CAS、volatile
如果不了解CAS可以看上篇讲述synchronized的文章(链接在上面)
如果不了解volatile可以看这篇文章 5个案例和流程图让你从0到1搞懂volatile关键字
本篇文章以AQS为中心,深入浅出描述AQS中的数据结构、设计以及获取、释放同步状态的源码流程、Condition等
观看本文大约需要10分钟,可以带着几个问题去观看

  • 什么是AQS,它是干啥用的?
  • AQS是使用什么数据结构实现的?
  • AQS获取/释放同步状态是如何实现的?
  • AQS除了具有synchronized的功能还拥有什么其他特性?
  • AQS如何去实现非公平锁、公平锁?
  • 什么是Condition?它跟AQS是什么关系?
AQS数据结构

什么是AQS呢?
AQS是一个同步队列(阻塞队列),是并发包中的基础,很多并发包中的同步组件底层都使用AQS来实现,比如:ReentrantLock、读写锁、信号量等等...
AQS有三个重要的字段,分别是: head 头节点、tail 尾节点、state 同步状态
  1. public abstract class AbstractQueuedSynchronizer
  2.    extends AbstractOwnableSynchronizer
  3.    implements java.io.Serializable {
  4.    //头节点
  5.    private transient volatile Node head;
  6.     //尾节点
  7.    private transient volatile Node tail;
  8.    //同步状态
  9.    private volatile int state;  
  10. }    
复制代码
头尾节点很好理解,因为AQS本身就是个双向链表,那么state同步状态是什么?
AQS中使用同步状态表示资源,然后使用CAS来获取/释放资源,比如设置资源为1,一个线程来尝试获取资源,由于同步状态目前为1,于是该线程CAS替换同步状态为0,成功后表示获取到资源,之后其他线程再来获取资源就无法获取了(状态为0),直到获取资源的线程来释放资源
上述获取/释放资源也可以理解成获取/释放锁
同时三个字段都被volatile修饰,用volatile来保证内存可见性,防止其他线程修改这些数据时当前线程无法感知
通过上面的描述,我们可以知道AQS大概长这样

当某个线程获取资源失败时,会被构建成节点加入AQS中
节点Node是AQS中的内部类,Node中有些重要的字段一起来看看
  1. static final class Node {
  2.         //节点状态
  3.        volatile int waitStatus;
  4.    
  5.        //前驱节点
  6.        volatile Node prev;
  7.        //后继节点
  8.        volatile Node next;
  9.         
  10.        //当前节点所代表的线程
  11.        volatile Thread thread;
  12.        //等待队列使用时的后继节点指针
  13.        Node nextWaiter;
  14. }
复制代码
prev、next、thread应该都好理解
AQS同步队列和等待队列都使用这种节点,当等待队列节点被唤醒出队时,方便加入同步队列
nextWaiter就是用于节点在等待队列中指向下一个节点
waitStatus表示节点的状态
状态说明INITIAL0 初始状态CANCELLED1 该节点对应的线程取消调度SIGNAL-1 该节点对应的线程阻塞,等待唤醒竞争资源CONDITION-2 该节点在等待(条件)队列中,等待唤醒后从等待队列出队进入同步队列竞争PROPAGATE-3 共享情况下,会唤醒后续所有共享节点不太理解状态不要紧,我们后文遇到再说
经过上面的描述,节点大概是长成这样的

AQS中还有另外一个内部类ConditionObject用于实现等待队列/条件队列,我们后文再来说说
AQS中可以分为独占、共享模式,其中这两种模式下还可以支持响应中断、纳秒级别超时
独占模式可以理解为同一时间只有一个线程能够获取同步状态
共享模式可以理解为可以有多个线程能够获取同步状态,方法中常用shared标识
方法中常用acquire标识获取同步状态,release标识释放同步状态

这些方法都是模板方法,规定流程,将具体的实现留给实现类去做(比如获取同步状态,该如何获取交给实现类去实现)
独占式

独占式实际就是时刻上只允许一个线程独占该资源,多线程竞争情况下也只能有一个线程获取同步状态成功
获取同步状态

不响应中断的独占获取和响应中断、超时的类似,我们以acquire为例查看源码
  1.     public final void acquire(int arg) {
  2.        if (!tryAcquire(arg) &&
  3.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.            selfInterrupt();
  5.    }
复制代码
tryAcquire 方法用于尝试获取同步状态,参数arg表示获取多少同步状态,获取成功返回true 则会退出方法,留给实现类去实现
addWaiter

addWaiter(Node.EXCLUSIVE) 构建独占式节点,并用CAS+失败重试的方式加入AQS的末尾
  1.     private Node addWaiter(Node mode) {
  2.        //构建节点
  3.        Node node = new Node(Thread.currentThread(), mode);
  4.        //尾节点不为空则CAS替换尾节点
  5.        Node pred = tail;
  6.        if (pred != null) {
  7.            node.prev = pred;
  8.            if (compareAndSetTail(pred, node)) {
  9.                pred.next = node;
  10.                return node;
  11.            }
  12.        }
  13.        //尾节点为空或则CAS失败执行enq
  14.        enq(node);
  15.        return node;
  16.    }
复制代码
  1.    private Node enq(final Node node) {
  2.        //失败重试
  3.        for (;;) {
  4.            Node t = tail;
  5.            //没有尾节点 则CAS设置头节点(头尾节点为一个节点),否则CAS设置尾节点
  6.            if (t == null) { // Must initialize
  7.                if (compareAndSetHead(new Node()))
  8.                    tail = head;
  9.            } else {
  10.                node.prev = t;
  11.                if (compareAndSetTail(t, node)) {
  12.                    t.next = node;
  13.                    return t;
  14.                }
  15.            }
  16.        }
  17.    }
复制代码
enq方法主要以自旋(中途不会进入等待模式)去CAS设置尾节点,如果AQS中没有节点则头尾节点为同一节点
由于添加到尾节点存在竞争,因此需要用CAS去替换尾节点

acquireQueued

acquireQueued方法主要用于AQS队列中的节点来自旋获取同步状态,在这个自旋中并不是一直执行的,而是会被park进入等待
  1. final boolean acquireQueued(final Node node, int arg) {
  2.    //记录是否失败
  3.    boolean failed = true;
  4.    try {
  5.        //记录是否中断过
  6.        boolean interrupted = false;
  7.        //失败重试
  8.        for (;;) {
  9.            //p 前驱节点
  10.            final Node p = node.predecessor();
  11.            //如果前驱节点为头节点,并尝试获取同步状态成功则返回
  12.            if (p == head && tryAcquire(arg)) {
  13.                //设置头节点
  14.                setHead(node);
  15.                p.next = null; // help GC
  16.                failed = false;
  17.                return interrupted;
  18.            }
  19.            //失败则设置下标记然后进入等待检查中断
  20.            if (shouldParkAfterFailedAcquire(p, node) &&
  21.                parkAndCheckInterrupt())
  22.                interrupted = true;
  23.        }
  24.    } finally {
  25.        //如果失败则取消获取
  26.        if (failed)
  27.            cancelAcquire(node);
  28.    }
  29. }
复制代码
在尝试获取同步状态前有个条件p == head && tryAcquire(arg):前驱节点是头节点
因此AQS中的节点获取状态是FIFO的
但即使满足前驱节点是头节点,并不一定就能获取同步状态成功,因为还未加入AQS的线程也可能尝试获取同步状态,以此来实现非公平锁
那如何实现公平锁呢?
在尝试获取同步状态前都加上这个条件就行了呗!
再来看看shouldParkAfterFailedAcquire 获取同步状态失败后应该停放
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2.    //前驱节点状态
  3.    int ws = pred.waitStatus;
  4.    if (ws == Node.SIGNAL)
  5.        //前驱节点状态是SIGNAL 说明前驱释放同步状态回来唤醒 直接返回
  6.        return true;
  7.    if (ws > 0) {
  8.        //如果前驱状态大于0 说明被取消了,就一直往前找,找到没被取消的节点
  9.        do {
  10.            node.prev = pred = pred.prev;
  11.        } while (pred.waitStatus > 0);
  12.        //排在没被取消的节点后面
  13.        pred.next = node;
  14.    } else {
  15.        //前驱没被取消,而且状态不是SIGNAL CAS将状态更新为SIGNAL,释放同步状态要来唤醒
  16.        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  17.    }
  18.    return false;
  19. }
复制代码
实际上是park前的一些准备
再来看看 parkAndCheckInterrupt ,用工具类进入等待状态,被唤醒后检查是否中断
  1. private final boolean parkAndCheckInterrupt() {
  2.        //线程进入等待状态...
  3.        LockSupport.park(this);
  4.         //检查是否中断 (会清除中断标记位)
  5.        return Thread.interrupted();
  6. }
复制代码
在acquireQueued的中如果未获取同步状态并且抛出异常,最终会执行cancelAcquire取消
当感知到中断时返回true回去,来到第一层acquire方法执行selfInterrupt方法,自己中断线程
acquire流程图:

  • 先尝试获取同步状态失败则CAS+失败重试添加到AQS末尾

  • 前驱节点为头节点且获取同步状态成功则返回,否则进入等待状态等待唤醒,唤醒后重试

  • 在2期间发生异常取消当前节点

释放同步状态

先进行释放同步状态,成功后头节点状态不为0 唤醒下一个状态不是被取消的节点
  1. public final boolean release(int arg) {
  2.    //释放同步状态
  3.    if (tryRelease(arg)) {
  4.        Node h = head;
  5.        if (h != null && h.waitStatus != 0)
  6.            //唤醒下一个状态不大于0(大于0就是取消)的节点
  7.            unparkSuccessor(h);
  8.        return true;
  9.    }
  10.    return false;
  11. }
复制代码
响应中断

acquireInterruptibly用于响应中断的获取同步状态
  1. public final void acquireInterruptibly(int arg)
  2.        throws InterruptedException {
  3.    //查看是否被中断,中断抛出异常
  4.    if (Thread.interrupted())
  5.        throw new InterruptedException();
  6.    if (!tryAcquire(arg))
  7.        doAcquireInterruptibly(arg);
  8. }
复制代码
doAcquireInterruptibly 与原过程类似,就是在被唤醒后检查到被中断时抛出中断异常
  1.     private void doAcquireInterruptibly(int arg)
  2.        throws InterruptedException {
  3.        final Node node = addWaiter(Node.EXCLUSIVE);
  4.        boolean failed = true;
  5.        try {
  6.            for (;;) {
  7.                final Node p = node.predecessor();
  8.                if (p == head && tryAcquire(arg)) {
  9.                    setHead(node);
  10.                    p.next = null; // help GC
  11.                    failed = false;
  12.                    return;
  13.                }
  14.                if (shouldParkAfterFailedAcquire(p, node) &&
  15.                    parkAndCheckInterrupt())
  16.                    //被唤醒后检查到被中断时抛出中断异常
  17.                    throw new InterruptedException();
  18.            }
  19.        } finally {
  20.            if (failed)
  21.                cancelAcquire(node);
  22.        }
  23.    }
复制代码
响应中断的获取同步状态被中断时会直接抛出中断异常,而不响应的是自己中断
响应超时

响应超时的获取同步状态使用tryAcquireNanos 超时时间为纳秒级别
  1. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  2.        throws InterruptedException {
  3.    if (Thread.interrupted())
  4.        throw new InterruptedException();
  5.    return tryAcquire(arg) ||
  6.        doAcquireNanos(arg, nanosTimeout);
  7. }
复制代码
可以看出响应超时同时也会响应中断
doAcquireNanos也与原过程类似
  1.     private boolean doAcquireNanos(int arg, long nanosTimeout)
  2.            throws InterruptedException {
  3.        if (nanosTimeout <= 0L)
  4.            return false;
  5.        final long deadline = System.nanoTime() + nanosTimeout;
  6.        final Node node = addWaiter(Node.EXCLUSIVE);
  7.        boolean failed = true;
  8.        try {
  9.            for (;;) {
  10.                final Node p = node.predecessor();
  11.                if (p == head && tryAcquire(arg)) {
  12.                    setHead(node);
  13.                    p.next = null; // help GC
  14.                    failed = false;
  15.                    return true;
  16.                }
  17.                //还有多久超时
  18.                nanosTimeout = deadline - System.nanoTime();
  19.                if (nanosTimeout <= 0L)
  20.                    //已超时
  21.                    return false;
  22.                if (shouldParkAfterFailedAcquire(p, node) &&
  23.                    //大于1ms
  24.                    nanosTimeout > spinForTimeoutThreshold)
  25.                    //超时等待
  26.                    LockSupport.parkNanos(this, nanosTimeout);
  27.                //响应中断
  28.                if (Thread.interrupted())
  29.                    throw new InterruptedException();
  30.            }
  31.        } finally {
  32.            if (failed)
  33.                cancelAcquire(node);
  34.        }
  35.    }
复制代码
响应中断、超时等方法也与独占式类似,只是有些设置细节不同
Condition

上文曾说过AQS充当阻塞(同步)队列,Condition来充当等待队列
AQS的内部类ConditionObject就是Condition的实现,它充当等待队列,用字段记录头尾节点
  1. public final void acquireShared(int arg) {
  2.    if (tryAcquireShared(arg) < 0)
  3.        doAcquireShared(arg);
  4. }
复制代码
节点之间使用nextWait指向下一个节点,形成单向链表

同时提供await系列方法来让当前线程进入等待,signal系列方法来唤醒
  1.     private void doAcquireShared(int arg) {
  2.        //添加共享式节点
  3.        final Node node = addWaiter(Node.SHARED);
  4.        boolean failed = true;
  5.        try {
  6.            boolean interrupted = false;
  7.            for (;;) {
  8.                //获取前驱节点
  9.                final Node p = node.predecessor();
  10.                if (p == head) {
  11.                    int r = tryAcquireShared(arg);
  12.                    if (r >= 0) {
  13.                        //如果前驱节点为头节点 并且 获取同步状态成功 设置头节点
  14.                        setHeadAndPropagate(node, r);
  15.                        p.next = null; // help GC
  16.                        if (interrupted)
  17.                            selfInterrupt();
  18.                        failed = false;
  19.                        return;
  20.                    }
  21.                }
  22.                //获取失败进入会等待的自旋
  23.                if (shouldParkAfterFailedAcquire(p, node) &&
  24.                    parkAndCheckInterrupt())
  25.                    interrupted = true;
  26.            }
  27.        } finally {
  28.            if (failed)
  29.                cancelAcquire(node);
  30.        }
  31.    }
复制代码
await主要将节点添加到condition object末尾,释放获取的同步状态,进入等待,唤醒后自旋获取同步状态
signal的主要逻辑在transferForSignal中
  1. public class ConditionObject implements Condition{
  2.        //头节点
  3.        private transient Node firstWaiter;
  4.        //尾节点
  5.        private transient Node lastWaiter;  
  6. }
复制代码
signal 主要把状态从-2condition 修改为 0(失败则取消节点), 然后加入AQS的末尾,最后再将状态该为-1 signal,成功则唤醒节点
为什么加入AQS末尾还是使用enq去CAS+失败重试操作保证原子性呢?
因为ConditionObject允许有多个,也就一个AQS同步队列可能对应多个Condition等待(条件)队列

总结

本篇文章以AQS为核心,深入浅出的描述AQS实现的数据结构、设计思想、获取/释放同步资源的源码级流程、Condition等
AQS使用头尾节点来实现双向队列,提供同步状态和获取/释放同步状态的模板方法来实现阻塞(同步)队列,并且这些字段使用volatile修饰,保证可见性与读取的场景配合,不需要保证原子性,在写的场景下常用CAS保证原子性
AQS与Condition使用相同类型的节点,在AQS中节点维护成双向链表,在Condition中节点维护成单向链表,节点除了维护指向关系,还需要记录对应线程和节点状态
AQS分为独占式和共享式,使用独占式时只允许一个线程获取同步状态,使用共享式时则允许多个线程获取同步状态;其中还提供响应中断、等待超时的类似方法
获取同步状态:先尝试获取同步状态,如果失败则CAS+失败重试的方式将节点添加到AQS末尾,等待被前驱节点唤醒;只有当前驱节点为头节点并且获取同步状态成功才返回,否则进入等待,被唤醒后继续尝试(自旋);在此期间如果发生异常,在抛出异常前会取消该节点
释放同步状态:尝试释放同步状态,成功后唤醒后继未被取消的节点
在获取同步状态时,被唤醒后会检查中断标识,如果是响应中断的则会直接抛出中断异常,不响应的则是在最外层自己中断
响应超时时,在自旋获取同步状态期间会计时,如果距离超时小于1ms就不进入等待的自旋,大于则再等待对应时间
AQS充当阻塞队列,Condition充当它的等待队列来实现等待/通知模式,AQS的内部类ConditionObject在await时会加入Condition末尾并释放同步状态进入等待队列,在被唤醒后自旋(失败会进入等待)获取同步状态;在single时会CAS的将condition头节点并加入AQS尾部再去唤醒(因为一个AQS可能对应多个Condition因此要CAS保证原子性)
最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 gitee-StudyJavagithub-StudyJava 感兴趣的同学可以stat下持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多干货,公众号:菜菜的后端私房菜
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表