ReentrantLock 公平锁源码 第0篇

打印 上一主题 下一主题

主题 853|帖子 853|积分 2559

ReentrantLock 0

关于ReentrantLock的文章其实写过的,但当时写的感觉不是太好,就给删了,那为啥又要再写一遍呢
最近闲着没事想自己写个锁,然后整了几天出来后不是跑丢线程就是和没加锁一样,而且五六段就一个cas性能很差,感觉离大师写的差十万八千里
于是!我就想重新研究研究看看大师咋写的,这篇博客也算个笔记吧,这篇看的是ReentrantLock的公平锁,准备写个两三篇关于ReentrantLock 就这两天写!
这篇博客完全个人理解,如果有不对的地方欢迎您评论或者私信我,我非常乐意接受您的意见或建议
CAS

首先要知道,ReentrantLock是基本都是在java代码层面实现的,而最主要的一个东西就是CAS compare and swap 比较并交换

这个操作可以看成为是一个原子性的,在java中可以使用反射获取到Unsafe类来进行cas操作
  1. public test() {
  2.     try {
  3.         Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
  4.         if (!unsafeField.isAccessible()) {
  5.             unsafeField.setAccessible(true);
  6.         }
  7.         unsafe = (Unsafe) unsafeField.get(null);
  8.     } catch (NoSuchFieldException | IllegalAccessException e) {
  9.         e.printStackTrace();
  10.     }
  11. }
复制代码
Park

在juc包下LockSupport类中有两个方法park和unpark 这两个就像是wait和notify/notifyAll,但是又不相同,可以暂时理解为就是暂停线程和启动线程
详细的介绍可以看看这个博客 : https://www.jianshu.com/p/da76b6ab56be
关于如何使用ReentrantLock就不在赘述了直接开始看代码,本来是想把类的关系图放这的,但是我的idea好像有点问题,你们可以自己打开idea看,ctrl+alt+u打开类的关系图
  1. public static void main(String[] args) {
  2.     ReentrantLock lock = new ReentrantLock(true);
  3.     lock.lock();
  4.     lock.unlock();
  5. }
复制代码
构造方法
  1. public ReentrantLock(boolean fair) {
  2.     sync = fair ? new FairSync() : new NonfairSync();
  3. }
复制代码
lock方法
  1. public void lock() {
  2.     sync.lock();
  3. }
复制代码
点到里面实际调用的是FairSync类中的lock()方法
  1. final void lock() {
  2.     acquire(1);
  3. }
复制代码
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) &&
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.         selfInterrupt();
  5. }
复制代码
tryAcuqire方法
  1. protected final boolean tryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     if (c == 0) {
  5.         if (!hasQueuedPredecessors() &&
  6.             compareAndSetState(0, acquires)) {
  7.             setExclusiveOwnerThread(current);
  8.             return true;
  9.         }
  10.     }
  11.     else if (current == getExclusiveOwnerThread()) {
  12.         int nextc = c + acquires;
  13.         if (nextc < 0)
  14.             throw new Error("Maximum lock count exceeded");
  15.         setState(nextc);
  16.         return true;
  17.     }
  18.     return false;
  19. }
复制代码
首先是获取了当前的线程,之后有个getState,这个方法返回的是当前这个锁的状态
  1. protected final int getState() {
  2.     return state;
  3. }
复制代码
hasQueuedPredecessors

先来看Node,这个Node是一个组成双向链表的实体类,几个重要的属性
  1. volatile int waitStatus;
  2. volatile Node prev;
  3. volatile Node next;
  4. volatile Thread thread;
  5. Node nextWaiter;
复制代码
waitStatus 存放当前结点的状态
prev 存放上个结点
next 存放下个结点
thread 存放线程
nextWaiter    翻译是下个服务员,我理解是为下个节点服务,后面咱们细说
  1. public final boolean hasQueuedPredecessors() {
  2.     Node t = tail;
  3.     Node h = head;
  4.     Node s;
  5.     return h != t &&
  6.         ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }
复制代码
这里有两个属性,tail尾结点,head头结点,之后下面一个判断分别是
头结点不等于尾结点 并且 (头结点的下一结点不等于null或者头结点后面一个结点的线程不等于当前线程)
  1. if (c == 0) {
  2.     if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
  3.         setExclusiveOwnerThread(current);
  4.         return true;
  5.     }
  6. }
复制代码
在hasQueuedPredecessors()接着就是一个cas,修改的这个锁的状态
如果成功,则调用setExclusiveOwnerThread()
  1. protected final void setExclusiveOwnerThread(Thread thread) {
  2.     exclusiveOwnerThread = thread;
  3. }
复制代码
将当前线程存放到exclusiveOwnerThread属性中
那么在没有冲突的情况下lock的方法就走完了,现在咱们假设只有一个线程,从头来捋一下加锁的过程
试跑一下

咱们顺着逻辑捋一下,从最开始的lock()方法开始,前面的就不写了,直接到acquire
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) &&
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.         selfInterrupt();
  5. }
复制代码
进入acquire
  1. protected final boolean tryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     if (c == 0) {
  5.         if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
  6.             setExclusiveOwnerThread(current);
  7.             return true;
  8.         }
  9.     }
  10.     else if (current == getExclusiveOwnerThread()) {
  11.         int nextc = c + acquires;
  12.         if (nextc < 0)
  13.             throw new Error("Maximum lock count exceeded");
  14.         setState(nextc);
  15.         return true;
  16.     }
  17.     return false;
  18. }
  19. }
复制代码
因为这个getState()方法获取的是属性state 而这个属性又没有其他的赋值操作,初始化就是0,进入if(c==0)
之后进入hasQueuedPredecessors()
  1. public final boolean hasQueuedPredecessors() {
  2.     Node t = tail;
  3.     Node h = head;
  4.     Node s;
  5.     return h != t &&
  6.         ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }
复制代码
首先第一个判断就已经是false了,因为tail和head都没有进行过初始化,都是null,所以等于,直接返回 false,而在hasQueuedPredecessors()方法前面还有一个!取反为true,直接进入if代码块
设置完exclusiveOwnerThread属性后就return true,走出lock()方法,加锁方法结束
exclusiveOwnerThread属性存放的是当前那个线程在占有锁
这是在没有线程获取锁冲突的情况下,如果现在两个线程同时来的话,还是看tryAcquire方法
  1. protected final boolean tryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     if (c == 0) {
  5.         if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
  6.             setExclusiveOwnerThread(current);
  7.             return true;
  8.         }
  9.     }
  10.     else if (current == getExclusiveOwnerThread()) {
  11.         int nextc = c + acquires;
  12.         if (nextc < 0)
  13.             throw new Error("Maximum lock count exceeded");
  14.         setState(nextc);
  15.         return true;
  16.     }
  17.     return false;
  18. }
复制代码
咱们现在假设线程A获取到锁,去执行业务代码了,线程B进入
getState()获取的值就不在是0了,因为线程A执行完compareAndSetState(0, acquires)改的就是getState()方法获取的state属性
那么进入else if (current == getExclusiveOwnerThread()) 哎这个不是获取当前占有锁的那个线程的方法吗,是的
那为什么有这个判断呢,ReentrantLock的特性 重入锁,啥叫重入锁?:同一个线程可以多次获取同一个锁 例如下面的例子
  1. public class Test{
  2.     private static final ReentrantLock LOCK=new ReentrantLock(true);
  3.     public void a(){
  4.         LOCK.lock();
  5.         b();
  6.         LOCK.unlock();
  7.     }
  8.     public void b(){
  9.         LOCK.lock();
  10.         //xxxxxx
  11.         LOCK.unlock();
  12.     }
  13. }
复制代码
如果没有重入锁的特性,那么这个方法是不是就死锁了呢?,假设当我们一个线程去调用a方法时..

  • a : 兄弟我需要锁才能执行你的代码啊
  • b : 那你先解锁啊
  • a : 我要调用完你我才能解锁啊
  • b : 那你不解锁咋调用我啊
........
好了回到代码中,就是将锁持有的状态+1,设置后返回true,因为我们现在是B线程,所以这个if不成立,返回false
回到acquire方法
  1. public final void acquire(int arg) {
  2.     if (!tryAcquire(arg) &&
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.         selfInterrupt();
  5. }
复制代码
因为tryAcquire为false,取反继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter

先来看里面的addWaiter方法吧,传了一个参数Node.EXCLUSIVE
  1. static final Node EXCLUSIVE = null;
复制代码
这个参数是Node类中的一个属性
  1. private Node addWaiter(Node mode) {
  2.     //创建一个Node
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     Node pred = tail;
  5.     if (pred != null) {
  6.         node.prev = pred;
  7.         if (compareAndSetTail(pred, node)) {
  8.             pred.next = node;
  9.             return node;
  10.         }
  11.     }
  12.     enq(node);
  13.     return node;
  14. }
复制代码
Node的有参构造如下
  1. Node(Thread thread, Node mode) {     // Used by addWaiter
  2.     this.nextWaiter = mode;
  3.     this.thread = thread;
  4. }
复制代码
首先是创建了一个Node结点,之后判断如果tail结点不为null,因为A线程走完tryAcquire直接返回了,tail和head都是null,所以if不成立,进入enq方法
  1. private Node enq(final Node node) {
  2.     for (;;) {
  3.         Node t = tail;
  4.         if (t == null) { // Must initialize
  5.             if (compareAndSetHead(new Node()))
  6.                 tail = head;
  7.         } else {
  8.             node.prev = t;
  9.             if (compareAndSetTail(t, node)) {
  10.                 t.next = node;
  11.                 return t;
  12.             }
  13.         }
  14.     }
  15. }
复制代码
首先还是获取tail,那么这时候还是为null,因为我们的假设就两个线程,A线程已经去执行业务了,所以进去第一个if
通过cas来设置头节点为一个new Node() 注意!这里是新new的Node,设置完后将头在设置给尾,那么此时的节点关系如下

em?? 咱们这B节点也没有添加进链表啊,别急,看看上面的for(;;)
在下次循环的时候tail还等于null吗?答案是否定的
之后还是头节点赋值给t,将B节点的上一个设置为t,cas设置tail,成功后t节点的next设置为B节点,返回t (返回的值其实没接收)
挺简单的逻辑说的太费劲了,还是看图吧,执行完第二遍for后节点关系如下

这个enq方法是百分百能确定这个节点已经添加进去的,因为你不添加进去就出不来这个方法,那么返回addWaiter方法,走完这个enq就还有一句话,return node;
acquireQueued
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.         for (;;) {
  6.             final Node p = node.predecessor();
  7.             if (p == head && tryAcquire(arg)) {
  8.                 setHead(node);
  9.                 p.next = null; // help GC
  10.                 failed = false;
  11.                 return interrupted;
  12.             }
  13.             if (shouldParkAfterFailedAcquire(p, node) &&
  14.                 parkAndCheckInterrupt())
  15.                 interrupted = true;
  16.         }
  17.     } finally {
  18.         if (failed)
  19.             cancelAcquire(node);
  20.     }
  21. }
复制代码
那么一进来就定义了一个failed用来处理如果发生错误需要将链表中错误的节点移除,咱先不看
之后的一个interrupted存放是否被打断过,继续发现还是一个for(;;),第一步执行了node.predecessor()
  1. final Node predecessor() throws NullPointerException {
  2.     Node p = prev;
  3.     if (p == null)
  4.         throw new NullPointerException();
  5.     else
  6.         return p;
  7. }
复制代码
也就是先获取了下B节点的上一个,也就是那个线程为null的空节点(注意:不是null,而是一个空的Node)
判断上个节点是不是head,如果是,则尝试获取锁,这个tryAcquire()方法就是开始的那个方法,那么这一步是什么意思呢
ReentrantLock的做法,如果必须创建链表,则head指向的Node节点一直就是一个空节点
这句话可能不太严谨,但是在链表存在的大部分时间内,head也确实指向的是一个空节点
继续看代码,假设现在A线程还是没有完成业务代码,没有执行unlock(),那么我们进入下个if,
  1. if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
  2.     interrupted = true;
复制代码
shouldParkAfterFailedAcquire
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2.     int ws = pred.waitStatus;
  3.     if (ws == Node.SIGNAL)
  4.         return true;
  5.     if (ws > 0) {
  6.         do {
  7.             node.prev = pred = pred.prev;
  8.         } while (pred.waitStatus > 0);
  9.         pred.next = node;
  10.     } else {
  11.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  12.     }
  13.     return false;
  14. }
复制代码
代码不多,但是不太好理解 开始还是获取B节点上个节点的状态,也就是那个空节点,因为咱们一路代码跟过来的,没有看到哪里对空节点的state属性进行过修改,所以它还是0
那么第一个判断
  1. static final int SIGNAL    = -1;  //Node类中的属性
复制代码
空节点的状态是否为-1,显然不是,if(ws>0)也不会进,直接进入else,cas修改空节点的状态,改为了-1,然后返回
  1. if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
  2.     interrupted = true;
复制代码
因为是&&阻断了后面的方法,所以不进入,那么本次循环结束,最外层是个for(;;)所以下次循环开始
我们还是假设通过tryAcquire()没有获取到锁,又进入了shouldParkAfterFailedAcquire
那么这次第一个if我们就会进去,因为上次循环已经将B节点前面的一个空节点的状态改为-1了,return true
回到if那么就进入parkAndCheckInterrupt方法
  1. private final boolean parkAndCheckInterrupt() {
  2.     LockSupport.park(this);
  3.     return Thread.interrupted();
  4. }
复制代码
park,那么B线程就停在这里了,把目光回到A线程,它终于执行完业务代码了,执行unlock
unlock
  1. public void unlock() {
  2.     sync.release(1);
  3. }
复制代码
  1. public final boolean release(int arg) {
  2.     if (tryRelease(arg)) {
  3.         Node h = head;
  4.         if (h != null && h.waitStatus != 0)
  5.             unparkSuccessor(h);
  6.         return true;
  7.     }
  8.     return false;
  9. }
复制代码
看第一个if中的tryRelease
  1. protected final boolean tryRelease(int releases) {
  2.     int c = getState() - releases;
  3.     if (Thread.currentThread() != getExclusiveOwnerThread())
  4.         throw new IllegalMonitorStateException();
  5.     boolean free = false;
  6.     if (c == 0) {
  7.         free = true;
  8.         setExclusiveOwnerThread(null);
  9.     }
  10.     setState(c);
  11.     return free;
  12. }
复制代码
第一件事就是将锁的状态-1,因为它重入一次就+1,这也是为什么调用lock多少次就需要调用unlock多少次,因为要保证锁的状态为0
之后判断加锁的线程和解锁的线程是不是同一个,不是抛出异常
boolean free = false;这个来标识这个锁是不是已经没有人持有了,因为A线程就调用了一次lock,所以if(c==0)成立
将free 改为true后将当前持有锁的线程设置为null,设置锁的状态,返回true,回到release方法
因为返回true,所以进入if,判断head节点不为空,并且头节点的状态不为0
那么头节点是空的吗?    -不是 因为B节点在初始化链表是添加了一个空的节点(再说一遍不是null!是空的Node节点)
那么头节点的状态是0吗?  -不是 我们在第二次执行shouldParkAfterFailedAcquire()方法时已经将头节点的状态设置为-1了
那么进入unparkSuccessor()
[code]    private void unparkSuccessor(Node node) {        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus  0) 为false,走最下面的if(s!=null)</p>就一句话 unpark(s.thread)
到此,AB线程都走完了

篇幅有点长了,这两天再接着写,拜拜
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小秦哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表