【后端面经-Java】AQS详解

铁佛  金牌会员 | 2023-6-29 09:07:04 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 911|帖子 911|积分 2733

目录

1. AQS是什么?

AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock。
简单来说,AQS定义了一套框架,来实现同步类
2. AQS核心思想

2.1 基本框架

AQS的核心思想是对于共享资源,维护一个双端队列来管理线程,队列中的线程依次获取资源,获取不到的线程进入队列等待,直到资源释放,队列中的线程依次获取资源。
AQS的基本框架如图所示:

2.1.1 资源state

state变量表示共享资源,通常是int类型。

  • 访问方法
    state类型用户无法直接进行修改,而需要借助于AQS提供的方法进行修改,即getState()、setState()、compareAndSetState()等。
  • 访问类型
    AQS定义了两种资源访问类型:

    • 独占(Exclusive):一个时间点资源只能由一个线程占用;
    • 共享(Share):一个时间点资源可以被多个线程共用。

2.1.2 CLH双向队列

CLH队列是一种基于逻辑队列非线程饥饿的自旋公平锁,具体介绍可参考此篇博客。CLH中每个节点都表示一个线程,处于头部的节点获取资源,而其他资源则等待。

  • 节点结构
    Node类源码如下所示:
  1. static final class Node {
  2.     // 模式,分为共享与独占
  3.     // 共享模式
  4.     static final Node SHARED = new Node();
  5.     // 独占模式
  6.     static final Node EXCLUSIVE = null;        
  7.     // 结点状态
  8.     // CANCELLED,值为1,表示当前的线程被取消
  9.     // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
  10.     // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
  11.     // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
  12.     // 值为0,表示当前节点在sync队列中,等待着获取锁
  13.     static final int CANCELLED =  1;
  14.     static final int SIGNAL    = -1;
  15.     static final int CONDITION = -2;
  16.     static final int PROPAGATE = -3;        
  17.     // 结点状态
  18.     volatile int waitStatus;        
  19.     // 前驱结点
  20.     volatile Node prev;   
  21.     // 后继结点
  22.     volatile Node next;        
  23.     // 结点所对应的线程
  24.     volatile Thread thread;        
  25.     // 下一个等待者
  26.     Node nextWaiter;
  27.    
  28.     // 结点是否在共享模式下等待
  29.     final boolean isShared() {
  30.         return nextWaiter == SHARED;
  31.     }
  32.    
  33.     // 获取前驱结点,若前驱结点为空,抛出异常
  34.     final Node predecessor() throws NullPointerException {
  35.         // 保存前驱结点
  36.         Node p = prev;
  37.         if (p == null) // 前驱结点为空,抛出异常
  38.             throw new NullPointerException();
  39.         else // 前驱结点不为空,返回
  40.             return p;
  41.     }
  42.    
  43.     // 无参构造方法
  44.     Node() {    // Used to establish initial head or SHARED marker
  45.     }
  46.    
  47.     // 构造方法
  48.         Node(Thread thread, Node mode) {    // Used by addWaiter
  49.         this.nextWaiter = mode;
  50.         this.thread = thread;
  51.     }
  52.    
  53.     // 构造方法
  54.     Node(Thread thread, int waitStatus) { // Used by Condition
  55.         this.waitStatus = waitStatus;
  56.         this.thread = thread;
  57.     }
  58. }
复制代码
Node的方法和属性值如图所示:

其中,

  • waitStatus表示当前节点在队列中的状态;
  • thread表示当前节点表示的线程;
  • prev和next分别表示当前节点的前驱节点和后继节点;
  • nextWaiterd当存在CONDTION队列时,表示一个condition状态的后继节点。

  • waitStatus
    结点的等待状态是一个整数值,具体的参数值和含义如下所示:


  • 1-CANCELLED,表示节点获取锁的请求被取消,此时节点不再请求资源;
  • 0,是节点初始化的默认值;
  • -1-SIGNAL,表示线程做好准备,等待资源释放;
  • -2-CONDITION,表示节点在condition等待队列中,等待被唤醒而进入同步队列;
  • -3-PROPAGATE,当前线程处于共享模式下的时候会使用该字段。
2.2 AQS模板

AQS提供一系列结构,作为一个完整的模板,自定义的同步器只需要实现资源的获取和释放就可以,而不需要考虑底层的队列修改、状态改变等逻辑。
使用AQS实现一个自定义同步器,需要实现的方法:

  • isHeldExclusively():该线程是否独占资源,在使用到condition的时候会实现这一方法;
  • tryAcquire(int):独占模式获取资源的方式,成功获取返回true,否则返回false;
  • tryRelease(int):独占模式释放资源的方式,成功获取返回true,否则返回false;
  • tryAcquireShared(int):共享模式获取资源的方式,成功获取返回true,否则返回false;
  • tryReleaseShared(int):共享模式释放资源的方式,成功获取返回true,否则返回false;
一般来说,一个同步器是资源独占模式或者资源共享模式的其中之一,因此tryAcquire(int)和tryAcquireShared(int)只需要实现一个即可,tryRelease(int)和tryReleaseShared(int)同理。
但是同步器也可以实现两种模式的资源获取和释放,从而实现独占和共享两种模式。
3. 源码分析

3.1 acquire(int)

acquire(int)是获取源码部分的顶层入口,源码如下所示:
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  3.         selfInterrupt();
  4. }
复制代码
这段代码展现的资源获取流程如下:

  • tryAcquire()尝试直接去获取资源;获取成功则直接返回
  • 如果获取失败,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。
简单总结就是:

  • 获取资源;
  • 失败就排队;
  • 排队要等待。
从上文的描述可见重要的方法有三个:tryAquire()、addWaiter()、acquireQueued()。下面将逐个分析其源码:
3.1.1 tryAcquire(int)

tryAcquire(int)是获取资源的方法,源码如下所示:
  1. protected boolean tryAcquire(int arg) {
  2.       throw new UnsupportedOperationException();
  3. }
复制代码
该方法是一个空方法,需要自定义同步器实现,因此在使用AQS实现同步器时,需要重写该方法。这也是“自定义的同步器只需要实现资源的获取和释放就可以”的体现。
3.1.2 addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)是将线程加入等待队列的尾部,源码如下所示:
  1. private Node addWaiter(Node mode) {
  2.     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
  3.     //aquire()方法是独占模式,因此直接使用Exclusive参数。
  4.     Node node = new Node(Thread.currentThread(), mode);
  5.     //尝试快速方式直接放到队尾。
  6.     Node pred = tail;
  7.     if (pred != null) {
  8.         node.prev = pred;
  9.         if (compareAndSetTail(pred, node)) {
  10.             pred.next = node;
  11.             return node;
  12.         }
  13.     }
  14.     //上一步失败则通过enq入队。
  15.     enq(node);
  16.     return node;
  17. }
复制代码
首先,使用模式将当前线程构造为一个节点,然后尝试将该节点放入队尾,如果成功则返回,否则调用enq(node)将节点放入队尾,最终返回当前节点的位置指针。
其中,enq(node)方法是将节点加入队列的方法,源码如下所示:
  1. private Node enq(final Node node) {
  2.     for (;;) { // 无限循环,确保结点能够成功入队列
  3.         // 保存尾结点
  4.         Node t = tail;
  5.         if (t == null) { // 尾结点为空,即还没被初始化
  6.             if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点
  7.                 tail = head; // 头节点与尾结点都指向同一个新生结点
  8.         } else { // 尾结点不为空,即已经被初始化过
  9.             // 将node结点的prev域连接到尾结点
  10.             node.prev = t;
  11.             if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
  12.                 // 设置尾结点的next域为node
  13.                 t.next = node;
  14.                 return t; // 返回尾结点
  15.             }
  16.         }
  17.     }
  18. }
复制代码
3.1.3 acquireQueued(Node node, int arg)

这部分源码是将线程阻塞在等待队列中,线程处于等待状态,直到获取到资源后才返回,源码如下所示:
  1. // sync队列中的结点在独占且忽略中断的模式下获取(资源)
  2. final boolean acquireQueued(final Node node, int arg) {
  3.     // 标志
  4.     boolean failed = true;
  5.     try {
  6.         // 中断标志
  7.         boolean interrupted = false;
  8.         for (;;) { // 无限循环
  9.             // 获取node节点的前驱结点
  10.             final Node p = node.predecessor();
  11.             if (p == head && tryAcquire(arg)) { // 前驱为头节点并且成功获得锁
  12.                 setHead(node); // 设置头节点
  13.                 p.next = null; // help GC
  14.                 failed = false; // 设置标志
  15.                 return interrupted;
  16.             }
  17.             if (shouldParkAfterFailedAcquire(p, node) &&
  18.                 parkAndCheckInterrupt())//
  19.                 //shouldParkAfterFailedAcquire只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。
  20.                 //parkAndCheckInterrupt首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断
  21.                 interrupted = true;
  22.         }
  23.     } finally {
  24.         if (failed)
  25.             cancelAcquire(node);
  26.     }
  27. }
复制代码
acquireQueued(Node node, int arg)方法的主要逻辑如下:

  • 获取node节点的前驱结点,判断前驱节点是不是头部节点head,有没有成功获取资源。
  • 如果前驱结点是头部节点head并且获取了资源,说明自己应该被唤醒,设置该节点为head节点等待下一个获得资源;
  • 如果前驱节点不是头部节点或者没有获取资源,则判断是否需要park当前线程,

    • 判断前驱节点状态是不是SIGNAL,是的话则park当前节点,否则不执行park操作;

  • park当前节点之后,当前节点进入等待状态,等待被其他节点unpark操作唤醒。然后重复此逻辑步骤。
3.2 release(int)

release(int)是释放资源的顶层入口方法,源码如下所示:
  1. public final boolean release(int arg) {
  2.     if (tryRelease(arg)) { // 释放成功
  3.         // 保存头节点
  4.         Node h = head;
  5.         if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
  6.             unparkSuccessor(h); //释放头节点的后继结点
  7.         return true;
  8.     }
  9.     return false;
  10. }
复制代码
release(int)方法的主要逻辑如下:

  • 尝试释放资源,如果释放成功则返回true,否则返回false;
  • 释放成功之后,需要调用unparkSuccessor(h)唤醒后继节点。
下面介绍两个重要的源码函数:tryRelease(int)和unparkSuccessor(h)。
3.2.1 tryRelease(int)

tryRelease(int)是释放资源的方法,源码如下所示:
  1. protected boolean tryRelease(int arg) {
  2.     throw new UnsupportedOperationException();
  3. }
复制代码
这部分是需要自定义同步器自己实现的,要注意的是返回值需要为boolean类型,表示释放资源是否成功。
3.2.2 unparkSuccessor(h)

unparkSuccessor(h)是唤醒后继节点的方法,源码如下所示:
[code]private void unparkSuccessor(Node node) {    //这里,node一般为当前线程所在的结点。    int ws = node.waitStatus;    if (ws < 0)//置零当前线程所在的结点状态,允许失败。        compareAndSetWaitStatus(node, ws, 0);    Node s = node.next;//找到下一个需要唤醒的结点s    if (s == null || s.waitStatus > 0) {//如果为空或已取消        s = null;        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。            if (t.waitStatus

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

铁佛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表