从ReentrantLock角度解析AQS

打印 上一主题 下一主题

主题 917|帖子 917|积分 2751

是它,是它,就是它,并发包的基石;
一、概述

闲来不卷,随便聊一点。
一般情况下,大家系统中至少也是JDK8了,那想必对于JDK5加入的一系列功能并不陌生吧。那时候重点加入了java.util.concurrent并发包,我们简称为JUC。JUC下提供了很多并发编程实用的工具类,比如并发锁lock、原子操作atomic、线程池操作Executor等等。下面,我对JUC做了整理,大致分为下面几点:

基于JDK8,今天重点来聊下JUC并发包下的一个类,AbstractQueuedSynchronizer。
首先,浅显的从名字上看,抽象的队列同步器;实际上,这名字也跟它的作用如出一辙。抽象,即需要被继承;队列同步器,其内部维护了一个队列,供线程入队等待;最终实现多个线程访问共享资源的功能。
二、源码解析

进入AbstractQueuedSynchronizer内部,需要掌握三个重要的属性:
  1. private transient volatile Node head;
  2. private transient volatile Node tail;
  3. private volatile int state;
复制代码

  • head:标记等待队列头部节点。
  • tail:标记等待队列尾部节点。
  • state:线程的锁定状态;state=0,表示资源未被上锁;state>0,表示资源被上锁
我们调试AQS的源码,必须寻找一个源码调试的切入点,我这里用我们并发编程常用的Lock锁作为调试AQS的切入点,因为这是解决线程安全问题常用的手段之一。
2.1、源码的切入点

AQS的源码调试,从Lock接口出发,JDK源码定义如下:
  1. public interface Lock {
  2.     void lock();
  3.     void lockInterruptibly() throws InterruptedException;
  4.     boolean tryLock();
  5.     boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  6.     void unlock();
  7.     Condition newCondition();
  8. }
复制代码
从源码中看到,Lock是一个接口,所以该接口会有一些实现类,其中有一个实现类ReentrantLock,可重入锁,想必大家都不会陌生。
2.2、ReentrantLock的lock方法

通过跟踪源码可以看到,ReentrantLock#lock内部实现貌似比较简单,只有简短的一行代码
  1. public void lock() {
  2.     sync.lock();
  3. }
复制代码
其实内部是维护了一个Sync的抽象类,调用的是Sync的lock()方法。
  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2.     private static final long serialVersionUID = -5179523762034025860L;
  3.     abstract void lock();
  4.     final boolean nonfairTryAcquire(int acquires) {
  5.         final Thread current = Thread.currentThread();
  6.         int c = getState();
  7.         if (c == 0) {
  8.             if (compareAndSetState(0, acquires)) {
  9.                 setExclusiveOwnerThread(current);
  10.                 return true;
  11.             }
  12.         }
  13.         else if (current == getExclusiveOwnerThread()) {
  14.             int nextc = c + acquires;
  15.             if (nextc < 0) // overflow
  16.                 throw new Error("Maximum lock count exceeded");
  17.             setState(nextc);
  18.             return true;
  19.         }
  20.         return false;
  21.     }
  22.     protected final boolean tryRelease(int releases) {
  23.         int c = getState() - releases;
  24.         if (Thread.currentThread() != getExclusiveOwnerThread())
  25.             throw new IllegalMonitorStateException();
  26.         boolean free = false;
  27.         if (c == 0) {
  28.             free = true;
  29.             setExclusiveOwnerThread(null);
  30.         }
  31.         setState(c);
  32.         return free;
  33.     }
  34.     protected final boolean isHeldExclusively() {
  35.         return getExclusiveOwnerThread() == Thread.currentThread();
  36.     }
  37.     // ...
  38. }
复制代码
可以看到,Sync也是个抽象类,它有两个实现类:NonfairSync和FairSync,这里其实就引出了我们今天的主角,AbstractQueuedSynchronizer,Sync继承了它。
  1. static final class NonfairSync extends Sync {
  2.     private static final long serialVersionUID = 7316153563782823691L;
  3.     final void lock() {
  4.         if (compareAndSetState(0, 1))
  5.             setExclusiveOwnerThread(Thread.currentThread());
  6.         else
  7.             acquire(1);
  8.     }
  9.     protected final boolean tryAcquire(int acquires) {
  10.         return nonfairTryAcquire(acquires);
  11.     }
  12. }
复制代码
  1. static final class FairSync extends Sync {
  2.     private static final long serialVersionUID = -3000897897090466540L;
  3.     final void lock() {
  4.         acquire(1);
  5.     }
  6.     protected final boolean tryAcquire(int acquires) {
  7.         final Thread current = Thread.currentThread();
  8.         int c = getState();
  9.         if (c == 0) {
  10.             if (!hasQueuedPredecessors() &&
  11.                 compareAndSetState(0, acquires)) {
  12.                 setExclusiveOwnerThread(current);
  13.                 return true;
  14.             }
  15.         }
  16.         else if (current == getExclusiveOwnerThread()) {
  17.             int nextc = c + acquires;
  18.             if (nextc < 0)
  19.                 throw new Error("Maximum lock count exceeded");
  20.             setState(nextc);
  21.             return true;
  22.         }
  23.         return false;
  24.     }
  25. }
复制代码
下面我整理了这一系列类的UML图

通过类图可知,lock()方法最终调用的是ReentrantLock类下,内部类NonfairSync或FairSync的lock方法;对于这两个类,前者叫非公平锁,后者叫公平锁。通过ReentrantLock的构造器可知,默认使用NonfairSync类。
  1. public ReentrantLock() {
  2.     sync = new NonfairSync();
  3. }
复制代码
从NonfairSync类的lock方法出发,引出第一个AQS下的方法compareAndSetState。
  1. final void lock() {
  2.     if (compareAndSetState(0, 1))
  3.         setExclusiveOwnerThread(Thread.currentThread());
  4.     else
  5.         acquire(1);
  6. }
复制代码
从compareAndSetState方法的命名可以发现,就是比较并交换的意思,典型的CAS无锁机制。
  1. protected final boolean compareAndSetState(int expect, int update) {
  2.     // See below for intrinsics setup to support this
  3.     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  4. }
复制代码
我们可以观察到,这里其实调用的是Unsafe类的compareAndSwapInt方法,传入的expect为0,update为1;意思是如果当前值为0,那我就把值最终更新为1。
Unsafe这个类下面,发现好多方法都是用native这个关键词进行修饰的(也包括compareAndSwapInt方法),用native关键词修饰的方法,表示原生的方法;原生方法的实现并不是Java语言,最终实现是C/C++;这并不是本文的讨论范围。
回到AQS的compareAndSetState方法,返回值是boolean类型,true表示值更新为1成功,false表示不成功。这里出现两个分支,成功,走setExclusiveOwnerThread方法;不成功,走acquire方法。咱优先讨论acquire方法。
2.3、AQS的acquire方法

先来看一下该方法的源码;
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) &&
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.         selfInterrupt();
  5. }
复制代码
这里的核心是两个方法,tryAcquire方法和acquireQueued方法。首先会调用tryAcquire()方法,看方法命名是尝试获取;实际上这个方法确实在就在做一件事“尝试获取资源”。
  1. protected boolean tryAcquire(int arg) {
  2.     throw new UnsupportedOperationException();
  3. }
复制代码
不过AQS中的这个方法是protected修饰,并没有去实现,仅仅只是预留了方法入口,后期需要由其子类去实现;这里的子类就是上文中的NonfairSync类,该类的源码在上文中已经贴出。这段源码其实运用了我们常见的一个设计模式,“模板方法模式”。
2.4、NonfairSync的tryAcquire方法

NonfairSync的tryAcquire方法源码如下
  1. protected final boolean tryAcquire(int acquires) {
  2.     return nonfairTryAcquire(acquires);
  3. }
复制代码
这里并没有直接去实现tryAcquire方法,而是调用了Sync类下的nonfairTryAcquire方法。
  1. final boolean nonfairTryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     if (c == 0) {
  5.         if (compareAndSetState(0, acquires)) {
  6.             setExclusiveOwnerThread(current);
  7.             return true;
  8.         }
  9.     }
  10.     else if (current == getExclusiveOwnerThread()) {
  11.         int nextc = c + acquires;
  12.         if (nextc < 0) // overflow
  13.             throw new Error("Maximum lock count exceeded");
  14.         setState(nextc);
  15.         return true;
  16.     }
  17.     return false;
  18. }
复制代码
这里有个getState方法,最终返回的是AQS中的state字段,这个字段就是多个线程抢占的共享资源,所以这个字段很重要;volatile关键字修饰,保证内存的可见性,int类型,对于ReentrantLock锁而言,当state=0时,表示无锁,当state>0时,表示资源已被线程锁定。
下面分析下这段代码:

  • 如果state=0表示无锁,通过cas去更新state的值,这里更新为1。
  • 将持有锁的线程更新为当前线程。
  • 如果上述cas未更新成功,或者state!=0,表示已上锁。
  • 继续判断下持有锁的线程如果是当前线程,state字段做叠加,这里表示ReentrantLock的含义,表示可重入锁。
  • 最后,state!=0,持有锁的线程也不是当前线程,表示不能对资源加锁,返回false。
tryAcquire方法的判断至此结束,不过最终的走向需要看它的返回值;返回true,表示当前线程抢占到锁,或者当前线程就是抢占锁的线程,直接重入,加锁流程结束;返回false,表示没有抢占到锁,流程继续,这里就引出下个话题,CLH线程等待队列。
2.5、AQS的addWaiter方法

2.5.1、CLH队列

首先咱来看一段源码中的注释
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks
大致意思是:CLH队列是由Craig、Landin、Hagersten这三位老哥名字的首字母叠加在一起命名的,它是一个等待队列,它是一个变种队列,用到了自旋。
这里的信息要抓住三点:等待队列、变种队列、自旋。

2.5.2、Node类

在解析addWaiter方法实现之前,就不得不提到一个内部类Node;addWaiter方法的入参是这个类型,所以先来看看这个类。源码如下:
  1. static final class Node {
  2.    
  3.     static final Node SHARED = new Node();
  4.    
  5.     static final Node EXCLUSIVE = null;
  6.    
  7.     static final int CANCELLED =  1;
  8.    
  9.     static final int SIGNAL    = -1;
  10.     static final int CONDITION = -2;
  11.     static final int PROPAGATE = -3;
  12.     volatile int waitStatus;
  13.     volatile Node prev;
  14.     volatile Node next;
  15.     volatile Thread thread;
  16.     Node nextWaiter;
  17.     final boolean isShared() {
  18.         return nextWaiter == SHARED;
  19.     }
  20.     final Node predecessor() throws NullPointerException {
  21.         Node p = prev;
  22.         if (p == null)
  23.             throw new NullPointerException();
  24.         else
  25.             return p;
  26.     }
  27.     Node() {
  28.     }
  29.     Node(Thread thread, Node mode) {     // Used by addWaiter
  30.         this.nextWaiter = mode;
  31.         this.thread = thread;
  32.     }
  33.     Node(Thread thread, int waitStatus) { // Used by Condition
  34.         this.waitStatus = waitStatus;
  35.         this.thread = thread;
  36.     }
  37. }
复制代码
这里先大致介绍下,每个属性的意思:

  • SHARED:类型就是Node,表示共享模式。
  • EXCLUSIVE:类型也是Node,表示独占模式,这里的ReentrantLock就是独占模式。
  • waitStatus:int类型,当前Node节点下,存储的线程状态。
  • CANCELLED:int类型,等于1,waitStatus属性的值之一,表示节点被取消状态。
  • SIGNAL:int类型,等于-1,waitStatus属性的值之一,表示当前节点需要去唤醒下一个节点。
  • CONDITION:int类型,等于-2,waitStatus属性的值之一,表示节点处于等待状态。
  • PROPAGATE:int类型,等于-2,waitStatus属性的值之一,表示下一个被获取的对象应该要无条件传播,该值仅在共享模式下使用。
  • prev:Node类型,指向队列中当前节点的前一个节点。
  • next:Node类型,指向队列中当前节点的下一个节点。
  • thread:存储当前线程信息。
  • nextWaiter:用来存储节点的指针,不过会出现两种情况;等待队列中,会将该属性的值设置成SHARED或者EXCLUSIVE,用来区分当前节点处于共享模式还是独享模式;条件队列中,用于存放下一个节点的指针,所以当是条件队列的情况下,这个队列是单向队列。
  • isShared():返回是否属于共享模式,true表示共享模式,false表示独享模式。
  • predecessor():获取当前节点的前一个节点。
另外,Node类还有两个有参构造器:
从作者的注释就能看出来,第一个构造器是在等待队列的时,创建节点使用,第二个构造器是在条件队列时,创建节点使用。
2.5.3、方法解析
  1. private Node addWaiter(Node mode) {
  2.     Node node = new Node(Thread.currentThread(), mode);
  3.     Node pred = tail;
  4.     if (pred != null) {
  5.         node.prev = pred;
  6.         if (compareAndSetTail(pred, node)) {
  7.             pred.next = node;
  8.             return node;
  9.         }
  10.     }
  11.     enq(node);
  12.     return node;
  13. }
  14. private Node enq(final Node node) {
  15.     for (;;) {
  16.         Node t = tail;
  17.         if (t == null) { // Must initialize
  18.             if (compareAndSetHead(new Node()))
  19.                 tail = head;
  20.         } else {
  21.             node.prev = t;
  22.             if (compareAndSetTail(t, node)) {
  23.                 t.next = node;
  24.                 return t;
  25.             }
  26.         }
  27.     }
  28. }
复制代码
其实这段方法是在创建Node对象,Node对象就是组成CLH队列的基础元素。

  • 创建一个Node对象,mode参数由上述的acquire()方法传递而来,可以看到传入Node.EXCLUSIVE,表示独占模式。
  • 判断队尾有指向节点,刚创建的节点放入队列的队尾,并且通过cas将队尾指针改成当前创建节点,最后返回当前创建节点。
  • 如果队尾没有指向节点,调用enq方法,做队列的初始化操作。
  • 这里出现了第一个自旋,enq方法是无限循环的,就像作者注释的一样,Must initialize,必须初始化。
  • 这里先是重新new了一个新的node(也可以叫空节点),标记它为队列头。
  • 随后再将addWaiter方法中创建的node,加入到队列尾。
总结下addWaiter方法干的事情:

  • 创建一个节点,存储当前线程,并标记独占模式。
  • 判断队列是否为空,不为空,通过cas将存储当前线程的node节点加入到对尾,并且对该节点做对尾标记。
  • 队列为空,通过自旋,做初始化操作。
  • 初始化过后的队列,队列头是一个空节点,队列尾是存储当前线程的节点。
2.6、AQS的acquireQueued方法

还是先来看下这个方法的源码;
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.         for (;;) {
  6.             final Node p = node.predecessor();
  7.             if (p == head && tryAcquire(arg)) {
  8.                 setHead(node);
  9.                 p.next = null; // help GC
  10.                 failed = false;
  11.                 return interrupted;
  12.             }
  13.             if (shouldParkAfterFailedAcquire(p, node) &&
  14.                 parkAndCheckInterrupt())
  15.                 interrupted = true;
  16.         }
  17.     } finally {
  18.         if (failed)
  19.             cancelAcquire(node);
  20.     }
  21. }
  22. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  23.     int ws = pred.waitStatus;
  24.     if (ws == Node.SIGNAL)
  25.         return true;
  26.     if (ws > 0) {
  27.         do {
  28.             node.prev = pred = pred.prev;
  29.         } while (pred.waitStatus > 0);
  30.         pred.next = node;
  31.     } else {
  32.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  33.     }
  34.     return false;
  35. }
  36. private void cancelAcquire(Node node) {
  37.     if (node == null)
  38.         return;
  39.     node.thread = null;
  40.     Node pred = node.prev;
  41.     while (pred.waitStatus > 0)
  42.         node.prev = pred = pred.prev;
  43.     Node predNext = pred.next;
  44.     node.waitStatus = Node.CANCELLED;
  45.     if (node == tail && compareAndSetTail(node, pred)) {
  46.         compareAndSetNext(pred, predNext, null);
  47.     } else {
  48.         int ws;
  49.         if (pred != head &&
  50.             ((ws = pred.waitStatus) == Node.SIGNAL ||
  51.              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  52.             pred.thread != null) {
  53.             Node next = node.next;
  54.             if (next != null && next.waitStatus <= 0)
  55.                 compareAndSetNext(pred, predNext, next);
  56.         } else {
  57.             unparkSuccessor(node);
  58.         }
  59.         node.next = node; // help GC
  60.     }
  61. }
复制代码
这段代码做了模拟查询各种用户信息的操作,其中每个线程都暂停1秒,代表在查询这五种数据;最终打印的用时时间是1017ms,说明这五个线程是同时进行的,大大提高了接口的效率。
四、写在最后

AQS提供了一个FIFO队列,这里称为CLH队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch、Semaphore等。
AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
可以这么说,只要搞懂了AQS,那么J.U.C中绝大部分的api都能轻松掌握。
本文主要提供了从ReentrantLock出发,解析了AQS中的各种公用的方法,如果需要知道其他类中怎么去使用AQS中的方法,其实也只需要找到切入点,一步步调试下去即可,不过,我想很多地方都是和ReentrantLock中一致的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表