概述
AQS ( Abstract Queued Synchronizer )是一个抽象的队列同步器,通过维护一个共享资源状态( Volatile Int State )来体现同步状态 和一个先辈先出( FIFO )的线程等待队列来完成资源获取的列队工作,通过CAS完成对State值的修改。
AQS团体框架如下:
当有自界说同步器接入时,只需重写第一层所须要的部分方法即可,不须要关注底层详细的实现流程。当自界说同步器举行加锁大概解锁操纵时,先颠末第一层的API进入AQS内部方法,然后颠末第二层举行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处置惩罚,而这些处置惩罚方式均依靠于第五层的底子数据提供层
原理
AQS 为每个共享资源都设置一个共享资源锁,线程在须要访问共享资源时起首须要获取共享资源锁,假如获取到了共享资源锁,便可以在当火线程中利用该共享资源,假如获取不到,则将该线程放入线程等待队列,等待下一次资源调理,流程图如下所示:
Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。
底层结构
state:状态
Abstract Queued Synchronizer 维护了 volatile int 范例的变量,用于体现当前的同步状态。volatile固然不能包管操纵的原子性,但是能包管当前变量state的可见性。
state的访问方式有三种: getState()、setState()和 compareAndSetState(),均是原子操纵,此中,compareAndSetState的实现依靠于 Unsafe的compareAndSwaplnt()- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- private volatile int state;
- protected final int getState() {
- return state;
- }
- protected final void setState(int newState) {
- state = newState;
- }
- protected final boolean compareAndSetState(int expect, int update) {
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
复制代码
CLH队列
Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的假造双向队列(FIFO),AQS是通过将每条哀求共享资源的线程封装成一个节点来实现锁的分配。
AQS利用一个Volatile的int范例的成员变量来体现同步状态,通过内置的FIFO队列来完成资源获取的列队工作,通过CAS完成对State值的修改。
AQS的独占式和共享式
- 独占式:只有一个线程能实行,详细的 Java 实现有 ReentrantLock。
- 共享式:多个线程可同时实行,详细的 Java 实现有 Semaphore和CountDownLatch。
AQS只是一个框架 ,只界说了一个接口,详细资源的获取、开释都由自界说同步器去实现。差别的自界说同步器争用共享资源的方式也差别,自界说同步器在实现时只需实现共享资源state的获取与开释方式即可,至于详细线程等待队列的维护,如获取资源失败入队、叫醒出队等, AQS 已经在顶层实现好(就是模板方法模式),不须要详细的同步器再做处置惩罚。自界说同步器实现时告急实现以下几种方法:
- 以ReentrantLock为例,ReentrantLock中的state初始值为0体现无锁状态。在线程实行 tryAcquire()获取该锁后ReentrantLock中的state+1,这时该线程独占ReentrantLock锁,其他线程在通过tryAcquire() 获取锁时均会失败,直到该线程开释锁后state再次为0,其他线程才有机遇获取该锁。该线程在开释锁之前可以重复获取此锁,每获取一次便会实行一次state+1, 因此ReentrantLock也属于可重入锁。 但获取多少次锁就要开释多少次锁,如许才气包管state终极为0。假如获取锁的次数多于开释锁的次数,则会出现该线程不绝持有该锁的情况;假如获取锁的次数少于开释锁的次数,则运行中的步调会报锁非常。
- 以CountDownLatch以例,任务分为N个子线程去实行,state也初始化为N(注意N要与线程个数同等)。这N个子线程是并行实行的,每个子线程实行完后countDown()一次,state会CAS减1。比及全部子线程都实行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继承背面的动作。
- 以Semaphore为例,state则代表可以同时访问的线程数量,也大概明白为访问的答应证(permit)数量。每个线程访问(acquire)时须要拿到对应的答应证,否则举行壅闭,访问竣事则返还(release)答应证。state只能在Semaphore的构造方法中举行初始化,后续不能举行修改。
一样平常来说,自界说同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自界说同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
Node节点
Node即为上面CLH变体队列中的节点。
Node结点是每一个等待获取资源的线程的封装,其包罗了须要同步的线程本身及其等待状态waitStatus
Node中几个方法和属性值的寄义:
- waitStatus:当前节点在队列中的状态
- thread:体现处于该节点的线程
- prev:前驱指针
- predecessor:返回前驱节点,没有的话抛出npe
- nextWaiter:指向下一个处于CONDITION状态的节点(由于本篇文章不陈诉Condition Queue队列,这个指针不多先容)
- next:后继指针
等待状态waitStatus
waitStatus有下面几个摆列值:如是否被壅闭、是否等待叫醒、是否已经被取消等。共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
- CANCELLED(1):体现当前结点已取消调理,不再想去获取资源了。当timeout或被制止(相应制止的情况下),会触发变更为此状态,进入该状态后的结点将不会再厘革。
- SIGNAL(-1):体现后继结点在等待当前结点叫醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
- CONDITION(-2):体现结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同队伍列中,等待获取同步锁。
- PROPAGATE(-3):共享模式下,前继结点不但会叫醒厥后继结点,同时也大概会叫醒后继的后继结点。
- 0:新结点入队时的默认状态。
注意,负值体现结点处于有效等待状态,而正值体现结点已被取消。以是源码中许多地方用>0、0是取消状态 if (ws > 0) { do { // 循环向前查找取消节点,把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}[/code]parkAndCheckInterrupt告急用于挂起当火线程,壅闭调用栈,返回当火线程的制止状态。- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 详细挂起流程用流程图体现如下(shouldParkAfterFailedAcquire流程):
整个流程中,假如前驱结点的状态不是SIGNAL,那么本身就不能安心去苏息,须要去找个安心的苏息点,同时可以再实行下看有没有机遇轮到本身拿号。
park()会让当火线程进入waiting状态。在此状态下,有两种途径可以叫醒该线程:1)被unpark();2)被interrupt()。须要注意的是,Thread.interrupted()会打扫当火线程的制止标志位。
那么shouldParkAfterFailedAcquire中取消节点是怎么天生的呢?什么时间会把一个节点的waitStatus设置为-1?
是在什么时间开释节点关照到被挂起的线程呢?
CANCELLED状态节点天生
回看acquireQueued方法中的Finally代码:- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
复制代码 显然,当failed为true时才会实行方法cancelAcquire,那什么情况下failed为true呢?try代码段实行过程中出现非常。
这里不知道那边会出现非常?假设tryAcquire出现的非常,那么acquire方法就已经不会今后实行,也就不会实行到acquireQueued
通过cancelAcquire方法,将Node的状态标志为CANCELLED。- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (!hasQueuedPredecessors() && //公平锁加锁时判断等待队列中是否存在有效节点的方法
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 releaseShared()
此方法是共享模式下线程开释共享资源的顶层入口。它会开释指定量的资源,假如乐成开释且答应叫醒等待线程,它会叫醒等待队列里的其他线程来获取资源。- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 此方法的流程也比力简朴,一句话:开释掉资源后,叫醒后继。跟独占模式下的release()相似,但有一点轻微须要注意:独占模式下的tryRelease()在完全开释掉资源(state=0)后,才会返回true去叫醒其他线程,这告急是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制肯定量的线程并发实行,那么拥有资源的线程在开释掉部分资源时就可以叫醒后继等待结点。比方,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就须要等待。A在运行过程中开释掉2个资源量,然后tryReleaseShared(2)返回true叫醒C,C一看只有3个仍不敷继承等待;随后B又开释2个,tryReleaseShared(2)返回true叫醒C,C一看有5个够本身用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全开释掉资源(state=0)才返回true,以是自界说同步器可以根据须要决定tryReleaseShared()的返回值
doReleaseShared()
此方法告急用于叫醒后继- public final boolean hasQueuedPredecessors() {
- // The correctness of this depends on head being initialized
- // before tail and on head.next being accurate if the current
- // thread is first in queue.
- Node t = tail; // Read fields in reverse initialization order
- Node h = head;
- Node s;
- return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
- }
复制代码 应用
Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0体现未锁定,1体现锁定。焦点源码:- private Node addWaiter(Node mode) {
- //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
- Node node = new Node(Thread.currentThread(), mode);
- //尝试快速方式直接放到队尾。
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- //上一步失败则通过enq入队。
- enq(node);
- return node;
- }
复制代码 除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,差别的地方就在获取-开释资源的方式tryAcquire-tryRelelase。
ReentrantLock 的利用
ReentrantLock 的利用方式与 synchronized 关键字雷同,都是通过加锁和开释锁来实现同步的。我们来看看 ReentrantLock 的利用方式,以非公平锁为例:- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- static {
- try {
- stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
- headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
- tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
- waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
- nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
复制代码 代码很简朴,两个线程分别对 count 变量举行 10000 次累加操纵,末了输出 count 的值。我们来看看运行结果:- private Node enq(final Node node) {
- //CAS"自旋",直到成功加入队尾
- for (;;) {
- Node t = tail;
- if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {//正常流程,放入队尾
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 可以看到,两个线程对 count 变量举行了 private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
} 次累加操纵,分析 ReentrantLock 是支持重入性的。再来看看公平锁的利用方式,只须要将 ReentrantLock 的构造方法改为公平锁即可:- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;//标记是否成功拿到资源
- try {
- boolean interrupted = false;//标记等待过程中是否被中断过
- //CAS“自旋”!
- for (;;) {
- final Node p = node.predecessor();//拿到前驱
- //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源,也就是当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)。
- if (p == head && tryAcquire(arg)) {
- setHead(node);// 获取锁成功,头指针移动到当前node
- p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
- failed = false; // 成功获取资源
- return interrupted;//返回等待过程中是否被中断过
- }
- // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者 是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
- }
- } finally {
- if (failed) //说明发生了意料之外的异常,将节点移除,避免影响到其他节点
- cancelAcquire(node);
- }
- }
复制代码 运行结果为:- private Node enq(final Node node) {
- //CAS"自旋",直到成功加入队尾
- for (;;) {
- Node t = tail;
- if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {//正常流程,放入队尾
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 可以看到,公平锁的运行结果与非公平锁的运行结果同等,这是由于公平锁的实现方式与非公平锁的实现方式根本同等,只是在获取锁时增长了判定当前节点是否有前驱节点的逻辑判定。
- 公平锁: 按照线程哀求锁的次序获取锁,即先到先得。
- 非公平锁: 线程获取锁的次序大概与哀求锁的次序差别,大概导致某些线程获取锁的速率较快。
须要注意的是,利用 ReentrantLock 时,锁必须在 try 代码块开始之前获取,而且加锁之前不能有非常抛出,否则在 finally 块中就无法开释锁(ReentrantLock 的锁必须在 finally 中手动开释)。
错误示例:- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- // 靠前驱节点判断当前线程是否应该被阻塞
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- // 获取头结点的节点状态
- int ws = pred.waitStatus;
- // 说明头结点处于唤醒状态
- if (ws == Node.SIGNAL)
- return true;
- // 通过枚举值我们知道waitStatus>0是取消状态
- if (ws > 0) {
- do {
- // 循环向前查找取消节点,把取消节点从队列中剔除
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // 设置前任节点等待状态为SIGNAL
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
复制代码 准确示例:- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);//调用park()使线程进入waiting状态
- return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |