原文链接
JavaGuide
并发编程的原理
目的:
- Lock 的使用
- AQS 原理分析
- Condition
- CountDownLatch 、 Semaphore
- 线程池分析
J.U.C = java.util.concurrent
Lock 的使用
- volatile 去解决可见性问题,防止指令重排序
- synchronized 是保证 可见性,有序性,原子性的一种手段
这是 JVM 层面提供的关键字。
JDK 层次有一个 java.util.concurrent 的工具包,属于 并发 在 JDK 层次的一种手段。这个手段也是对我们多线程在操作系统情况下一些控制的保证线程安全的方式。
同步锁
我们知道,锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源,在 Lock 接口出现之前,JAVA 应用程序只能依靠 synchronized 关键字来实现同步锁的功能,在 JAVA5 以后,增长了 JUC 的并发包且提供了 Lock 接口用来实现锁的功能,它提供了与 synchroinzed 关键字类似的同步功能,只是它比 synchronized 更灵活,能够显示的获取和释放锁。
Lock的初步使用
Lock 是一个接口,核心的两个方法 Lock 和 unlock ,它有很多的实现,比如 ReentrantLock 、 ReentrantReadWriteLock;- public interface Lock {
- void lock();
- boolean tryLock();
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
- void unlock();
- Condition newCondition();
- }
复制代码
fair 公平
ReentrantLock
重入锁,表示支持重新进入的锁,也就是说,假如当火线程 t1 通过调用 #lock 方法获取了锁之后,再次调用lock,是不会再壅闭去获取锁的,直接增长重试次数就行了。- public class AtomicDemo {
- private static int count=0;
- static Lock lock=new ReentrantLock();
- public static void inc(){
- lock.lock();
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- count++;
- lock.unlock();
- }
-
- public static void main(String[] args) throws InterruptedException {
- for(int i=0;i<1000;i++){
- new Thread(()->{AtomicDemo.inc();}).start();;
- }
- Thread.sleep(3000);
- }
- }
复制代码 ReentrantReadWriteLock
我们从前理解的锁,基本都是排他锁,也就是这些锁在同一时刻只允许一个线程进行访问,而读写所在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被壅闭。读写锁维护了一对锁,一个读锁、一个写锁;一般情况下,读写锁的性能都会比排它锁好,因为大多数场景 读是多于写的 。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。- public class RWLockDemo {
- // 排他锁
- // 共享锁,在同一时刻可以有多个线程获得锁
- // 读锁, 写锁
- static Map<String, Object> cacheMap = new HashMap<>();
- // 重入读写锁
- static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- static Lock read = readWriteLock.readLock(); // 读锁
- static Lock write = readWriteLock.writeLock(); // 写锁
- // 缓存的更新和读取的时候
- public static final Object get(String key) {
- read.lock(); // 读取的时候加上读锁
- out.println("开始读取数据");
- try {
- return cacheMap.get(key);
- } finally {
- read.unlock();
- }
- }
- public static final Object set(String key, Object value) {
- write.lock(); // 每一次写数据,都需要先加上写锁
- out.println("开始写数据");
- try {
- return cacheMap.put(key, value);
- } finally {
- write.unlock();
- }
- }
- }
复制代码 在这个案例中,通过hashmap来模拟了一个内存缓存,然后使用读写锁来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被壅闭,因为读操作不会影响执行结果。
在执行写操作时,线程必须要获取写锁,当已经有线程持有写锁的情况下,当火线程会被壅闭,只有当写锁释放以后,其他读写操作才华继续执行。使用读写锁提升读操作的并发性。以保证每次写操作对所有的读写操作的可见性。
- 读锁与读锁可以共享
- 读锁与写锁不可以共享(排他)
- 写锁与写锁不可以共享(排他)
Lock 和 synchronized 的简单对比
通过我们对 Lock 的使用以及对 synchronized 的了解,基本上可以对比出这两种锁的区别了。因为这个也是在口试过程中比较常见的问题。
- 从层次上,一个是关键字、一个是类, 这是最直观的差异
- 从使用上, Lock 具备更大的灵活性,可以控制锁的释放和获取; 而 synchronized 的锁的释放是被动的,当出现异常大概同步代码块执行完以后,才会释放锁
- Lock 可以判断锁的状态、而 synchronized 无法做到
Lock 可以实现公平锁、非公平锁; 而 synchronized 只有非公平锁
AQS
Lock 之所以能实现线程安全的锁,主要的核心是 AQS ( AbstractQueuedSynchronizer ) , AbstractQueuedSynchronizer 提供了一个 FIFO 队列,可以看作是一个用来实现锁以及其他需要同步功能的框架。这里简称该类为 AQS 。 AQS 的使用依靠继承来完成,子类通过继承 AQS 并实现所需的方法来管理同步状态。例如常见的 ReentrantLock ,CountDownLatch 等 AQS 的两种功能。
从使用上来说, AQS 的功能可以分为两种:独占和共享。
- 独占锁模式下,每次只能有一个线程持有锁,比如前面给大家演示的 ReentrantLock 就是以独占方式实现的互斥锁
- 共享锁模式下,允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock 。
很显然,独占锁是一种悲观守旧的加锁策略,它限制了 读/读 辩说,假如某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不须要的并发性,因为读操作并不会影响数据的一致性。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。- public class LockDemo {
- static Lock lock = new ReentrantLock();// 有公平重入锁和非公平重入锁
- private static int count = 0;
- public static void incr(){
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- lock.lock(); // 获得锁
- count ++;
- lock.unlock();
- }
- }
复制代码 通过一个重入锁的方式实现一个锁的等待。JVM 层面是如何让一个锁等待的。
lock.lock 用到了 CXQ 以及我们的 EntryList ,通过队列的方式,让线程等待,等待之前,它用到了 CAS ,通过自旋的方式去实验获得锁。假如在指定时间内获得锁失败的话,它会去 park,最后当我们这个锁被释放的时候,他会从 EntryList 里边取出一个线程。再次去争夺锁。取出线程,叫醒一个线程 叫做 unpark 。
synchronized 来到了......
AQS的内部实现
同步器依靠内部的同步队列(一个 FIFO双向队列 )来完成同步状态的管理,当火线程获取同步状态失败时,同步器会将当火线程以及等待状态等信息构造成为一个节点( Node )并将其加入同步队列,同时会壅闭当火线程,当同步状态释放时,会把首节点中的线程叫醒,使其再次实验获取同步状态。
Node 的主要属性如下- static final class Node {
- int waitStatus; //表示节点的状态,包含cancelled(取消);condition 表示节点在等待condition也就是在condition队列中
- Node prev; //前继节点
- Node next; //后继节点
- Node nextWaiter; //存储在condition队列中的后继节点
- Thread thread; //当前线程
- }
复制代码 AQS 类底层的数据布局是使用双向链表,是队列的一种实现。包括一个 head 节点和一个 tail 节点,分别表示头结点和尾节点,此中头结点不存储 Thread ,仅保存 next 结点的引用。
当一个线程成功地获取了同步状态(大概锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于 CAS 的设置尾节点的方法: compareAndSetTail ( Node expect , Nodeupdate ) ,它需要通报当火线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
同步队列遵循 FIFO ,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会叫醒后继节点,而后继节点将会在获取同步状态成功时将本身设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的 next 引用即可
compareAndSet
java.util.concurrent.locks.AbstractQueuedSynchronizer
AQS 中,除了本身的链表布局以外,还有一个很关键的功能,就是 CAS ,这个是保证在多线程并发的情况下保证线程安全的前提下去把线程加入到 AQS 中的方法,可以简单理解为乐观锁- private final boolean compareAndSetHead(Node update) {
- return unsafe.compareAndSwapObject(this, headOffset, null, update);
- }
复制代码 这个方法里面,首先,用到了 unsafe 类,(Unsafe类是在 sun.misc 包下,不属于 JAVA 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty 、 Hadoop 、 Kafka 等; Unsafe 可认为是 JAVA 中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等) 。然后调用了 #compareAndSwapObject 这个方法。- public final native boolean compareAndSwapObject(Object var1, long var2, Object var4,
- Object var5);
复制代码 这个是一个 native 方法,
第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的 headOffset 的值),第三个参数为期待的值,第四个为更新后的值
整个方法的作用是假如当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5,假如更新成功,则返回 true ,否则返回 false ;
这里传入了一个 headOffset ,这个 headOffset 是什么呢?在下面的代码中,通过 unsafe.objectFieldOffset
然后通过反射获取了 AQS 类中的成员变量,并且这个成员变量被 volatile 修饰的
unsafe.objectFieldOffset
headOffset 这个是指类中相应字段在该类的偏移量,在这里详细即是指 head 这个字段在 AQS 类的内存中相对于该类首地址的偏移量。
一个 JAVA 对象可以看成是一段内存,每个字段都得按照一定的序次放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapObject 中,去根据偏移量找到对象在内存中的详细位置。
这个方法在 unsafe.cpp 文件中,代码如下:- UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject
- obj, jlong offset, jobject e_h, jobject x_h))
- UnsafeWrapper("Unsafe_CompareAndSwapObject");
- oop x = JNIHandles::resolve(x_h); // 新值
- oop e = JNIHandles::resolve(e_h); // 预期值
- oop p = JNIHandles::resolve(obj);
- HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);// 在内存中的具体位置
- oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);// 调用了另一个方法,实际上就是通过cas操作来替换内存中的值是否成功
- jboolean success = (res == e); // 如果返回的res等于e,则判定满足compare条件(说明res应该为内存中的当前值),但实际上会有ABA的问题
- if (success) // success为true时,说明此时已经交换成功(调用的是最底层的cmpxchg指令)
- update_barrier_set((void*)addr, x); // 每次Reference类型数据写操作时,都会产生一个WriteBarrier暂时中断操作,配合垃圾收集器
- return success;
- UNSAFE_END
复制代码 所以其实 #compareAndSet 这个方法,最终调用的是 unsafe 类的 #compareAndSwap ,这个指令会对内存中的共享数据做原子的读写操作。
- 首先, cpu会把内存中将要被更改的数据与期望值作比较
- 然后,当两个值相等时,cpu才会将内存中的对象替换为新的值。否则,不做变更操作。
- 最后,返回操作执行结果
很显然,这是一种乐观锁的实现思绪。
ReentrantLock的实现原理分析
之所以叫重入锁是因为同一个线程假如已经获得了锁,那么后续该线程调用lock方法时不需要再次获取锁,也就是不会壅闭;重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。怎么理解公平和非公平呢?
假如在绝对时间上,先对锁进行获取的请求一定先被满意获得锁,那么这个锁就是公平锁,反之,就是不公平的。简单来说公平锁就是等待时间最长的线程最优先获取锁。
默认的情况下就是非公平锁。
非公平锁的实现流程时序图
源码分析
ReentrantLock.lock
- public void lock() {
- sync.lock();
- }
复制代码 这个是获取锁的入口,调用了 sync.lock ; sync 是一个实现了 AQS 的抽象类,这个类的主要作用是用来实现同步控制的,并且 sync 有两个实现,一个是 NonfairSync (非公平锁)、另一个是 FailSync (公平锁); 我们先来分析一下非公平锁的实现
NonfairSync.lock
- final void lock() {
- if (compareAndSetState(0, 1)) //这是跟公平锁的主要区别,一上来就试探锁是否空闲,如果可以插队,则设置获得锁的线程为当前线程
- //exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1); //尝试去获取锁
- }
复制代码 compareAndSetState,这个方法在前面提到过了,再简单讲解一下,通过 CAS 算法去改变 state 的值,而这个 state 是什么呢? 在 AQS 中存在一个变量 state ,对于ReentrantLock 来说,假如 state = 0 表示无锁状态、假如 state > 0 表示有锁状态。 ( state 在不同的锁里边表达的意思是不一样的 )
所以在这里,是表示当前的 state 假如等于 0 ,则替换为 1 ,假如替换成功表示获取锁成功了由于 ReentrantLock 是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时
(即 exclusiveOwnerThread==Thread.currentThread() ),即可加锁,每次加锁都会将 state 的值 +1 , state 等于几,就代表当前持有锁的线程加了几次锁;
解锁时每解一次锁就会将 state 减 1 , state 减到 0 后,锁就被释放掉,这时其他线程可以加锁;
AbstractQueuedSynchronizer.acquire
假如 CAS 操作未能成功,阐明 state 已经不为 0 ,此时继续 acquire(1) 操作, acquire 是AQS中的方法 当多个线程同时进入这个方法时,首先通过 CAS 去修改 state 的状态,假如修改成功表示竞争锁成功,竞争失败的, tryAcquire 会返回 false- public final void acquire(int arg) {
- if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 这个方法的主要作用是
- 实验获取独占锁,获取成功则返回,否则
- 自旋获取锁,并且判断中断标识,假如中断标识为 true ,则设置线程中断
- addWaiter方法把当火线程封装成Node,并添加到队列的尾部
NonfairSync.tryAcquire
tryAcquire 方法实验获取锁,假如成功就返回,假如不成功,则把当火线程和等待状态信息构造成一个Node节点,并将结点放入同步队列的尾部。然后为同步队列中的当前节点循环等待获取锁,直到成功。- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
复制代码 nofairTryAcquire
这里可以看出非公平锁的寄义,即获取锁并不会严格根据争用锁的先后序次决定。这里的实现逻辑类似 synchroized 关键字的偏向锁的做法,即可重入而不用进一步进行锁的竞争,也解释了 ReentrantLock 中 Reentrant 的意义。- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState(); //获取当前的状态,前面讲过,默认情况下是0表示无锁状态
- if (c == 0) {
- if (compareAndSetState(0, acquires)) { //通过cas来改变state状态的值,如果更新成功,表示获取锁成功,这个操作外部方法lock()就做过一次,这里再做只是为了再尝试一次,尽量以最简单的方式获取锁。
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {//如果当前线程等于获取锁的线程,表示重入,直接累加重入次数
- int nextc = c + acquires;
- if (nextc < 0) // overflow 如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- //如果状态不为0,且当前线程不是owner,则返回false。
- return false; //获取锁失败,返回false
- }
复制代码 addWaiter
当前锁假如已经被其他线程锁持有,那么当火线程往复请求锁的时候,会进入这个方法,这个方法主要是把当火线程封装成 node ,添加到 AQS 的链表中- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode); //创建一个独占的Node节点,mode为排他模式
- // 尝试快速入队,如果失败则降级至full enq
- Node pred = tail; // tail是AQS中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) { // 防止有其他线程修改tail,使用CAS进行修改,如果失败则降级至full enq
- pred.next = node; // 如果成功之后旧的tail的next指针再指向新的tail,成为双向链表
- return node;
- }
- }
- enq(node); // 如果队列为null或者CAS设置新的tail失败
- return node;
- }
复制代码 enq
enq就是通过自旋操作把当前节点加入到队列中- private Node enq(final Node node) {
- for (;;) { //无效的循环,为什么采用for(;;),是因为它执行的指令少,不占用寄存器
- Node t = tail;// 此时head, tail都为null
- if (t == null) { // Must initialize// 如果tail为null则说明队列首次使用,需要进行初始化
- if (compareAndSetHead(new Node()))// 设置头节点,如果失败则存在竞争,留至下一轮循环
- tail = head; // 用CAS的方式创建一个空的Node作为头结点,因为此时队列中只一个头结点,所以tail也指向head,第一次循环执行结束
- } else {
- //进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
- //这部分代码和addWaiter代码一样,将当前节点添加到队列
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node; //t此时指向tail,所以可以CAS成功,将tail重新指向CNode。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点。
- return t;
- }
- }
- }
- }
复制代码 代码运行到这里, AQS 队列的布局就是这样一个表现。
acquireQueued
addWaiter 返回了插入的节点,作为 acquireQueued 方法的入参,这个方法主要用于争抢锁。- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException
- if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
- setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点
- //凡是head节点,head.thread与head.prev永远为null,但是head.next不为null
- p.next = null; // help GC
- failed = false; //获取锁成功
- return interrupted;
- }
- //如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
- interrupted = true;
- }
- } finally {
- if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
- cancelAcquire(node);
- }
- }
复制代码 原来的 head 节点释放锁以后,会从队列中移除,原来 head 节点的 next 节点会成为 head 节点
shouldParkAfterFailedAcquire
从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,假如成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点, if (p == head) 条件不建立,首先进行 shouldParkAfterFailedAcquire(p, node) 操作。
#shouldParkAfterFailedAcquire 方法是判断一个争用锁的线程是否应该被壅闭。它首先判断一个节点的前置节点的状态是否为 Node.SIGNAL ,假如是,是阐明此节点已经将状态设置-假如锁释放,则应当通知它,所以它可以安全地壅闭了,返回 true 。- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus; // 前继节点的状态
- if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
- return true;
- // 如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
- if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
复制代码 解读:假如有t1,t2两个线程都加入到了链表中
img
假如head节点位置的线程一直持有锁,那么t1和t2就是挂起状态,而HEAD以及Thread1的awaitStatus都是 SIGNAL ,在多次实验获取锁失败以后,就会通过下面的方法进行挂起(这个地方就是制止了惊群效应,每个节点只需要关心上一个节点的状态即可)
- SIGNAL:值为 -1 ,表示当前节点的后继节点将要大概已经被壅闭,在当前节点释放的时候需要 unpark 后继节点;
- CONDITION:值为 -2 ,表示当前节点在等待 condition ,即在 condition 队列中;
- PROPAGATE:值为 -3 ,表示 releaseShared 需要被通报给后续节点(仅在共享模式下使用);
parkAndCheckInterrupt
假如 shouldParkAfterFailedAcquire 返回了 true ,则执行:“ parkAndCheckInterrupt() ”方法,它是通过 LockSupport.park(this)将当火线程挂起到WATING状态,它需要等待一个中断、unpark方法来叫醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);// LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞
- return Thread.interrupted();
- }
复制代码 ReentrantLock.unlock
加锁的过程分析完以后,再来分析一下释放锁的过程,调用 release 方法,这个方法里面做两件事。
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
复制代码 tryRelease
这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1 ),假如结果状态为 0 ,就将排它锁的 Owner 设置为 null ,以使得其他的线程有机会进行执行。 在排它锁中,加锁的时候状态会增长 1 (固然可以本身修改这个值),在解锁的时候减掉 1 ,同一个锁,在可以重入后,大概会被叠加为 2、3、4这些值,只有 unlock() 的次数与 lock() 的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true 。- protected final boolean tryRelease(int releases) {
- int c = getState() - releases; // 这里是将锁的数量减1
- if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- // 由于重入的关系,不是每次释放锁c都等于0,
- // 直到最后一次释放锁时,才会把当前线程释放
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
复制代码 LockSupport
LockSupport 类是 Java6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:- public native void unpark(Thread jthread);
- public native void park(boolean isAbsolute, long time);
复制代码 unpark 函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
permit相当于0/1的开关,默认是0,调用一次 unpark 就加 1 酿成了 1 .调用一次 park 会消费 permit ,又会酿成 0 。 假如再调用一次 park 会壅闭,因为 permit 已经是 0 了。直到 permit 酿成 1 .这时调用 unpark 会把permit 设置为 1 .每个线程都有一个相关的 permit , permit 最多只有一个,重复调用unpark不会累积。
在使用 LockSupport 之前,我们对线程做同步,只能使用 wait 和 notify ,但是 wait 和 notify 其实不是很灵活,并且耦合性很高,调用notify必须要确保某个线程处于 wait 状态,而 park/unpark 模子真正解耦了线程之间的同步,先后序次没有没有直接关联,同时线程之间不再需要一个Object大概其他变量来存储状态,不再需要关心对方的状态。
总结
分析了独占式同步状态获取和释放过程后,做个简单的总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或克制自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后叫醒头节点的后继节点。
公平锁和非公平锁的区别
锁的公平性是相对于获取锁的序次而言的,假如是一个公平锁,那么锁的获取序次就应该符合请求的绝对时间序次,也就是 FIFO 。 在上面分析的例子来说,只要 CAS 设置同步状态成功,则表示当火线程获取了锁,而公平锁则不一样,差异点有两个。
FairSync.tryAcquire
- final void** lock() {
- acquire(1);
- }
复制代码 非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会
FairSync.tryAcquire
- protected final boolean* tryAcquire(int acquires) {
- final Thread current = Thread.currentThread*();
- int c = getState();
- if (c == 0) {
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 这个方法与 nonfairTryAcquire(int acquires) 比较,不同的地方在于判断条件多了 hasQueuedPredecessors() 方法,也就是加入了[同步队列中当前节点是否有前驱节点]的判断,假如该方法返回 true ,则表示有线程比当火线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才华继续获取锁。
Condition
通过前面的课程学习,我们知道任意一个Java对象,都拥有一组监视器方法(定义在 java.lang.Object上),主要包括 wait() 、 notify() 以及 notifyAll() 方法,这些方法与同步关键字配合,可以实现等待/通知模式 J.U.C 包提供了 Condition 来对锁进行精准控制, Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件( Condition ),只有满意条件时,线程才会被叫醒。
condition使用案例
ConditionWait
- @RequiredArgsConstructor
- public class ConditionWait extends Thread {
- private final Lock lock;
- private final Condition condition;
- @Override
- public void run() {
- lock.lock();
- try {
- System.out.println("【" + Thread.currentThread().getName() + "】开始执行 condition.await()");
- condition.await(); //
- System.out.println("【" + Thread.currentThread().getName() + "】执行结束 condition.await()");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
复制代码 ConditionSignal
- @RequiredArgsConstructor
- public class ConditionNotify extends Thread {
- private final Lock lock;
- private final Condition condition;
- @Override
- public void run() {
- lock.lock();
- try {
- System.out.println("【" + Thread.currentThread().getName() + "】开始执行 condition.signal()");
- condition.signal(); // signal 和 signalAll
- System.out.println("【" + Thread.currentThread().getName() + "】执行结束 condition.signal()");
- } finally {
- lock.unlock();
- }
- }
- }
复制代码 通过这个案例简单实现了 wait 和 notify 的功能,当调用 await 方法后,当火线程会释放锁并等待,而其他线程调用 condition 对象的 signal 大概 signalall 方法通知被壅闭的线程,然后本身执行 unlock 释放锁,被叫醒的线程获得之前的锁继续执行,最后释放锁。
所以,condition 中两个最重要的方法,一个是 await ,一个是 signal 方法
- await : 把当火线程壅闭挂起
- signal : 叫醒壅闭的线程
- public class ConnditionDemo {
- public static void main(String[] args) {
- Lock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- ConditionWait conditionWait = new ConditionWait(lock, condition);
- conditionWait.start();
- ConditionNotify conditionNotify = new ConditionNotify(lock, condition);
- conditionNotify.start();
- }
- }
复制代码- 【Thread-0】开始执行 condition.await()
- 【Thread-1】开始执行 condition.signal()
- 【Thread-1】执行结束 condition.signal()
- 【Thread-0】执行结束 condition.await()
复制代码 await 方法
调用 Condition 的 await() 方法(大概以 await 开头的方法),会使当火线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当火线程一定获取了 Condition 相关联的锁。- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter(); //创建一个新的节点,节点状态为condition,采用的数据结构仍然是链表
- int savedState = fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
- int interruptMode = 0;
- //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
- //isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了,就继续阻塞,还在队列上且不是 CONDITION 状态了,就结束循环和阻塞
- while (!isOnSyncQueue(node)) {//第一次判断的是false,因为前面已经释放锁了
- LockSupport.park(this); // 第一次总是 park 自己,开始阻塞等待
- // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
- // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
- // 将这个变量设置成 REINTERRUPT.
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
- // 如果是 null ,就没有什么好清理的了.
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- // 如果线程被中断了,需要抛出异常.或者什么都不做
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
复制代码 signal
调用 Condition 的 signal() 方法,将会叫醒在等待队列中等待时间最长的节点(首节点),在叫醒节点之前,会将节点移到同步队列中- public final void signal() {
- if (!isHeldExclusively()) //先判断当前线程是否获得了锁
- throw new IllegalMonitorStateException();
- Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
- if (first != null)
- doSignal(first);
- }
复制代码- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)// 如果第一个节点的下一个节点是 null,那么, 最后一个节点也是 null.
- lastWaiter = null; // 将 next 节点设置成 null
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
复制代码 该方法先是 CAS 修改了节点状态,假如成功,就将这个节点放到 AQS 队列中,然后叫醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒。- final boolean transferForSignal(Node node) {
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
- Node p = enq(node);
- int ws = p.waitStatus;
- // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的
- next 节点需要停止阻塞),
- if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread); // 唤醒输入节点上的线程.
- return true;
- }
复制代码 读写锁与 synchronized 在只读的线程中需要泯灭大量的时间的时候的性能对比,本质在于当读操作时,不进行壅闭
- // [readWrite] 9999次读操作{每次读操作 睡眠 1 ms},1次 " +1" 操作以后, 结果为:1[耗时]:665
- // [synchronized] 9999次读操作{每次读操作 睡眠 1 ms},1次 " +1" 操作以后, 结果为:1[耗时]:19007
复制代码- public class TestperformanceDemo {
- static Integer demoInteger = 0;
- // 重入读写锁
- static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- static Lock read = readWriteLock.readLock(); // 读锁
- static Lock write = readWriteLock.writeLock(); // 写锁
- public static void main(String[] args) {
- int countSum = 10000;
- add1000Seconds(countSum);
- add1000synchronized(countSum);
- // [readWrite] 9999次读操作{每次读操作 睡眠 1 ms},1次 " +1" 操作以后, 结果为:1[耗时]:665
- // [synchronized] 9999次读操作{每次读操作 睡眠 1 ms},1次 " +1" 操作以后, 结果为:1[耗时]:19007
- }
- /***
- * 第二种方式
- */
- public static void add1000Seconds(int countSum) {
- demoInteger = 0;
- ExecutorService executorService = Executors.newFixedThreadPool(countSum);
- long start = System.currentTimeMillis();
- CompletionService completionService = new ExecutorCompletionService(executorService);
- for (int i = 0; i < countSum; i++) {
- int finalI = i;
- completionService.submit(() -> {
- if (finalI == 0) {
- write.lock();
- try {
- demoInteger++;
- } finally {
- write.unlock();
- }
- } else {
- read.lock();
- try {
- Integer value = demoInteger;
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- read.unlock();
- }
- }
- }, null);
- }
- int count = 0;
- while (count < countSum) { // 等待任务完成全部
- if (completionService.poll() != null) {
- count++;
- }
- }
- long end = System.currentTimeMillis();
- System.out.println("[readWrite] " + (countSum - 1) + "次读操作{每次读操作 睡眠 1 ms},"
- + 1 + "次 " +1" 操作以后, 结果为:" + demoInteger
- + "[耗时]:" + (end - start));
- }
- /***
- * synchronized
- */
- public static void add1000synchronized(int countSum) {
- demoInteger = 0;
- ExecutorService executorService = Executors.newFixedThreadPool(countSum);
- long start = System.currentTimeMillis();
- CompletionService completionService = new ExecutorCompletionService(executorService);
- for (int i = 0; i < countSum; i++) {
- int finalI = i;
- completionService.submit(() -> {
- synchronized (RWLockDemo.class) {
- if (finalI == 0) {
- demoInteger++;
- } else {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- Integer value = demoInteger;
- }
- }
- }, null);
- }
- int count = 0;
- while (count < countSum) { // 等待任务完成全部
- if (completionService.poll() != null) {
- count++;
- }
- }
- long end = System.currentTimeMillis();
- System.out.println("[synchronized] " + (countSum - 1) + "次读操作{每次读操作 睡眠 1 ms},"
- + 1 + "次 " +1" 操作以后, 结果为:" + demoInteger
- + "[耗时]:" + (end - start));
- }
- }
复制代码JavaGuide
来源于: https://javaguide.net
微信公众号:不止极客
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |