ToB企服应用市场:ToB评测及商务社交产业平台

标题: 详解AQS四:ReentrantLock条件队列原理 [打印本页]

作者: 梦见你的名字    时间: 2024-12-27 14:03
标题: 详解AQS四:ReentrantLock条件队列原理
ReentrantLock的条件队列是实现“等待通知”机制的关键,之前在《java线程间通信:等待通知机制》一文中讲过了使用ReentrantLock实现多生产者、多消费者的案例,这个案例实际上在java源码的注释中已经给了,可以看Condition接口上的注释中相干的代码:
  1. class BoundedBuffer {
  2.     final Lock lock = new ReentrantLock();
  3.     final Condition notFull  = lock.newCondition();
  4.     final Condition notEmpty = lock.newCondition();
  5.     final Object[] items = new Object[100];
  6.     int putptr, takeptr, count;
  7.     public void put(Object x) throws InterruptedException {
  8.         lock.lock();
  9.         try {
  10.             while (count == items.length)
  11.                 notFull.await();
  12.             items[putptr] = x;
  13.             if (++putptr == items.length) putptr = 0;
  14.             ++count;
  15.             notEmpty.signal();
  16.         } finally {
  17.             lock.unlock();
  18.         }
  19.     }
  20.     public Object take() throws InterruptedException {
  21.         lock.lock();
  22.         try {
  23.             while (count == 0)
  24.                 notEmpty.await();
  25.             Object x = items[takeptr];
  26.             if (++takeptr == items.length) takeptr = 0;
  27.             --count;
  28.             notFull.signal();
  29.             return x;
  30.         } finally {
  31.             lock.unlock();
  32.         }
  33.     }
  34. }
复制代码
在文章《详解AQS二:ReentrantLock公平锁原理》中已经详细说了lock和unlock方法的实现原理,实际上就是利用了AQS队列实现壅闭、加锁、解锁。那么当lock、unlock方法中间夹杂着Condition的await、signal方法的调用,又发生了什么事变呢?
一、条件等待的原理:ConditionObject

统统都要从ConditionObject类说起,它是Condition接口的实现类,lock.newCondition方法调用实际返回的就是ConditionObject类的实例。那么condition.await方法调用和condition.signal方法调用到底发生了什么呢?这实际上就是AQS队列中的Node元素和条件等待队列中元素的相互移动:
复习一下lock和unlock的句式
  1. private final Lock lock=new ReentrantLock(); // 创建一个Lock接口实例
  2. ……
  3. // 申请锁lock,如果发生竞争且竞争锁失败,则当前线程进入AQS队列等待
  4. lock.lock();
  5. try{
  6.   // 阻塞被释放后,当前线程执行临界区代码
  7.     //await、signal方法调用
  8.   ……
  9. }finally{
  10.   // 在finally块中释放锁,当前线程节点从AQS队列中移除
  11.   lock.unlock();
  12. }
复制代码
await方法、signal方法都必须在临界区代码中执行,await方法调用之后大家都猜到了:当前线程会进入条件等待队列等待。但是明明当前线程还在AQS队列中,还没有调用unlock方法呢,那条件等待队列中和AQS队列中是不是都会有当前线程的等待实例?答案是否定的。实际上调用sinal方法之后当前线程会创建新Node等待在Condition队列尾部,同时释放AQS锁,本身节点将会在AQS队列中被移除。
同样的,当调用signal方法之后,Condition队列中的节点会被移除,同时会创建新节点到AQS队列中等待重新抢占锁。
二、await方法原理

await方法执行时当前线程在AQS队列中必然是头部节点,也就是已经获取锁的节点。await方法执行的原理就是将节点从AQS队列中的头部挪到Condition队列的尾部,之后释放锁并唤醒AQS队列的下一个节点让其抢占锁。
  1. public final void await() throws InterruptedException {
  2.     //如果当前线程发生了中断,就抛出中断异常
  3.     if (Thread.interrupted())
  4.         throw new InterruptedException();
  5.     //将当前线程封装Node节点并加到Condition等待队列
  6.     Node node = addConditionWaiter();
  7.     //释放锁并获取释放锁时查询的锁状态值(重入次数),以便于以后重新进入AQS队列使用
  8.     int savedState = fullyRelease(node);
  9.     //中断状态暂存标记
  10.     int interruptMode = 0;
  11.     /*
  12.      * 循环查询节点是否在AQS队列中,如果在AQS队列中表示已经重新进入AQS队列了,
  13.      * 这意味着有别的线程调用了signal方法唤醒了当前线程
  14.      */
  15.     while (!isOnSyncQueue(node)) {
  16.         //不在AQS队列中,那就继续挂起等待
  17.         LockSupport.park(this);
  18.         //检查中断状态,防止由于发生中断导致LockSupport.park(this);失效
  19.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  20.             break;
  21.     }
  22.     //抢锁,如果在抢锁过程中发生了异常,则将中断标记设置为REINTERRUPT
  23.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  24.         interruptMode = REINTERRUPT;
  25.     //如果是因中断导致的LockSupport.park(this);挂起失效,则遍历等待队列中的节点,删除无效节点
  26.     if (node.nextWaiter != null) // clean up if cancelled
  27.         unlinkCancelledWaiters();
  28.     //判定是否应当抛出中断异常还是仅仅恢复中断标记
  29.     if (interruptMode != 0)
  30.         reportInterruptAfterWait(interruptMode);
  31. }
复制代码
await方法的代码连续了AQS的代码风格,非常精简,但是信息量很大,每一行代码都值得推敲。总体来说,await方法干了以下的事变:
(1)封装当前线程为新Node节点,并添加到等待队列尾部
(2)AQS队列释放锁,并唤醒AQS同步队列中头部节点的后继节点
(3)执行while循环,将该节点的线程壅闭,直到该节点离开等待队列,进入AQS同步队列;或者检测到了中断异常竣事while循环。
(4)退出while循环后,执行acquireQueued方法尝试获取锁。
(5)执行善后工作,遍历等待队列中的节点删除无效节点。
(6)最后根据中断发生的时机,signal方法调用前,就抛出异常;否则就设置下中断标记,是否抛出异常看业务代码处理。
1、加入等待队列:addConditionWaiter
  1. /**
  2. * 添加一个新的等待节点到等待队列
  3. * @return 添加到等待队列中的节点对象
  4. */
  5. private Node addConditionWaiter() {
  6.     Node t = lastWaiter;
  7.     // 如果最后一个节点取消等待了,则执行unlinkCancelledWaiters方法遍历等待队列删除无效节点
  8.     if (t != null && t.waitStatus != Node.CONDITION) {
  9.         unlinkCancelledWaiters();
  10.         t = lastWaiter;
  11.     }
  12.     //封装当前线程为新Node节点,并且状态为CONDITION
  13.     Node node = new Node(Thread.currentThread(), Node.CONDITION);
  14.     if (t == null)
  15.         firstWaiter = node;
  16.     else
  17.         t.nextWaiter = node;//将新节点放到队尾
  18.     lastWaiter = node;
  19.     return node;
  20. }
复制代码
这个方法很简朴,就是创建了新节点Node对象,而且置为CONDTION状态表示等待在等待队列中;最后将其放到等待队列的队尾。
等待队列是一个单向队列,节点和节点之间使用nextWaiter指针链接。
值得一提的是,等待队列中节点Node类和AQS同步队列中的Node类是同一个类,之前在分析AQS时没使用到的节点状态Node.CONDITION以及属性nextWaiter在等待队列中全都用到了。
addConditionWaiter方法中比力让人费解的是这段代码:
  1. Node t = lastWaiter;
  2. // 如果最后一个节点取消等待了,则执行unlinkCancelledWaiters方法遍历等待队列删除无效节点
  3. if (t != null && t.waitStatus != Node.CONDITION) {
  4.     unlinkCancelledWaiters();
  5.     t = lastWaiter;
  6. }
复制代码
我们知道等待在等待队列中的节点必定是CONDITION状态,若不是这个状态,表示该节点已经取消等待了,那这时候就要将该节点从等待队列中移除。我们会有疑问,为什么该节点取消等待的时候不本身出队?感觉AQS所有的操作都似乎有“Lazy”的特性,包罗AQS队列的初始化是在自旋入队的时候、AQS节点的出队在自旋抢占锁的时候。。都不是本身自动操作,而是“变乱驱动”模式的。
2、删除无效节点:unlinkCancelledWaiters

节点入等待队列的时候可能会执行unlinkCancelledWaiters方法,该方法的作用是删除等待队列中已经取消等待的节点。
  1. private void unlinkCancelledWaiters() {
  2.     Node t = firstWaiter;
  3.     Node trail = null;
  4.     while (t != null) {
  5.         Node next = t.nextWaiter;
  6.         if (t.waitStatus != Node.CONDITION) {
  7.             t.nextWaiter = null;
  8.             if (trail == null){
  9.                 firstWaiter = next;
  10.             }else{
  11.                 trail.nextWaiter = next;
  12.             }
  13.             if (next == null){
  14.                 lastWaiter = trail;
  15.             }
  16.         }else{
  17.             trail = t;
  18.         }
  19.         t = next;
  20.     }
  21. }
复制代码
这段代码通过trail和t两个指针一前一后遍历整个等待队列,并剔除掉非Node.CONDITION状态的节点。
unlinkCancelledWaiters方法团体来说比力简朴,该方法仅仅在当前线程持有锁的时候被调用,用于将取消等待的节点从等待队列中移除。在以下两个场景下该方法会被调用:
该方法需要避免在signal方法没有被调用的情况下产生无法被回收的垃圾节点,全遍历等待队列中的所有节点也是无奈之举,好在该方法被调用有前提:sinal方法没有被调用,而且等待节点因为一些缘故原由取消了等待。
该方法遍历了等待队列中的所有节点,这样做有个好处:如果有大量的等待节点取消了等待,可能会导致“取消风暴”,这样一次遍历就取消所有指向垃圾节点的指针,可以避免多次重新遍历以提高运行效率。
3、释放锁:fullRelease

节点加入等待队列以后会释放锁,并唤醒后继节点
  1. final int fullyRelease(Node node) {
  2.     boolean failed = true;
  3.     try {
  4.         //获取锁状态,实际上就是当前线程的锁重入次数,以方便后续恢复
  5.         int savedState = getState();
  6.         //释放锁
  7.         if (release(savedState)) {
  8.             failed = false;
  9.             return savedState;
  10.         } else {
  11.             throw new IllegalMonitorStateException();
  12.         }
  13.     } finally {
  14.         if (failed)
  15.             node.waitStatus = Node.CANCELLED;
  16.     }
  17. }
复制代码
ReentrantLock公平锁和非公平锁锁释放都调用的同一个方法:release方法,fullRelease方法不停没有被提及,因为它是为等待队列准备的方法。fullRelease方法内部会调用release方法。
fullRelease方法和release方法最大的区别就是release方法只会返回true/false表示释放锁成功了照旧失败了,fullRelease方法则会先生存锁的状态,释放完锁以后会将该state返回给调用方await方法,用于以后规复线程在AQS同步队列中的状态。
4、是否在AQS队列:isOnSyncQueue

释放完成锁以后会执行while循环,不断执行isOnSyncQueue方法以判断节点是否在同步队列中,如果节点出现在了AQS同步队列中,阐明节点已经被唤醒了,它可以重新尝试获取锁了。
  1. final boolean isOnSyncQueue(Node node) {
  2.     /**
  3.      * 节点状态为CONDITION表示节点肯定在等待队列
  4.      * AQS同步队列是双向队列,等待在AQS同步队列中的所有节点prev都有值,
  5.      * 为null表示不在AQS同步队列中等待;反过来node.prev不为空并不表示node节点
  6.      * 一定在AQS同步队列。
  7.      */
  8.     if (node.waitStatus == Node.CONDITION || node.prev == null)
  9.         return false;
  10.     /**
  11.      * 等待队列中的节点通过nextWaiter指针相互链接,next指针用于AQS队列
  12.      * next指针不为空表示节点必定在AQS同步队列;反过来,node.next为空说明不了
  13.      * node节点不在AQS队列
  14.      */
  15.     if (node.next != null)
  16.         return true;
  17.        
  18.     //从尾部到头部遍历节点
  19.     return findNodeFromTail(node);
  20. }
  21. /**
  22. * 从尾部到头部遍历AQS同步队列查找指定节点
  23. */
  24. private boolean findNodeFromTail(Node node) {
  25.     Node t = tail;
  26.     for (;;) {
  27.         if (t == node)
  28.             return true;
  29.         if (t == null)
  30.             return false;
  31.         t = t.prev;
  32.     }
  33. }
复制代码
得表明一下:
node.prev不为空阐明不了node节点在AQS同步队列:
可以看下AQS的enq自旋入队方法
可以看到,在CAS操作入队之前,node.prev就已经设置了值了,而CAS可能会失败,所以就算node.prev有值,node节点也可能没有在队列中。
node.next为空阐明不了node节点不在AQS同步队列:
这个很好明白,AQS同步队列tail节点的next指针是null。
最后一个题目是:为什么要从尾部到头部遍历AQS同步队列查找指定节点,而不是从头部到尾部查找?
一个重要的缘故原由是当前节点刚加入AQS队列尾部的话,从尾部查找可能不会经历很多遍历就能查找到该节点,从尾部开始查找可以提高效率。
三、await方法中的中断

上一章节讲到了isOnSyncQueue方法,该方法判定当前节点是否在AQS同步队列中,如果在同步队列中,则竣事while循环,开始执行acquireQueue方法抢占锁;否则执行LockSupport.park方法将线程壅闭。看似逻辑很简朴,但是涉及到了中断相干的逻辑,比力复杂,需要单独拎出来掰扯掰扯。
  1. public final void await() throws InterruptedException {
  2.     ....省略之前的代码....
  3.     while (!isOnSyncQueue(node)) {
  4.             //不在AQS队列中,那就继续挂起等待
  5.             LockSupport.park(this);//①
  6.             //检查中断状态,防止由于发生中断导致LockSupport.park(this);失效
  7.             if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//②
  8.                 break;
  9.     }
  10.     //抢锁,如果在抢锁过程中发生了异常,则将中断标记设置为REINTERRUPT
  11.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  12.         interruptMode = REINTERRUPT;
  13.     //如果是因中断导致的LockSupport.park(this);挂起失效,则遍历等待队列中的节点,删除无效节点
  14.     if (node.nextWaiter != null) // clean up if cancelled
  15.         unlinkCancelledWaiters();
  16.     //判定是否应当抛出中断异常还是仅仅恢复中断标记
  17.     if (interruptMode != 0)
  18.         reportInterruptAfterWait(interruptMode);
  19. }
复制代码
1、LockSupport.park放行的缘故原由

在①处执行了LockSupport.park代码,线程被壅闭挂起,那什么时候线程会被释放继续运行临界区代码呢?
有三个可能的缘故原由:
可能很多人都会想到signal方法会唤醒线程,但是想不到可能发生的中断会让LockSupport.lock方法失效仍然会释放线程让代码继续运行。这也是为什么接下来的代码要紧接着检查线程中断。
2、检查中断:checkInterruptWhileWaiting

确切的说是检查是否发生了中断,以及如果发生了中断,是在signal方法执行前发生了中断,照旧signal方法执行后发生了中断。
  1. /**
  2. * 返回值有三种:
  3. * 0:没有发生中断
  4. * -1(THROW_IE):发生了中断,而且中断是在signal方法调用前发生的
  5. * 1(REINTERRUPT):发生了中断,而且中断是在signal方法调用后发生的
  6. */
  7. private int checkInterruptWhileWaiting(Node node) {
  8.     return Thread.interrupted() ?
  9.         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  10.         0;
  11. }
  12. /**
  13. * 如果需要,在取消等待后将节点转移到AQS同步队列。
  14. * 如果节点是在被唤醒前取消等待的,则返回true,否则返回false。
  15. * 只有发生了中断才会调用该方法。
  16. */
  17. final boolean transferAfterCancelledWait(Node node) {
  18.    
  19.     /*
  20.      * 这里的CAS操作如果成功,说明当前节点的状态是Node.CONDITION
  21.      * 也就是说,当前节点必定是在等待队列,更进一步,说明了当前中断
  22.      * 是在被唤醒之前发生的,因为如果调用了signal方法,当前节点必然
  23.      * 已经在AQS同步队列中,已经是状态0了。
  24.      */
  25.     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  26.         //自旋入队
  27.         enq(node);
  28.         return true;
  29.     }
  30.     /*
  31.      * 代码执行到这里,并不能说明signal方法已经执行完毕,有可能signal方法
  32.      * 正在执行中,Node状态已经更改成了0,但是enq方法还没执行完成,节点还在
  33.      * 疯狂自旋入队。所以这里要判断是否完成了enq方法,就要判定下当前节点是否
  34.      * 已经在AQS同步队列中。如果enq方法还没执行完,就让出CPU时间片,稍稍等待
  35.      * 下,通常enq方法会很快完成,所以不用担心这里会浪费CPU资源。
  36.      */
  37.     while (!isOnSyncQueue(node))
  38.         Thread.yield();
  39.     //singal方法执行后发生的中断,返回false
  40.     return false;
  41. }
复制代码
checkInterruptWhileWaiting方法有三个返回值:0、1(REINTERRUPT)、-1(THROW_IE)。在这里,要特别注意Thread.interrupted()方法的调用,该方法在上一章节《详解AQS二:ReentrantLock公平锁原理》就说过了,它有个很重要的特点是会重置中断状态为false,而且返回中断状态。checkInterruptWhileWaiting的返回值1和-1都代表发生过中断。不管是1照旧-1,都会停止while循环,中断值则会暂存在interruptMode变量中。
  1. while (!isOnSyncQueue(node)) {
  2.         //不在AQS队列中,那就继续挂起等待
  3.         LockSupport.park(this);//①
  4.         //检查中断状态,防止由于发生中断导致LockSupport.park(this);失效
  5.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//②
  6.             break;
  7. }
复制代码
3、抢锁以及重设中断状态

中断循环以后,当前线程就开始抢锁,抢锁方法是acquireQueued方法,该方法已经在文章《详解AQS二:ReentrantLock公平锁原理》中讲过,不再赘述。
acquireQueued方法返回值是true/false,true表示发生了中断,false表示未发生中断,这个中断是在抢锁过程中发生的;在抢锁之前,while循环中调用了checkInterruptWhileWaiting方法,该方法调用了Thread.interrupted()方法重置了中断状态,正是为了不影响acquireQueued方法的调用。
我们分析下抢锁代码
  1. //抢锁,如果在抢锁过程中发生了异常,则将中断标记设置为REINTERRUPT
  2. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  3.     interruptMode = REINTERRUPT;
复制代码
这段代码执行了如下代码流程:
现在我们聚焦于interruptMode != THROW_IE这个判断条件,如果interruptMode不是THROW_IE,它会是什么值?只有可能是剩下的两种值:
在以上两种情况下,interruptMode会被修改成REINTERRUPT值(实际上第二种情况interruptMode的值已经是REINTERRUPT,重复修改也无妨)
这样,终极interruptMode一共只有可能有三种范例的值:
针对这三种范例的值,接下来会如何处理呢?
4、中断的处理方式

await方法最后一段代码:
  1. //判定是否应当抛出中断异常还是仅仅恢复中断标记
  2.     if (interruptMode != 0)
  3.         reportInterruptAfterWait(interruptMode);
复制代码
这里首先判断了interruptMode是否是0,只有非0的值才会被处理,也就是说,只有异常范例才会被处理。
  1. private void reportInterruptAfterWait(int interruptMode)
  2.     throws InterruptedException {
  3.     if (interruptMode == THROW_IE)
  4.         throw new InterruptedException();
  5.     else if (interruptMode == REINTERRUPT)
  6.         selfInterrupt();
  7. }
复制代码
终于看到了对THROW_IE和REINTERRUPT范例的处理:
THROW_IE:抛出InterruptedException异常
REINTERRUPT:设置中断标记,这样如果在接下来的业务处理中出现了sleep等等待方法,将会抛出InterruptedException异常。
使用表格总结一下异常处理的各种情况
可以看到,是否要抛出异常照旧设置中断标记,主要取决于节点在等待队列中发生中断的时机:
等待队列中在signal方法调用前发生的中断: 一定会抛出中断异常,不管acquireQueued方法有没有发生中断
等待队列中在signal方法调用后发生的中断: 一定会设置中断标记位,不管acquireQueued方法有没有发生中断
等待队列中没有发生中断:如果抢锁过程中发生了中断,就设置中断标记位;如果抢锁过程中没有发生中断,就什么都不做。
思索一下,为什么要这么做呢?
acquireQueued是一个“uninterruptible”的类,也就是说,它不会抛出中断异常,但是它会将中断反馈给调用方,让调用方决定该如何处理异常,那await就作为调用方,结合了在等待队列中是否发生异常做出了综合决定:统统看是否在等待队列中发生的异常种类,如果没有发生异常,就向AQS同步队列处理异常范例一样,让业务本身决定是否抛出异常;如果在等待队列中发生了异常:
5、删除无效节点:unlinkCancelledWaiters

await方法最后会判定node.nextWaiter是否为null,如果是null则遍历等待队列中的节点,删除无效节点
  1. //如果是因中断导致的LockSupport.park(this);挂起失效,则遍历等待队列中的节点,删除无效节点
  2. if (node.nextWaiter != null) // clean up if cancelled
  3.     unlinkCancelledWaiters();
复制代码
为什么会判断node.nextWaiter是否为null?想要知道这个答案,得看看signal方法的实现逻辑。
四、signal方法原理

signal方法就是把线程节点从等待队列转移到AQS同步队列,之后就是AQS正常获取锁并被前置节点线程唤醒后继续执行同步代码块中await方法之后的的临界区代码的过程。
  1. public final void signal() {
  2.     //如果当前锁的持有线程不是当前线程,就抛出异常
  3.     if (!isHeldExclusively())
  4.         throw new IllegalMonitorStateException();
  5.     //firstWaiter是等待时间最久的线程,从它开始唤醒最公平
  6.     Node first = firstWaiter;
  7.     if (first != null)
  8.         doSignal(first);
  9. }
复制代码
1、判定锁的持有线程:isHeldExclusively

这个方法是个AQS的钩子方法,实现在ReentrantLock类中的Sync抽象类
  1. protected final boolean isHeldExclusively() {
  2.     return getExclusiveOwnerThread() == Thread.currentThread();
  3. }
复制代码
这个方法特别简朴,就是简朴判定了下当前线程是否是持有锁的线程,什么情况下会发生不是持有锁的线程呢?明明当前线程正在运行中,它肯定是持有锁的线程啊,举个简朴的例子如下所示:
  1. /**
  2. * @author kdyzm
  3. * @date 2024/12/27
  4. */
  5. public class AQSNotHolderThread {
  6.    
  7.     public static void main(String[] args) {
  8.         Lock lock1 = new ReentrantLock();
  9.         Lock lock2 = new ReentrantLock();
  10.         Condition con1 = lock1.newCondition();
  11.         
  12.         Thread thread = new Thread(() -> {
  13.             lock1.lock();
  14.             try {
  15.                 con1.await();
  16.             } catch (InterruptedException e) {
  17.                 e.printStackTrace();
  18.             } finally {
  19.                 lock1.unlock();
  20.             }
  21.         });
  22.         thread.start();
  23.         lock2.lock();
  24.         try {
  25.             //在lock2的临界区代码使用lock1的con1,会抛出IllegalMonitorStateException异常
  26.             con1.signal();
  27.         } finally {
  28.             lock2.unlock();
  29.         }
  30.     }
  31. }
复制代码
以上代码会抛出IllegalMonitorStateException异常。
isHeldExclusively方法调用的目的就是要保证Condition方法的调用必须在对应的锁下的同步代码块中。
2、doSignal方法

doSignal方法会从头部开始查找第一个没有被取消的节点,将其转移到AQS同步队列或者直接唤醒线程。
  1. private void doSignal(Node first) {
  2.     do {
  3.         if ( (firstWaiter = first.nextWaiter) == null)
  4.             lastWaiter = null;
  5.         //不管后续有没有transferForSignal成功,都将nextWaiter指针置为null
  6.         first.nextWaiter = null;
  7.     } while (!transferForSignal(first) &&
  8.              (first = firstWaiter) != null);
  9. }
复制代码
在上面的代码中,first.nextWaiter = null;这块代码是亮点,固然不经意的一行代码,却是表明第三章第五节:删除无效节点:unlinkCancelledWaiters 的关键,之前留下了一个疑问,在await方法的最后:
  1. //如果是因中断导致的LockSupport.park(this);挂起失效,则遍历等待队列中的节点,删除无效节点
  2. if (node.nextWaiter != null) // clean up if cancelled
  3.     unlinkCancelledWaiters();
复制代码
会判定node.nextWaiter是否为null,如果不是null,就执行unlinkCancelledWaiters方法打扫无效节点。
从doSignal方法中可以看到,只要调用了调用了doSignal方法,节点的nextWaiter就会被置为null;那await方法中在LockSupport.park方法调用之后发现了node.nextWaiter竟然不为空,那就表示signal方法没有被调用,换句话说,是因为中断才使得LockSupport.park失效终极导致代码运行到这里的,在signal方法调用前发生了中断,那节点是取消等待状态,需要将节点从等待队列中移除出去,执行unlinkCancelledWaiters()方法就公道了。
3、转移节点到AQS:transferForSignal

该方法的主要目的是将线程节点从条件等待队列转移到AQS同步队列。
  1. final boolean transferForSignal(Node node) {
  2.     /*
  3.      * 如果节点已经取消等待,则不再尝试转移节点到AQS同步队列
  4.      */
  5.     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  6.         return false;
  7.     /*
  8.      * 节点自旋转进入AQS同步队列,并返回前置节点
  9.      */
  10.     Node p = enq(node);
  11.     int ws = p.waitStatus;
  12.     /**
  13.      * ws>0:前置节点状态是取消状态
  14.      * !compareAndSetWaitStatus(p, ws, Node.SIGNAL):设置前驱节点为Signal状态失败
  15.      */
  16.     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  17.         LockSupport.unpark(node.thread);//唤醒节点线程
  18.     return true;
  19. }
复制代码
我们分析一下这段代码:
本来目标节点在await方法中已经是被LockSupport.park方法将线程壅闭了,enq方法完成之后,实际上该节点已经完成了入队,应该是是等待前驱节点唤醒它的状态,似乎等价于AQS的acquire方法
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) &&
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.         selfInterrupt();
  5. }
复制代码
中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法调用,然而真的等价吗?实际上不一定等价,其区别就是前驱节点的状态,由于目标节点是“先壅闭,后入队”,和正常进入AQS队列等待的线程“先入队,后壅闭”的流程是反着来的,所以它没有调用过shouldParkAfterFailedAcquire方法校正前驱节点状态:
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2.     int ws = pred.waitStatus;
  3.     if (ws == Node.SIGNAL)
  4.         return true;
  5.     if (ws > 0) {
  6.         do {
  7.             node.prev = pred = pred.prev;
  8.         } while (pred.waitStatus > 0);
  9.         pred.next = node;
  10.     } else {
  11.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  12.     }
  13.     return false;
  14. }
复制代码
shouldParkAfterFailedAcquire方法执行完成后会将取消状态的节点剔除,而且将前置节点状态修正为SIGNAL。
在transferForSignal方法中的代码
  1. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  2.         LockSupport.unpark(node.thread);//唤醒节点线程
复制代码
实际上就是在快速验证前置节点状态,如果前置节点状态错误且无法修正,则将目标线程直接唤醒让其继续执行await方法中LockSupport.park方法之后的代码逻辑:
  1. while (!isOnSyncQueue(node)) {
  2.         //不在AQS队列中,那就继续挂起等待
  3.         LockSupport.park(this);//①
  4.         //检查中断状态,防止由于发生中断导致LockSupport.park(this);失效
  5.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//②
  6.             break;
  7. }//抢锁,如果在抢锁过程中发生了异常,则将中断标记设置为REINTERRUPT
  8. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  9.     interruptMode = REINTERRUPT;
复制代码
由于在signal方法中已经入队,所以这里会竣事while循环,执行下一个if语句中的acquireQueued(node, savedState),这个方法中如果获取锁失败,它会调用shouldParkAfterFailedAcquire方法:
  1. /**
  2. * 已经在队列中的线程通过独占模式获取锁的方法,该方法不受中断的影响。
  3. * 该方法也同样用于等待在条件等待队列中的线程获取锁。
  4. */
  5. final boolean acquireQueued(final Node node, int arg) {
  6.     boolean failed = true;
  7.     try {
  8.         boolean interrupted = false;
  9.         for (;;) {
  10.             final Node p = node.predecessor();
  11.             //如果前置节点是头部节点,当前节点就尝试抢占锁
  12.             if (p == head && tryAcquire(arg)) {
  13.                 //抢锁成功后将抢锁节点设置为头结点
  14.                 setHead(node);
  15.                 //释放头结点利于GC
  16.                 p.next = null;
  17.                 failed = false;
  18.                 return interrupted;
  19.             }
  20.             //如果应该阻塞等待就挂起线程进入阻塞状态
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt())
  23.                 interrupted = true;
  24.         }
  25.     } finally {
  26.         if (failed)
  27.             cancelAcquire(node);
  28.     }
  29. }
复制代码
调用shouldParkAfterFailedAcquire方法就会重新校正前置节点状态为SIGNAL而且剔除掉前置节点中的取消状态的节点。
这样await方法将会在acquireQueued方法调用处壅闭,等待获取锁之后执行后续的临界区代码。
这样就完美闭环了。


最后,接待关注我的博客呀: https://blog.kdyzm.cn
END.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4