媒介
日常开辟中,我们经常利用锁大概其他同步器来控制并发,那么它们的基础框架是什么呢?如何实现的同步功能呢?本文将详细讲授构建锁和同步器的基础框架--AQS,并根据源码分析其原理。
一、什么是 AQS?
1. AQS 简介
AQS(Abstract Queued Synchronizer),抽象队列同步器,它是用来构建锁或其他同步器的基础框架。虽然大多数步伐员可能永远不会利用到它,但是知道 AQS 的原理有助于明白一些锁或同步器的是如何运行的。
那么有哪些同步器是基于 AQS 实现的呢?这里仅是简单介绍,详情后续会单独总结一篇文章。
同步器说明CountDownLatch递减的计数器,直至所有线程的使命都执行完毕,才继续执行后续使命。Semaphore信号量,控制同时访问某个资源的数目。CyclicBarrier递增的计数器,所有线程达到屏障时,才会继续执行后续使命。ReentrantLock防止多个线程同时访问共享资源,类似 synchronized 关键字。ReentrantReadWriteLock维护了读锁和写锁,读锁允许多线程访问,读锁阻塞所有线程。Condition提供类似 Object 监视器的方法,于 Lock 共同可以实现等候/关照模式。FutureTask当一个线程需要等候另一线程把某个使命执行完后才能继续执行,此时可以利用 FutureTask如果你明白了 AQS 的原理,也可以基于它去自定义一个同步组件,下文会介绍。
2. AQS 数据布局
AQS 焦点是通过对同步状态的管理,来完成线程同步,底层是依靠一个双端队列来完成同步状态的管理。
- 当火线程获取同步状态失败后,会构造成一个 Node 节点并参加队列末尾,同实阻塞线程。
- 当同步状态释放时,会把头节点中的线程唤醒,让其再次尝试获取同步状态
如下图,这里只是简单绘制,详细流程见下面原理分析:
这里的每个 Node 节点都存储着当火线程、等候信息等。
3. 资源共享模式
我们在获取共享资源时,有两种模式:
模式说明示例独占模式Exclusive,资源同一时刻只能被一个线程获取ReentrantLock共享模式Share,资源可同时被多个线程获取Semaphore、CountDownLatch二、AQS 原理分析
先简单说下原理分析的流程:
- 同步状态相关源码;
- 须重写的方法;
- Node 节点布局分析;
- 独占模式下的同步状态的获取与释放;
- 共享模式下的同步状态的获取与释放;
1. 同步状态相关
上面介绍到, AQS 焦点是通过对同步状态的管理,来完成线程同步,所以起首介绍管理同步状态的三个方法,在自定义同步组件时,需要通过它们获取和修改同步状态。- //保证可见性
- private volatile int state
- //获取当前同步状态。
- protected final int getState() {
- return state;
- }
- //设置当前同步状态。
- protected final void setState(int newState) {
- state = newState;
- }
- //使用 CAS 设置当前状态,保证原子性。
- protected final boolean compareAndSetState(int expect, int update) {
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
复制代码 2. 须重写的方法
AQS 是基于模板方法模式的,通过第一个 abstract 也可知道,AQS 是个抽象类,利用者需要继承 AQS 并重写指定方法。
以下这些方式是没有详细实现的,需要在利用 AQS 时在子类中去实现详细方法,比及介绍一些同步组件时,会详细说明如何重写。- //独占式获取同步状态,实现该方法须查询并判断当前状态是否符合预期,然后再进行CAS设置状态。
- protected boolean tryAcquire (int arg)
- //独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
- protected boolean tryRelease (int arg)
- //共享式获取同步状态,返回大于0的值表示获取成功,反之获取失败。
- protected int tryAcquireShared (int arg)
- //共享式释放同步状态。
- protected boolean tryReleaseShared (int arg)
- //当前同步器是否再独占模式下被线程占用,一般用来表示是否被当前线程独占。
- protected boolean isHeldExclusively ()
复制代码 3. Node 源码
Node 是双端队列中的节点,是数据布局的告急部门,线程相关的信息都存在每一个 Node 中。
3.1 Node 布局源码
源码如下:- static final class Node {
- //标记当前节点的线程在共享模式下等待。
- static final Node SHARED = new Node();
-
- //标记当前节点的线程在独占模式下等待。
- static final Node EXCLUSIVE = null;
-
- //waitStatus的值,表示当前节点的线程已取消(等待超时或被中断)
- static final int CANCELLED = 1;
-
- //waitStatus的值,表示后继节点的线程需要被唤醒
- static final int SIGNAL = -1;
-
- //waitStatus的值,表示当前节点在等待某个条件,正处于condition等待队列中
- static final int CONDITION = -2;
-
- //waitStatus的值,表示在当前有资源可用,能够执行后续的acquireShared操作
- static final int PROPAGATE = -3;
- //等待状态,值如上,1、-1、-2、-3。
- volatile int waitStatus;
-
- //前趋节点
- volatile Node prev;
- //后继节点
- volatile Node next;
-
- //当前线程
- volatile Thread thread;
-
- //等待队列中的后继节点,共享模式下值为SHARED常量
- Node nextWaiter;
-
- //判断共享模式的方法
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
-
- //返回前趋节点,没有报NPE
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
- //下面是三个构造方法
- Node() {} // Used to establish initial head or SHARED marke
-
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
复制代码 3.2 设置头尾节点
Unsafe 类中,提供了一个基于 CAS 的设置头尾节点的方法,AQS 调用该方法进行设置头尾节点,包管并发编程中的线程安全。- //CAS自旋设置头节点
- private final boolean compareAndSetHead(Node update) {
- return unsafe.compareAndSwapObject(this, headOffset, null, update);
- }
- //CAS自旋设置尾节点,expect为当前线程“认为”的尾节点,update为当前节点
- private final boolean compareAndSetTail(Node expect, Node update) {
- return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
- }
-
复制代码 4. 独占模式
资源同一时刻只能被一个线程获取,如 ReentrantLock。
4.1 获取同步状态
代码如下,调用 acquire 方法可以获取同步状态,底层就是调用须重写方法中的 tryAcquire。如果获取失败则进入同步队列中,即使后续对线程进行终端操作,线程也不会从同步队列中移除。- public final void acquire(int arg) {
- //调用须重写方法中的tryAcquire
- if (!tryAcquire(arg) &&
- //失败则进入同步队列中
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 获取失败会先调用 addWaiter 方法将当火线程封装成独占式模式的节点,添加到AQS的队列尾部,源码如下。- private Node addWaiter(Node mode) {
- //将当前线程封装成对应模式下的Node节点
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;//尾节点
- if (pred != null) {
- //双端队列需要两个指针指向
- node.prev = pred;
- //通过CAS方式
- if (compareAndSetTail(pred, node)) {
- //添加到队列尾部
- pred.next = node;
- return node;
- }
- }
- //等待队列中没有节点,或者添加队列尾部失败则调用end方法
- enq(node);
- return node;
- }
- //Node节点通过CAS自旋的方式被添加到队列尾部,直到添加成功为止。
- private Node enq(final Node node) {
- //死循环,类似 while(1)
- for (;;) {
- Node t = tail;
- if (t == null) { // 须要初始化,代表队列的第一个元素
- if (compareAndSetHead(new Node()))
- //头节点就是尾节点
- tail = head;
- } else {
- //双端队列需要两个指针指向
- node.prev = t;
- //通过自旋放入队列尾部
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 此时,通过 addWaiter 已经将当火线程封装成独占模式的 Node 节点,并成功放入队列尾部。接下来会调用acquireQueued 方法在等候队列中排队。- final boolean acquireQueued(final Node node, int arg) {
- //获取资源失败标识
- boolean failed = true;
- try {
- //线程是否被中断标识
- boolean interrupted = false;
- //死循环,类似 while(1)
- for (;;) {
- //获取当前节点的前趋节点
- final Node p = node.predecessor();
- //前趋节点是head,即队列的第二个节点,可以尝试获取资源
- if (p == head && tryAcquire(arg)) {
- //资源获取成功将当前节点设置为头节点
- setHead(node);
- p.next = null; // help GC,表示head节点出队列
- failed = false;
- return interrupted;
- }
- //判断当前线程是否可以进入waitting状态,详解见下方
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt()) //阻塞当前线程,详解见下方
- interrupted = true;
- }
- } finally {
- if (failed)
- //取消获取同步状态,源码见下方的取消获取同步状态章节
- cancelAcquire(node);
- }
- }
- //将当前节点设置为头节点
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
- }
- //判断当前线程是否可以进入waitting状态
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- //获取前趋节点的等待状态,含义见上方Node结构源码
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL) //表示当前节点的线程需要被唤醒
- return true;
- if (ws > 0) { //表示当前节点的线程被取消
- //则当前节点一直向前移动,直到找到一个waitStatus状态小于或等于0的节点
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- //排在这个节点的后面
- pred.next = node;
- } else {
- //通过CAS设置等待状态
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
- //阻塞当前线程
- private final boolean parkAndCheckInterrupt() {
- //底层调用的UnSafe类的方法 park:阻塞当前线程, unpark:使给定的线程停止阻塞
- LockSupport.park(this);
- //中断线程
- return Thread.interrupted();
- }
复制代码 acquireQueued 方法中,只有当前驱节点等于 head 节点时,才能够尝试获取同步状态,这时为什么呢?
因为 head 节点是占有资源的节点,它释放后才会唤醒它的后继节点,所以需要检测。还有一个缘故原由是因为如果遇到了非 head 节点的其他节点出队或因停止而从等候中唤醒,这时种情况则需要判定前趋节点是否为 head 节点,是才允许获取同步状态。
获取同步状态的整体流程图如下:
4.2 释放同步状态
调用须重写方法中的 tryAcquire 进行同步状态的释放,成功则唤醒队列中最前面的线程,详细如下。- public final boolean release(int arg) {
- //调用须重写方法中的tryRelease
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- //唤醒后继节点的线程,详情见下方
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- //唤醒后继节点的线程
- private void unparkSuccessor(Node node) {
- //获取当前节点的等待状态
- int ws = node.waitStatus;
- if (ws < 0)
- //小于0则,则尝试CAS设为0
- compareAndSetWaitStatus(node, ws, 0);
- //获取后继节点
- Node s = node.next;
- //后继节点为空或者等待状态大于0,代表被节点被取消
- if (s == null || s.waitStatus > 0) {
- s = null;
- //将队列中的所有节点都向前移动
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- //不为空则进行唤醒操作
- if (s != null)
- //底层调用的UnSafe类的方法 park:阻塞当前线程, unpark:使给定的线程停止阻塞
- LockSupport.unpark(s.thread);
- }
复制代码 5.2 释放同步状态
代码如下,调用 releaseShared 方法可以释放同步状态,底层就是先调用须重写方法中的 tryReleaseShared。- public final void acquireInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())//中断则抛出异常
- throw new InterruptedException();
- if (!tryAcquire(arg))
- doAcquireInterruptibly(arg);
- }
复制代码 6. 取消获取同步状态
无论是独占模式照旧共享模式,所有的程获取同步状态的过程中,如果发生异常或是超时唤醒等,都需要将当前的节点出队,源码如下。
[code]//一般在获取同步状态方法的finally块中private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; //当火线程节点设为null Node pred = node.prev; //获取前驱节点 //前趋节点为取消状态,向前遍历找到非取消状态的节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //获取非取消节点的下一个节点 node.waitStatus = Node.CANCELLED; //将当前节点的等候状态设为取消状态 //当前节点是尾节点,则自旋将尾节点设置为前一个非取消节点 if (node == tail && compareAndSetTail(node, pred)) { //将尾节点设为前一个非取消的节点,并将其后继节点设为null,help GC compareAndSetNext(pred, predNext, null); } else { int ws;//用于表示等候状态 //pred != head:前一个非取消的节点非头节点也非尾节点 //ws == Node.SIGNAL:当前等候状态为待唤醒 //若不是待唤醒则CAS设置为待唤醒状态 //前一个非取消的节点的线程不为null if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws |