ToB企服应用市场:ToB评测及商务社交产业平台

标题: 多线程与高并发(五)—— 源码解析 ReentrantLock [打印本页]

作者: 滴水恩情    时间: 2022-9-16 17:17
标题: 多线程与高并发(五)—— 源码解析 ReentrantLock
一、前言

ReentrantLock 是基于 AQS 实现的同步框架,关于 AQS 的源码在 这篇文章 已经讲解过,ReentrantLock 的主要实现都依赖AQS,因此在阅读本文前应该先了解 AQS 机制。本文并不关注 ReentrantLock 如何使用,只叙述其具体实现。
二、ReentrantLock 的继承体系以及特点

AQS 是基于模板方法模式设计的,理解该设计模式可以帮助阅读 ReentrantLock 源码,当然不熟悉该设计模式并不影响下文的阅读。
首先我们来看 ReentrantLock 的类结构,该类实现了 Lock 接口,该接口定义了一些需要实现的方法,这些方法是提供给应用程序员编写并发程序使用时的 API。ReentrantLock 如何提供这些 API的支持则依赖 AQS 框架,在其内部定义了一个类 Sync ,在 AQS 一文中提到过要使用 AQS 构建同步工具,主要是实现自己的 tryAccquire 和 tryRealease 方法,Sync 类即是 AQS 的模板方法具体实现,对加锁解锁方法进行了重新实现,但稍有不同的是,其具体实现又继续向下委托,在对 FairSync 和 NonfairSync 类中皆有各自的实现,对应的是公平锁和非公平锁的实现。
  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2.     abstract static class Sync extends AbstractQueuedSynchronizer {
  3.     }
  4.     static final class FairSync extends Sync {
  5.     }
  6.     static final class NonfairSync extends Sync {
  7.     }
  8. }
复制代码
Lock 接口

Lock 接口体现了 ReentrantLock 的特性,我们看看定义了哪些方法,然后在源码解析篇依次对各个功能实现详细讲解。
  1. public interface Lock {
  2.     void lock();
  3.     void lockInterruptibly() throws InterruptedException;
  4.     boolean tryLock();
  5.     boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  6.     void unlock();
  7.     Condition newCondition();
  8. }
复制代码
三、源码解析

3.1 加锁过程

我们首先来看下调用链,图中只画出 reentrantLock 的逻辑,关于 acquire() 中的后续逻辑见 AQS 篇。

3.1.1 非公平锁加锁

ReentrantLock 的默认构造器实现的是非公平锁,因此我们也先来看非公平锁的实现。lock() 即加锁的入口,使用时调用 lock.lock() 方法,在非公平锁模式下其最终交由 NonfairSync 来实现。

首先尝试 CAS 修改锁状态,即修改 AQS 中 State 的值,如果修改成功则表明加锁成功,接下来修改 exclusiveOwnerThread 为当前线程,表明当前线程独占该锁。如果修改失败,表明已经有线程获取了锁,此时调用 AQS 中的方法 acquire(1) 再次尝试获取锁。

该方法在 AQS 篇章有讲过,首先是调用  tryAcquire()  尝试获取锁,我们看其最终调用的逻辑 nonfairTryAcquire()。

3.1.2 公平锁加锁

公平锁的调用链与非公平锁大同小异,非公平锁与公平锁的加锁逻辑区别有二。

这两处不同即是两种锁思想的具体体现:线程在获取锁之前是否需要排队?
排队的含义其实不准确,这个定义很宽泛,很难准确描述 hasQueuedPredecessors 所涉及的情形,但是我们可以用这个通俗的意义来直觉的感受这个过程,即当前线程在加锁之前首先需要判断 CLH 队列中有没有其他线程优先级比我高,如果有的话,那我就去队列等待(加锁失败,走acquire 剩余逻辑入队),如果没有优先级比我高的线程,那么我就有资格获取锁,走 CAS 逻辑去加锁。
有人可能不明白 CLH 队列就是先进先出队列,为何会出现要排队的情况,然而抢占锁的线程实际上有两种,一是在队列中阻塞的线程,二是新来的一个线程,还没进入队列排队。因此即使 CLH 队列是先进先出的,这个此时正好来竞争锁的线程就需要判断自己是否有资格获取锁,也就是队列中有没有线程优先级比自己高。
非公平锁则没有这个排队的过程,新来的这个线程直接就有资格获取锁,因此在请求锁时直接去 CAS 抢占锁。可以看到非公平锁的抢占和操作系统的进程抢占是有不同的,并不是所有线程被唤醒去随机抢占,而是通常意义上也有个排队的队列,只是这个新来的线程可以插队,直接高过队头线程的优先级去加锁。
那么为什么 lock() 中尝试一次加锁,tryAcquire() 中又一次加锁呢?其实只保留一处加锁即可,在 CAS 失败后再尝试一次,我猜测可能只是为了优化,多一次加锁成功的机会。
3.2 hasQueuedPredecessors() 解析
  1. /**
  2.      * Queries whether any threads have been waiting to acquire longer
  3.      * than the current thread.
  4.      * 查找是否有其他线程比当前线程等待了更久的时间
  5.      *
  6.      * 这个方法有点类似下面伪代码的逻辑,但是更有效
  7.      * <p>An invocation of this method is equivalent to (but may be
  8.      * more efficient than):
  9.      *  <pre> {@code
  10.      * getFirstQueuedThread() != Thread.currentThread() &&
  11.      * hasQueuedThreads()}</pre>
  12.      *
  13.      * 注意由于线程可能在任何时候因为中断或者超时被取消,返回 true 并不能保证其他线程会在当前线程之前获取锁。
  14.      * 因此,其他线程也可能在当前线程返回 false 之后由于队列为空而赢得竞争去入队
  15.      * <p>Note that because cancellations due to interrupts and
  16.      * timeouts may occur at any time, a {@code true} return does not
  17.      * guarantee that some other thread will acquire before the current
  18.      * thread.  Likewise, it is possible for another thread to win a
  19.      * race to enqueue after this method has returned {@code false},
  20.      * due to the queue being empty.
  21.      *
  22.      * <p>This method is designed to be used by a fair synchronizer to
  23.      * avoid <a target="_blank" href="https://www.cnblogs.com/AbstractQueuedSynchronizer#barging">barging</a>.
  24.      * Such a synchronizer's {@link #tryAcquire} method should return
  25.      * {@code false}, and its {@link #tryAcquireShared} method should
  26.      * return a negative value, if this method returns {@code true}
  27.      * (unless this is a reentrant acquire).  For example, the {@code
  28.      * tryAcquire} method for a fair, reentrant, exclusive mode
  29.      * synchronizer might look like this:
  30.      *
  31.      *  <pre> {@code
  32.      * protected boolean tryAcquire(int arg) {
  33.      *   if (isHeldExclusively()) {
  34.      *     // A reentrant acquire; increment hold count
  35.      *     return true;
  36.      *   } else if (hasQueuedPredecessors()) {
  37.      *     return false;
  38.      *   } else {
  39.      *     // try to acquire normally
  40.      *   }
  41.      * }}</pre>
  42.      *
  43.      * @return {@code true} if there is a queued thread preceding the
  44.      *         current thread, and {@code false} if the current thread
  45.      *         is at the head of the queue or the queue is empty
  46.      *         如果队列中有比当前线程处于更靠前的节点返回 true,如果当前队列为空或者当前线程为队列第一个节点返回 false
  47.      * @since 1.7
  48.      */
  49.     public final boolean hasQueuedPredecessors() {
  50.         // 如果当前线程为队列中的第一个节点,那么正确性取决于头节点是否先于尾节点被初始化以及头节点的 next 指针是正确的
  51.         // The correctness of this depends on head being initialized
  52.         // before tail and on head.next being accurate if the current
  53.         // thread is first in queue.
  54.         Node t = tail; // Read fields in reverse initialization order
  55.         Node h = head;
  56.         Node s;
  57.         return h != t &&
  58.             ((s = h.next) == null || s.thread != Thread.currentThread());
  59.     }
复制代码
这个方法很简短,但却是最复杂的,我们一段一段分析这些逻辑判断。该方法的注释我上面已经翻译,需要注意的是在调用该方法的时候使用了取反逻辑,当该方法返回 true 时,说明需要排队,!hasQueuedPredecessors() 为false所以不能直接加锁,当该方法返回 false 时,才有资格加锁。

我们下面分段的去看这些与或逻辑,看什么情况下满足不需要排队的条件,也就是返回 false 的情况。

h != t

因此队列还未初始化时,以及队列中没有实际节点时(只有 dummy 节点),返回 false。
返回 true 有两种情况,队列中有实际节点或者队列正在初始化还没初始化完毕。
(s = h.next) == null

因此当队列中有两个节点时,h != t  &&  ((s = h.next) == null 为 false,接下来是或逻辑,因此我们还需要保证 s.thread != Thread.currentThread() 为 false。
s.thread != Thread.currentThread()

s.thread 即哑节点的下一个节点(实际节点),即实际节点
因此整体逻辑返回 false 的情况就是当前线程就是该队列中的第二个节点,也就是优先级最高的节点。那么有同学可能有疑问,这里仅考虑了队列中的线程,队列还没初始化时,线程竞争的情况下都不用排队吗?这个问题下面分场景来分析。
PS:插些题外话,为了分析清楚这段代码的作用,我采用的方法是分段去分析逻辑,再倒推这些逻辑判断具体是为了做什么事情。分析完毕后我发现存在两个弊端,一是要将逻辑整合起来理解比较难以厘清(主要是难以讲述),二是这有通过代码逆推思路的嫌疑,如果要理解别人编码上的巧妙,应该从场景递推,也就是我们写代码时的编码思维,思考自己会怎么写,对比 Doug Lea 怎么写。不过如果从场景递推,我想关于这段代码我不一定能对所有情况考虑周全,因而上面的分析过程我没有删除,接下来我再从场景开始递推分析为什么要这样设计,首先我们来看整个流程图。


以上就对当前线程是否需要排队的所有场景分析完毕。公平锁与非公平锁之间的不同就已经讲完了,其余逻辑一致,在非公平锁已经讲过,接下来看解锁流程。
3.3 解锁过程


不管是公平锁还是非公平锁,在解锁时都是调用 AQS 中的 realease 方法,在 AQS 篇已经讲解过,不重复讲了。
3.4 tryLock()
  1. public boolean tryLock() {
  2.         return sync.nonfairTryAcquire(1);
  3.     }
复制代码
对比下 lock 的源码


重点是 acquire 方法,lock() 方法在调用 tryAcquire 尝试加锁失败后就开始走入队的逻辑,也就是如果当前线程获取不到锁那么就需要阻塞。而 tryLock() 方法调用 nonfairTryAcquire() 尝试加锁,成功则返回 true,失败则返回 false,没有入队的逻辑。注意的是 tryLock() 只有非公平锁的实现,因此我们可以理解为这个方法用于插队获取锁,也就是说该方法用于公平锁的时候实际上破坏了公平原则。如果要维持公平原则,可以使用tryLock(0, TimeUnit.SECONDS) 等效 tryLock。另外该方法也不支持打断,带超时的 tryLock() 才支持打断,这也很好理解,tryLock 去抢占锁也不进行入队,结果很快就能返回,中断对该方法来说毫无意义。
3.5 tryLock(long timeout, TimeUnit unit)
  1. public boolean tryLock(long timeout, TimeUnit unit)
  2.             throws InterruptedException {
  3.         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  4.     }
复制代码
我们先看源码:首先该方法支持中断,如果被设置了中断标记,抛出中断异常。在调用该方法的时候首先会尝试一次加锁,即 tryAcquire(arg),如果加锁成功就整个方法返回 true,加锁失败了则进入 doAcquireNanos 逻辑。
  1. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  2.             throws InterruptedException {
  3.         // 如果被中断则直接抛出中断异常
  4.         if (Thread.interrupted())
  5.             throw new InterruptedException();
  6.         // 获取锁失败就执行 doAcquireNanos 入队,直到超时返回 false
  7.         return tryAcquire(arg) ||
  8.             doAcquireNanos(arg, nanosTimeout);
  9.     }
复制代码
doAcquireNanos 才是该方法的核心,该方法中再次尝试一次加锁,加锁成功的话返回结果 true,如果加锁失败那么判断下设定的超时时间有没有到,时间到了的话直接返回 false 表明加锁失败。接下来调用 shouldParkAfterFailedAcquire 方法判断当前节点是否需要阻塞,如果要阻塞的话还需要当前时间大于 1000 ns,如果时间小于这个值,那么就不阻塞了。这么设置也很有道理,如果时间太短,刚被 park 阻塞就要唤醒,这因上下文切换浪费的 CPU 时间片不如直接进行下一次循环尝试加锁。如果不用阻塞就进行下一轮循环重复之前流程再次尝试加锁。
[code]private boolean doAcquireNanos(int arg, long nanosTimeout)            throws InterruptedException {        if (nanosTimeout




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4