阅读本文前,需要储备的知识点如下,点击链接直接跳转。
java线程详解
Java不能操作内存?Unsafe了解一下
一文读懂LockSupport
AQS简介
AQS即AbstractQueuedSynchronizer的简称,翻译过来就是抽象队列同步器的意思,由Doug Lea大神开发的。说他抽象是因为它提供的是一个基于队列的同步器框架,定义了一些基础功能方法(控制状态变量,获取和释放同步状态方法以及入队出队操作等),具体场景使用只需要根据需要实现对应的方法即可。我们在锁(比如ReentrantLock)、并发工具类(比如CountDownLatch)都可以看到内部类继承了AbstractQueuedSynchronizer,也就是说AQS才是这些类的基石。说了这么多,感觉把抽象说的越抽象了,下面我们从几个栗子入手吧。
注意:本文使用的JDK版本为JDK8,AQS的代码非常巧妙和经典,很多细节和模块都可以单独拉出来写一篇文章,很多细节问题建议自行阅读和思考。
本篇文章主要讲独占模式的应用和原理分析,关于共享模式不再这里展开细讲。
应用举例
ReentrantLock的使用
3个线程获取同一个锁,获得后休眠1秒结束,所以3个线程间隔1秒打印输出。- public class ReentrantLockTest {
- public static void main(String[] args) {
- lockTest();
- }
- public static void lockTest() {
- ReentrantLock lock = new ReentrantLock();
- PrintThread t1 = new PrintThread(lock, "t1");
- PrintThread t2 = new PrintThread(lock, "t2");
- PrintThread t3 = new PrintThread(lock, "t3");
- t1.start();
- t2.start();
- t3.start();
- }
- }
- class PrintThread extends Thread {
- private Lock lock;
- public PrintThread(Lock lock, String threadName) {
- this.lock = lock;
- this.setName(threadName);
- }
- @Override
- public void run() {
- lock.lock();
- try {
- System.out.println(String.format("time:%s,thread:%s,result:%s",
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()),
- Thread.currentThread().getName(), "get lock success"));
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
复制代码 打印结果如下- time:2021-04-13 13:53:55,thread:t1,result:get lock success
- time:2021-04-13 13:53:56,thread:t2,result:get lock success
- time:2021-04-13 13:53:57,thread:t3,result:get lock success
复制代码 是因为这3个线程执行时都要先获取锁执行完逻辑后再释放锁,而ReentrantLock是独占锁,相当于这3个线程间是串行执行的,相互间隔1秒(注意,线程的先后执行顺序不一定是固定的,但线程内有休眠1秒的操作,所以至少相隔1秒)
CountDownLatch的使用
main线程创建一个CountDownLatch latch = new CountDownLatch(1),3个线程持有该CountDownLatch并调用CountDownLatch的await()方法,直到main线程休眠2秒后执行CountDownLatch的countDown()方法,释放一个同步状态使得数量值为0,唤醒等待在await()的线程继续执行。- public class CountDownLatchTest {
- public static void main(String[] args) throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- ConcurrentThread concurrentThread1 = new ConcurrentThread(latch, "t1");
- ConcurrentThread concurrentThread2 = new ConcurrentThread(latch, "t2");
- ConcurrentThread concurrentThread3 = new ConcurrentThread(latch, "t3");
- concurrentThread1.start();
- concurrentThread2.start();
- concurrentThread3.start();
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + " countDown...");
- latch.countDown();
- }
- }
- class ConcurrentThread extends Thread {
- private CountDownLatch latch;
- public ConcurrentThread(CountDownLatch latch, String threadName) {
- this.latch = latch;
- this.setName(threadName);
- }
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + " is ready...");
- try {
- latch.await();
- System.out.println(Thread.currentThread().getName() + " is executing...");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
复制代码 打印结果如下(注意,线程的先后执行顺序不一定是固定的)- t1 is ready...
- t3 is ready...
- t2 is ready...
- main countDown...
- t1 is executing...
- t3 is executing...
- t2 is executing...
复制代码 这三个线程在执行时先打印“...ready”后,然后等待在await()方法上,由于CountDownLatch是共享锁,而初始的state是1,main线程休眠2秒后调用了countDown()方法会将state置成0,会唤起等待队列里的所有后继线程,所以会相继打印“executing...”。
这里就两个简单的使用栗子,不过可以看出,均是在多线程场景中使用,而且代码里并没有AQS相关的影子,那是因为在这些类的内部有内部类去继承了AbstractQueuedSynchronizer,由这些内部类处理业务逻辑,底层核心逻辑是由AQS框架提供的(线程排队、线程等待、线程唤醒、超时处理、中断处理等),子类调用API实现核心逻辑,AQS在多线程中使用发挥真正的作用。下面我们一步步来分析AQS。
AQS原理分析
类UML图
图中红色连接的线表示内部类,蓝色线表示继承
我们首先来看看AQS相关的URL类图吧,从JDK的源码中我们发现,AQS真正出现的在两个地方,第一个就是lock锁(比如ReentrantLock等),第二个就是并发工具类(比如CountDownLatch、Semaphore等),由这些内部类继承了AQS去实现相关的方法辅助主类实现相关控制,但是我们在JDK的源码中可以看先这些lock锁和并发工具类应在了很多的地方,比如队列、线程池及并发类相关的一些地方。
上图把各类的方法展示出来了,我们可以看到继承了AQS类的那些Sync内部类都只用覆盖实现一小部分方法即可完成特定的功能。因为在AQS类中已经实现了大部分底层通用的逻辑,对于其子类来说只用实现部分对外暴露的方法即可,同样我们也可以继承AQS实现自定义的锁或者工具类。
类及方法介绍
AbstractOwnableSynchronizer
- public abstract class AbstractOwnableSynchronizer
- implements java.io.Serializable {
- private transient Thread exclusiveOwnerThread;
- protected final void setExclusiveOwnerThread(Thread thread) {
- exclusiveOwnerThread = thread;
- }
- protected final Thread getExclusiveOwnerThread() {
- return exclusiveOwnerThread;
- }
- }
复制代码 AbstractOwnableSynchronizer类里包含一个Thread的属性并提供了get、set方法,这个Thread对象就是当前持有锁的线程。线程能否支持重入功能就是判断当前线程和持有锁的线程是不是同一个对象,只是同步状态state值增加而已,等线程主动释放锁后该同步状态state值数量值减少。
该类使用了abstract修饰,但是类中并没有抽象方法,目的就是这个类不对外直接使用,而get、set方法使用了protected final修饰,说明方法可被子类使用但不能被子类重写。
另外,exclusiveOwnerThread是用了transient修饰,说明这个属性不参与序列化,因为Thread没有实现Serializable接口,不能进行序列化处理,另外进程是系统资源分配的最小单位,线程是进程执行的最小单位,线程是由操作系统分配和调度的,所以不能将线程进行序列化。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer类也是一个抽象类,继承自AbstractOwnableSynchronizer,也就拥有了设置持有锁线程的能力,同样该类使用了abstract修饰,目的就是这个类不对外直接使用,需要具体子类去继承后使用。虽然他实现了序列化接口,但是其内部类Node并未实现序列化接口,所以在AbstractQueuedSynchronizer类的属性head、tail都是Node类型并且加了transient关键字不参与序列化,从以上我们大概就能猜到如果将AQS序列化它只保存一些基本属性的值,并不包含线程以及队列,基本在使用过程中也不会对其进行序列化,具体的属性和队列后续会详细介绍,下面列举一些AQS类里重要的方法和属性。- public abstract class AbstractQueuedSynchronizer
- extends AbstractOwnableSynchronizer
- implements java.io.Serializable {
- /**
- * 独占模式,尝试获取同步状态,立即返回获取成功或失败,需要子类实现
- */
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
- /**
- * 独占模式,尝试释放同步状态,立即返回获取成功或失败,需要子类实现
- */
- protected boolean tryRelease(int arg) {
- throw new UnsupportedOperationException();
- }
- /**
- * 共享模式,尝试获取共享锁,需要子类实现,
- * 立即返回获取的数量值
- * 0:获取锁成功,没有剩余资源
- * > 0:获取锁成功,并且有剩余资源
- * < 0:获取失败
- */
- protected int tryAcquireShared(int arg) {
- throw new UnsupportedOperationException();
- }
- /**
- * 共享模式,尝试释放共享锁,需要子类实现,释放成功返回true
- */
- protected boolean tryReleaseShared(int arg) {
- throw new UnsupportedOperationException();
- }
- /**
- * 当前线程是否独占资源,需要子类实现,true:是,false:否
- */
- protected boolean isHeldExclusively() {
- throw new UnsupportedOperationException();
- }
- /**
- * 入队
- */
- private Node enq(final Node node) {...}
- /**
- * 将当前线程封装成Node逻辑里也有调入队enq方法的逻辑
- */
- private Node addWaiter(Node mode){...}
- /**
- * 【重要】对外提供的获取锁的方法,子类调用此方法执行获取锁的动作,
- * 内部调用包含了获取锁、排队、阻塞、中断等操作
- */
- public final void acquire(int arg) {...}
- /**
- * 【重要】对外提供的释放锁方法,子类调用此方法执行释放锁的动作,
- * 内部包含更新state、唤醒等待队列的第一个等待节点
- */
- public final boolean release(int arg) {...}
- /**
- * 【重要】双向队列头结点
- */
- private transient volatile Node head;
- /**
- * 【重要】双向队列尾结点
- */
- private transient volatile Node tail;
- /**
- * 【重要】同步状态,控制线程是否可获取资源,是用一个整型的变量表示,
- * 加了volatile,保证了该变量在多线程间的可见性
- */
- private volatile int state;
- /**
- * 静态内部类,将等待锁的线程封装成Node进行排队
- */
- static final class Node {
- ...
- }
- // 其他方法、属性、内部类未列出
- ...
- }
复制代码 该类中没有抽象方法,但是上面提到的几个方法都是抛了UnsupportedOperationException异常,说明需要具体子类实现时去复写,这也正是独占模式和共享模式要对应实现的方法。
head、tail两个Node类型的属性分别表示了双向链表的队头和队尾,如果线程不能获取到锁则进入队列排队并且等待唤醒或者超时中断,后续细讲。
整型的state属性比较核心,表示同步状态,就是用它来控制线程是否需要阻塞。上面的代码没有列出其他方法,部分方法源码后文会详细分析。
Node类
AQS类中有一个非常重要的内部类Node,我们称作它为节点,这个内部类是AQS框架线程排队的基石,非常核心,按照注释上所说Node类是CLH队列的一种变种(CLH队列是一种单向队列,这里不做介绍,感兴趣可自行搜索),Node类是一种双向队列,内部有Node prev,Node next属性,分别表示前驱节点和后继节点,还有一个Thread属性,表示封装的当前线程,所以AQS的队列其实就是以Node节点形成的一个双向链表,结构如下:
我们看下Node类的属性和方法类图。
- 节点模式:
Node SHARED = new Node()来表示共享模式,Node EXCLUSIVE = null表示独占模式。
- 节点等待状态waitStatus:
这个属性字段比较重要,因为它是AQS控制线程执行的关键字段,这个值的改变是采用CAS操作的。他的取值只有以下几种。
(1)1:CANCELLED,取消状态,可能情况有节点等待超时被取消或者被中断,那么代表这个Node节点中包含的线程未获取到锁,由具体业务判断是否需要执行后续逻辑。
(2)0:初始化值,创建节点的时候默认会初始化,0也就是他的默认值。
(3)-1:SIGNAL,表明该节点以后的线程需要等待唤醒,后续节点的线程可以阻塞。
(4)-2:CONDITION,表明该节点的线程需要等待,由ConditionObject实现条件队列会用到。
(5)-3:PROPAGATE,一般在共享模式下会有该状态,表明头节点获取到了共享资源,可向后传播,等待队列里的其他节点也都可以获取共享资源。
- Thread thread属性对象
AQS框架将当前正在获取同步状态的线程包装成Node节点的一个属性,根据Node节点的waitStatus状态来控制当前线程是被唤醒继续尝试获取锁还是线程取消。
队列
AQS内部的两个变量head代表队列的头结点,tail代表队列的尾节点,是一个双向队列,如Node类所介绍,head和tail指向如下图所示。
注意:head节点比较特殊,队列里需要唤醒的线程是从head节点的next节点开始, 在队列初始化时放的是一个new Node()对象,属性thread并没有赋值,后续排队的线程被唤醒时会把他自己设置成head并且将thread属性设置成null。所以head节点可以这么理解,head节点初始化时是一个虚拟节点,没有用处,只是充当一个队头标识,当队列中有线程排队时,说明head节点已经是获取到锁的线程的节点了,等这个线程执行完需要唤醒head.next之后的线程继续执行,这就是排队和唤醒的逻辑。
同步状态
在AQS类中,有一个state属性,描述如下- /**
- * The synchronization state.
- */
- private volatile int state;
复制代码 state是整型变量,叫同步状态,也可叫加锁的次数,使用了volatile修饰,保证了线程间的可见性,所有的线程是否可获取到锁资源都是基于对这个字段值的操作来确定。对于独占锁来说,初始情况下state=0,表示当前资源空闲,可被线程获取到锁,如果state>0,表示已经有线程占用资源,后续的线程(非持有锁的线程)需要进入队列,不会存在 0) { /* 当前线程的前一个节点的waitStatus状态是Node.CANCELLED时,说明前驱节点已经取消获取锁了 需要从当前节点一直向前查找知道节点没有被取消, 然后把找到的第一个没有被取消的节点的next指向当前节点,这样就把当前节点前取消状态的都删掉 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* 前一个节点的waitStatus状态还是0,或者是共享锁的传播状态PROPAGATE时, 则会把前一个节点的waitStatus状态改成Node.SIGNAL 所以是后一个节点排队时把前一个节点waitStatus改成Node.SIGNAL, 表示前一个节点执行完释放锁了要走唤醒后续节点的逻辑, 依次类推,队列里只有最后一个Node节点的waitStatus是0,因为它没有后续节点, 也不需要执行唤醒操作,其余在没有被中断状态下应该都是Node.SIGNAL */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}private final boolean parkAndCheckInterrupt() { /* 阻塞当前线程调的就是LockSupport.park,原理之前文章有讲过,这就是线程阻塞等待的核心实现了 线程被LockSupport.park了不会响应中断, 如果线程被中断了需要用Thread.interrupted()获取当前线程的中断标识 */ LockSupport.park(this); return Thread.interrupted();}[/code]独占锁释放锁流程
以ReentrantLock释放锁为例,释放锁不区分公平锁还是非公平锁,释放的逻辑是一样的,整体流程如下。
release(int arg)这是AQS里定义的模板方法,主要释放锁代码如下,这也是调用释放锁的入口,逻辑看代码注释:- final void lock() {
- acquire(1);
- }
复制代码 tryRelease(int releases)是尝试释放锁的逻辑,AQS定义的方法,默认是抛异常,子类根据具体场景实现逻辑。以下是ReentrantLock的内部类Sync的具体实现,返回true表示现在锁空闲了,返回false表示锁现在还被占用。- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
复制代码 unparkSuccessor(Node node) 这个方法就是关键的唤醒后续等待队列里的线程关键方法。通过调用LockSupport.unpark方法将阻塞的线程唤醒继续执行。- public final void acquire(int arg) {
- /*
- (1)tryAcquire方法由子类实现尝试获取锁的逻辑,
- 返回true就不走后面的判断,表示获取到了锁,返回false表示未获取到锁,走后续入队等待流程
- (2)addWaiter方法是将当前线程封装成Node对象返回,里面也有关于入队的操作
- (3)acquireQueued方法主要是先再尝试获取一次锁,
- 获取到了就返回是否被中断标识,获取不到则需要确认线程是否需要阻塞以及阻塞操作,
- 最终返回释放被中断标识
- (4)selfInterrupt是将当前线程中断,因为LockSupport.park阻塞线程时是不会响应中断的,
- 但是通过Thread.interrupted()这个方法可以获取到当前线程是否被中断标识
- */
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 有两种可能的输出:
这种情况输出如下,t1先加锁成功,t2等待,实现了多线程间的加锁互斥,另外t1加锁成功后有再次加锁,发现还是等待,这说明锁不可重入,功能实现,这两个线程都将一直等下去。- protected final boolean tryAcquire(int acquires) {
- // 获取当前线程
- final Thread current = Thread.currentThread();
- // 获取AQS的同步状态值state
- int c = getState();
- // state是0则表示没有线程持有锁,可以尝试去获取锁
- if (c == 0) {
- /*
- (1)hasQueuedPredecessors方法判断队列里当前线程的Node之前是否还有其他Node,
- 返回true说明有其他线程也在等待,尝试获取锁失败,返回false说明前面没有线程等待,
- 可以继续执行逻辑,这里先判断了state=0没有直接cas操作而是再判断队列里是否有等待的线程,
- 充分体现了公平性
- (2)如果compareAndSetState(0, acquires)也设置成功,则说明加锁成功,
- 将exclusiveOwnerThread设置成当前线程,返回true表示获取锁成功
- */
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- /*
- 这个else if逻辑主要就是可重入的判断和处理,
- 如果持有锁的线程是当前线程则state= state + acquires
- */
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 这种情况输出如下,t2先加锁成功,正常执行业务逻辑后释放锁,t2释放锁后线程可正常结束。t2释放了锁,则t1加锁成功,当t1想第二次再加锁时,发现需要等待,锁不可重入。- private Node addWaiter(Node mode) {
- // 将当前节点封装成Node对象
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- /*
- (1)队列不为空的情况下,先尝试将node插入到队尾,
- compareAndSetTail返回成功则说明node变成队列成功,直接返回,否则需要走入队流程
- (2)主要是将当前node的prev指向原tail,原tail节点的next指向当前node上,
- 这样就完成了node的入队
- */
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 尝试直接插入队尾失败了就走入队逻辑
- enq(node);
- // 返回当前线程封装成的Node对象
- return node;
- }
- private Node enq(final Node node) {
- // 入队使用的for无限循环,是一个自旋的过程,直到成功
- for (;;) {
- Node t = tail;
- /*
- 如果队尾tail为空,则说明队列还未初始化,先初始化head节点,然后tail也指向head,
- 完成初始化队列,虽然只有一个节点,但head和tail都有了指向
- */
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- /*
- 如果队尾tail不为空,则采用cas方式将当前node插入队尾,
- 成功则返回,否则一直自旋尝试直到成功
- */
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 通过这两个例子,我们可以看出,这种独占锁、不可重入的情况下,lock()和unlock()方法必须配对使用,不能连续加锁和释放锁。
JUC包下AQS子类锁的实现
java.util.concurrent包下有几个基于AQS实现的锁,如下所示,有了以上知识基础,再理解这些锁是很容易的,了解详细可参考具体源码实现。
锁类型描述ReentrantLock独享锁可重入锁ReentrantReadWriteLock独享锁、共享锁兼备ReadLock是共享锁,WriteLock是独享锁CountDownLatch共享锁不可重复使用Semaphore共享锁可重复使用CyclicBarrier共享锁使用ReentrantLock实现的共享锁,可重复使用总结
主要讲解了AQS的独占模式,提到了一些共享模式相关的知识,有了独享模式的基础,理解共享模式并不难,还有关于Condition相关的知识没有讲,所以关于共享模式和Condition相关的大家可以自行去阅读源码,后续有机会也会出相关的文章。
还有另外一个类AbstractQueuedLongSynchronizer,这个类是AbstractQueuedSynchronizer的一个变种,只是把state的类型从int变成long了,所有涉及跟这个state相关的操作参数和返回都改成long类型了,理论上使用这个类实现的锁可以超过Integer.MAX_VALUE的限制,最大的可获取锁的次数就变成Long.MAX_VALUE,这个在如多级锁和需要64位状态时会非常有用,目前在JDK里并没有发现使用的地方,而在HikariCP连接池com.zaxxer.hikari.util.QueuedSequenceSynchronizer这个类内部使用到了这个类,感兴趣的可自行阅读。
AQS的设计确实相当巧妙、逻辑非常严谨,在多线程下使用,已尽可能最大限度支持高并发操作,通过对源码的学习,我们了解了锁的设计,大部分的工作都由AQS完成(包括线程的包装排队、阻塞、唤醒、超时处理、中断处理等),剩下的小部分代码由开发者根据业务场景具体实现(尝试获取锁,释放锁),不得不佩服如此精美巧妙的设计和实现,Doug Lea,我永远的神!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |