手写一个模拟的ReentrantLock

打印 上一主题 下一主题

主题 882|帖子 882|积分 2648

  1. package cn.daheww.demo.juc.reentrylock;
  2. import sun.misc.Unsafe;
  3. import java.lang.reflect.Field;
  4. import java.util.concurrent.locks.LockSupport;
  5. /**
  6. * @author daheww
  7. * @date 2022/7/7
  8. */
  9. public class MiniReentryLock implements Lock {
  10.     /**
  11.      * 锁的是什么 --> 资源 --> state
  12.      * 0 --> 未加锁
  13.      * >0 -> 加锁
  14.      */
  15.     private volatile int state;
  16.     /**
  17.      * 独占模式
  18.      * 同一时刻只有一个线程可以持有锁,其它线程在未获取到锁的时候会被阻塞
  19.      *
  20.      * 当前独占锁的线程(占用锁的线程)
  21.      */
  22.     private Thread exclusiveOwnerThread;
  23.     /**
  24.      * 需要有两个节点去维护阻塞队列
  25.      * Head 指向队列的头节点
  26.      * Tail 指向队列的尾节点
  27.      *
  28.      * 比较特殊:Head节点对应的线程就是当前占用锁的线程
  29.      */
  30.     private Node head;
  31.     private Node tail;
  32.     /**
  33.      * 获取锁
  34.      * 假设当前锁被占用,则会阻塞调用者线程,直到它抢占到锁为止
  35.      *
  36.      * 模拟公平锁
  37.      * --> 先来后到
  38.      *
  39.      * lock的过程
  40.      * 情景1.线程进来后发现,当前state == 0 --> 直接去抢锁
  41.      * 情景2.线程进来后发现,当前state > 0 --> 将当前线程入队
  42.      */
  43.     @Override
  44.     public void lock() {
  45.         // 第一次获取到锁时,将state设置为1
  46.         // 第n次重入时,将state设置为n
  47.         acquire(1);
  48.     }
  49.     @Override
  50.     public void unlock() {
  51.         release(1);
  52.     }
  53.     private void release(int arg) {
  54.         // 条件成立:说明线程已经完全释放锁了
  55.         if (tryRelease(arg)) {
  56.             // 阻塞队列里面,还有睡觉的线程,应该唤醒一个线程
  57.             // 首先需要知道有没有等待的node --> head.next == null
  58.             Node head = this.head;
  59.             if (head.nx != null) {
  60.                 // 公平锁,唤醒head.nx节点
  61.                 unparkSuccessor(head);
  62.             }
  63.         }
  64.     }
  65.     private void unparkSuccessor(Node node) {
  66.         Node s = node.nx;
  67.         if (s != null && s.thread != null) {
  68.             LockSupport.unpark(s.thread);
  69.         }
  70.     }
  71.     /**
  72.      * 完全释放锁成功则返回true
  73.      */
  74.     private boolean tryRelease(int arg) {
  75.         int c = getState() - arg;
  76.         if (getExclusiveOwnerThread() != Thread.currentThread()) {
  77.             throw new RuntimeException("must get lock first");
  78.         }
  79.         // 如果执行到这里,不存在并发,只会有一个线程会来到这里
  80.         // 条件成立,则说明当前线程持有的lock锁已经完全释放了
  81.         if (c == 0) {
  82.             this.exclusiveOwnerThread = null;
  83.             this.state = c;
  84.             return true;
  85.         } else {
  86.             this.state = c;
  87.             return false;
  88.         }
  89.     }
  90.     /**
  91.      * 竞争资源
  92.      * 1.尝试获取锁。成功则占用锁,且返回
  93.      * 2.抢占锁失败,阻塞当前线程
  94.      * @param arg
  95.      */
  96.     private void acquire(int arg) {
  97.         if (!tryAcquire(arg)) {
  98.             // 抢锁失败
  99.             // step1.将当前线程封装成node,加入到阻塞队列中
  100.             Node node = addWaiter();
  101.             // step2.将当前线程park,使线程处于挂起状态
  102.             acquireQueued(node, arg);
  103.         }
  104.         // 抢锁成功
  105.         // 1.抢到了锁
  106.         // 2.重入了锁
  107.     }
  108.     /**
  109.      * 尝试抢锁失败,需要做的事:
  110.      * 1.需要将当前线程封装成node,加入到阻塞队列中
  111.      * 2.需要将当前线程park,使线程处于挂起状态
  112.      *
  113.      * 唤醒的流程:
  114.      * 1.检查当前node是否为head.next节点
  115.      *      head.next是拥有抢占权限的线程,其它node都没有抢占的权限
  116.      * 2.抢占:
  117.      *      成功:
  118.      *          1.将当前node设置为node,将老的head出队,返回到业务层面
  119.      *          2.继续park等待被唤醒
  120.      *
  121.      * ----------------------------------------------
  122.      * 1.添加到阻塞队列的逻辑 addWaiter()
  123.      * 2.竞争资源的逻辑      acquireQueued()
  124.      */
  125.     private void acquireQueued(Node node, int arg) {
  126.         // 当前线程已经放到queue中了
  127.         // 只有当前node成功获取到锁以后才会跳出自旋
  128.         for (; ; ) {
  129.             // 什么情况下,当前node被唤醒后可以尝试去获取锁呢?
  130.             // 只有一种情况,当前node是head的后继节点,才有这个权限
  131.             // 不是就先来后到
  132.             Node pvNode = node.pv;
  133.             // 条件1:pvNode == head
  134.             //      true --> 说明当前node拥有抢占权限
  135.             //               queue中的第一个节点代表的是当前锁正在执行的线程 --> head指向的线程
  136.             //               head后面的线程代表的是正在排队的线程 --> 所以只有head.nx节点拥有抢锁的权利
  137.             // 条件2:tryAcquire(arg)
  138.             //      true --> 当前线程获取到了锁
  139.             //
  140.             if (pvNode == head && tryAcquire(arg)) {
  141.                 // 进入到这里面说明当前线程竞争锁成功了
  142.                 // 需要做的操作:
  143.                 // 1.设置当前head为当前线程的node
  144.                 // 2.协助原来的对象出队
  145.                 setHead(node);
  146.                 pvNode.nx = null;
  147.                 // 因为获取到了锁,所以就return了
  148.                 return;
  149.             }
  150.             // 当前不是head.nx节点,或者去尝试获取锁失败了,这个时候都需要去把当前线程park掉
  151.             System.out.println("线程:" + Thread.currentThread().getName() + " 挂起");
  152.             LockSupport.park();
  153.             // 直到某个线程做了当前线程的unPark操作,这个线程才会继续执行
  154.             /*
  155.                 所以总结一下,lock的逻辑就是:
  156.                    1.在没锁的情况下,如果有个线程调用了lock方法,它就会改变lock中的state值。此时state值就不会为0了。
  157.                    那么其它线程调用lock方法时,会看到这个state不为0。
  158.                    2.然后这个线程会被封装成一个node节点
  159.                    3.然后会去尝试竞争一下锁,做一下最后的挽救工作,如果实在挽救不了,就park了
  160.                      --> 线程就在这个lock的lock()方法里被阻塞了。就达到了锁的效果
  161.                      --> 所有调用这个锁对象lock的方法只能有一个线程能继续执行,然后其它线程会被阻塞,直到这个线程做了unlock操作
  162.              */
  163.             System.out.println("线程:" + Thread.currentThread().getName() + " 唤醒");
  164.             // 什么时候唤醒被park的线程?--> unlock()
  165.         }
  166.     }
  167.     /**
  168.      * 把当前线程入队
  169.      * 返回当前线程对应的node节点
  170.      *
  171.      * addWaiter执行完成后,保证当前线程已经入队成功
  172.      */
  173.     private Node addWaiter() {
  174.         Node newNode = new Node(Thread.currentThread());
  175.         // 如何入队?
  176.         // Case1.当前node不是第一个入队的node,队列已经有等待的node了
  177.         //     1.找到newNode的pv节点
  178.         //     2.更新newNode.pvNode = pv节点
  179.         //     3.CAS更新tail为newNode
  180.         //     4.更新pv节点
  181.         Node pvNode = tail;
  182.         if (pvNode != null) {
  183.             newNode.pv = pvNode;
  184.             // 条件成立,说明当前线程成功入队
  185.             if (compareAndSetTail(pvNode, newNode)) {
  186.                 pvNode.nx = newNode;
  187.                 return newNode;
  188.             }
  189.         }
  190.         // 执行到这里的几种情况
  191.         // 1.tail == null队列是空
  192.         // 2.cas设置当前newNode为tail时失败了 --> 循环入队 --> 自旋
  193.         enq(newNode);
  194.         return newNode;
  195.     }
  196.     /**
  197.      * 自旋入队,只有成功之后才返回
  198.      * 1.tail == null 队列是空队列
  199.      * 2.cas设置当前newNode为tail时失败了
  200.      */
  201.     private void enq(Node node) {
  202.         for (; ; ) {
  203.             // 第一种情况:队列是空队列
  204.             // --> 当前线程是第一个抢占锁的线程...
  205.             // 当前持有锁的线程,并没有设置过任何node,所以作为该线程的第一个后驱节点
  206.             // 需要给他擦屁股
  207.             // 给当前持有锁的线程补充一个node作为head节点
  208.             // head节点任何时候都代表当前占用锁的线程
  209.             if (tail == null) {
  210.                 // 条件成立:说明当前线程给当前持有锁的线程补充head操作成功了
  211.                 if (compareAndSetHead(new Node())) {
  212.                     tail = head;
  213.                     // 注意,并没有直接返回,而是会继续自旋
  214.                 }
  215.             } else {
  216.                 // 当前队列中已经有node了,说明这是一个追加node的过程
  217.                 // 如何入队呢?
  218.                 //     1.找到newNode的pv节点 --> 最新的tail节点
  219.                 //     2.更新newNode.pvNode = pv节点
  220.                 //     3.CAS更新tail为newNode
  221.                 //     4.更新pv节点
  222.                 Node pvNode = tail;
  223.                 node.pv = pvNode;
  224.                 // 条件成立,说明当前线程成功入队
  225.                 if (compareAndSetTail(pvNode, node)) {
  226.                     pvNode.nx = node;
  227.                     return;
  228.                 }
  229.             }
  230.         }
  231.     }
  232.     /**
  233.      * 尝试获取锁,不会去阻塞线程
  234.      * true --> 抢占成功
  235.      * false --> 抢占失败
  236.      */
  237.     private boolean tryAcquire(int arg) {
  238.         if (state == 0) {
  239.             // 当前state为0
  240.             // 不能直接抢锁 --> 公平锁 --> 先来后到
  241.             // 条件一:!hasQueuedPredecessors() ---> 取反之后为true,表示当前线程前面没有等待着的线程
  242.             // 条件二:compareAndSetState(0, arg) -> 使用cas的原因:lock方法可能有多线程调用的情况
  243.             //      true --> 当前线程抢锁成功
  244.             //          (1) volatile --> state被volatile修饰了,所以其它线程能第一时间知道这个值不为0了 --> 缓存能够一致了
  245.             //          (2) cas -------> state从0变为arg的操作用cas实现,用于保证只会有一个线程能够改变state的值(0->arg) --> 只会有一个线程能够执行接下来的操作 --> 锁
  246.             //                1.如果cas的变量不用volatile修饰就没有意义:
  247.             //                   因为A线程改变了state的值,但是B线程并不知道
  248.             //                  (可见性,volatile会让B线程中的副本马上失效,然后获取最新的state的值,此时B线程工作空间中的state值就不为0了)
  249.             //                2.如果volatile的变量不用cas去改变它的值的话,也没有意义:
  250.             //·                  step1.A线程,B线程都拿到了state的副本信息,此时state值为0
  251.             //                   step2.A线程改变了state的值。B线程还在写,因为state的值改变了,所以B线程工作空间中的state值改变,然后B继续写。
  252.             //                   所以所有判断出state值为0的线程都能写成功,并且能执行写成功后续的操作
  253.             //                所以要用cas+volatile去保证只会有一个线程能够写成功这个值
  254.             //                Ps.可以看到,如果这些线程想写的值都是同一个值的话,多写了几次,但是结果和只写一次是一致的
  255.             //                   cas+volatile主要还是去控制写成功之后的操作只会被执行一次,这样就像一个锁一样了
  256.             if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) {
  257.                 // 抢锁成功
  258.                 // 1.将exclusiveOwnerThread设置为当前线程
  259.                 this.exclusiveOwnerThread = Thread.currentThread();
  260.                 return true;
  261.                 // 不会入队任何node,接返回true
  262.                 // 接下来第一个竞争失败的线程会先去帮忙创建一个node,然后再执行后续的操作
  263.             }
  264.             // 当前线程前面有线程在等待 || 多个线程和当前线程一起在尝试获取这个锁,然后当前线程失败了 --> return false;
  265.         } else if (Thread.currentThread() == this.exclusiveOwnerThread) {
  266.             // 执行的时机:
  267.             // 1.当前锁被占用
  268.             // 2.当前线程即为持锁线程
  269.             // 这里面不存在并发。只有当前加锁的线程才有权限修改state
  270.             //   即使是同一个线程多次进入到这,设置state的值,那么它们都是使用的同一个工作空间
  271.             //   不存在不同工作空间下,这个值的不一样的情况(因为没有了缓存)
  272.             // 锁重入的流程
  273.             int c = getState();
  274.             c += arg;
  275.             // TODO 越界判断
  276.             this.state = c;
  277.             return true;
  278.         }
  279.         // 什么时候会返回false?
  280.         // 1.cas加锁失败
  281.         // 2.state大于0,且当前线程不是持锁线程
  282.         return false;
  283.     }
  284.     /**
  285.      * 当前线程前面是否有等待着的线程
  286.      * true --> 当前线程前面有等待着的线程
  287.      * false -> 当前线程前面没有其它等待着的线程
  288.      *
  289.      * 调用链
  290.      * lock --> acquire -> tryAcquire -> hasQueuedPredecessors(state值为0时,即当前lock为无主状态)
  291.      *
  292.      * 什么时候返回false?
  293.      * 1.当前队列是空
  294.      * 2.当前线程为head.next节点 --> head.next在任何时候都有权力去争取lock
  295.      */
  296.     private boolean hasQueuedPredecessors() {
  297.         Node h = head;
  298.         Node t = tail;
  299.         Node s;
  300.         // 条件一:h != t
  301.         //     true --> 当前队列已经有node了
  302.         //     false -> h == t
  303.         //          case1. h == t == null --> 还没初始化过queue
  304.         //          case2. h == t == head
  305.         //              第一个获取锁失败的线程会为当前持有锁的线程补充创建一个head node
  306.         // 条件二:
  307.         //     前置条件:条件一成立
  308.         //     排除几种情况:
  309.         //       条件2.1:极端情况 --> 第一个获取锁失败的线程,会为持锁的线程补充创建head节点,然后在自旋入队
  310.         //                           step1.cas设置tail成功了
  311.         //                           step2.head.next = node
  312.         //                           在这两步中间的时候,有线程来检查前面是否有等待的线程
  313.         //               这种情况应该返回true:已经有head.next节点了,其它线程来这的时候需要返回true
  314.         //       条件2.2:
  315.         //               前置条件:h.next不是null
  316.         //               true --> 条件成立说明当前线程就是持有锁的线程
  317.         //               false -> 说明当前线程就是h.next节点对应的线程,需要返回false。回头线程就会去竞争锁了
  318.         return h != t && ((s = h.nx) == null || s.thread != Thread.currentThread());
  319.     }
  320.     private static final Unsafe UNSAFE;
  321.     private static final long STATE_OFFSET;
  322.     private static final long HEAD_OFFSET;
  323.     private static final long TAIL_OFFSET;
  324.     static {
  325.         try {
  326.             Field f = Unsafe.class.getDeclaredField("theUnsafe");
  327.             f.setAccessible(true);
  328.             UNSAFE = (Unsafe) f.get(null);
  329.             STATE_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("state"));
  330.             HEAD_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("head"));
  331.             TAIL_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("tail"));
  332.         } catch (Exception e) {
  333.             throw new Error(e);
  334.         }
  335.     }
  336.     private boolean compareAndSetHead(Node update) {
  337.         return UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, null, update);
  338.     }
  339.     private boolean compareAndSetTail(Node expect, Node update) {
  340.         return UNSAFE.compareAndSwapObject(this, TAIL_OFFSET, expect, update);
  341.     }
  342.     private boolean compareAndSetState(int expect, int update) {
  343.         return UNSAFE.compareAndSwapInt(this, STATE_OFFSET, expect, update);
  344.     }
  345.     /**
  346.      * 阻塞的线程被封装成node节点,然后放进FIFO队列
  347.      */
  348.     static final class Node {
  349.         /**
  350.          * 封装的线程本身
  351.          */
  352.         Thread thread;
  353.         /**
  354.          * 前置节点引用
  355.          */
  356.         Node pv;
  357.         /**
  358.          * 后置节点引用
  359.          */
  360.         Node nx;
  361.         public Node(Thread thread) {
  362.             this.thread = thread;
  363.         }
  364.         public Node() {
  365.         }
  366.     }
  367.     public int getState() {
  368.         return state;
  369.     }
  370.     private void setHead(Node node) {
  371.         this.head = node;
  372.         // 当前线程已经是获取到锁的线程
  373.         node.thread = null;
  374.         node.pv = null;
  375.     }
  376.     public void setState(int state) {
  377.         this.state = state;
  378.     }
  379.     public Thread getExclusiveOwnerThread() {
  380.         return exclusiveOwnerThread;
  381.     }
  382.     public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
  383.         this.exclusiveOwnerThread = exclusiveOwnerThread;
  384.     }
  385.     public Node getHead() {
  386.         return head;
  387.     }
  388.     public Node getTail() {
  389.         return tail;
  390.     }
  391.     public void setTail(Node tail) {
  392.         this.tail = tail;
  393.     }
  394.    
  395. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南七星之家

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表