大数据最全图解curator怎样实现zookeeper分布式锁_curator 锁,非科班程序 ...

打印 上一主题 下一主题

主题 641|帖子 641|积分 1923




既有得当小白学习的零底子资料,也有得当3年以上履历的小伙伴深入学习提拔的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比力多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲门路、讲授视频,而且后续会持续更新
需要这份系统化资料的朋侪,可以戳这里获取


  • 一、媒介
  • 二、curator分布式锁种类
  • 三、Zookeeper分布式锁概述


    • 1、Zookeeper分布式锁实现思绪

      • 2、Zookeeper分布式锁办理的问题
      • 3、Zookeeper分布式锁优缺点?


  • 四、InterProcessMute实现分布式锁原理


    • 1、加锁流程(acquire()方法)



        • 0)加锁流程图

          • 1)internalLock()


            • LockData

              • internalLock()方法逻辑


          • 2)LockInternals#attemptLock() --> 实验加锁


            • 1> StandardLockInternalsDriver#createsTheLock() --> 创建临时有序节点

              • 2> 判断刚创建的锁路径是否为第一个节点


          • 3)监听器的运作


      • 2、解锁流程(release()方法)


        • 0)解锁流程图

          • 1)文字描述




  • 五、总结


    • 网络IO次数

一、媒介

更多内容见Zookeeper专栏:https://blog.csdn.net/saintmm/category_11579394.html
至此,Zookeeper系列的内容已出:
   1.zookeeper集群搭建
2. Zookeeper集群推举机制
3. Paxos算法解析
4. Zookeeper(curator)实现分布式锁案例
  紧接着上一篇的内容,从源码层面来看curator是怎样实现zookeeper分布式锁的?
二、curator分布式锁种类

curator提供了四种分布式锁,都实现自接口InterProcessLock;
JAVA-doc:https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/package-summary.html
   1> InterProcessMutex
  

  • 可重入排它锁,每成功加锁一次,就要解锁一次。
  2> InterProcessSemaphoreMutex
  

  • 不可重入排他锁
  3> InterProcessReadWriteLock
  

  • 可重入读写锁,读共享,写互斥;
  • 一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁
  • 这意味着写锁可以降级成读锁, 比如请求写锁 —>请求读锁—>释放读锁 —->释放写锁。
  4> InterProcessMultiLock
  

  • 联锁, 将多个锁作为单个实体管理的容器;
  • 当调用acquire(), 全部的锁都会被acquire(),假如请求失败,全部的锁都会被release。 同样调用release时全部的锁都被release(失败被忽略)。
  下面以可重入排他锁InterProcessMutex为例,展开讨论;
三、Zookeeper分布式锁概述

1、Zookeeper分布式锁实现思绪

Zookeeper实现排他锁的设计思绪如下:


  • zk用/lock节点作为分布式锁,当不同的客户端到zk竞争这把锁的时候,zk会按顺序给不同的客户端创建一个临时子节点,挂在作为分布式锁的节点下面。
  • 假设第一个来到的客户端为A,第二个来到的是B,分布式锁节点下挂的第一个节点就是A(/lock/_c_A),B(/lock/_c_B)紧跟着A,且B会监听着A的生命状态;

    • 这里B会先获取到/lock路径下全部的节点,发现本身的锁节点(/lock/_c_B)不在第一位,进而监听本身前一位的锁节点(/lock/_c_A)。

  • 当A释放锁后A节点会被删除;B监听到A被删除,B可以实验得到分布式锁了。

    • 具体表现为:客户端B获取/lock下的全部子节点,并进行排序,判断排在最前面的是否为本身,假如本身的锁节点在第一位,代表取锁成功。
    • 假如还有C节点、D节点,他们都只会监听他们前一个节点,即:C监听B、D监听C。

2、Zookeeper分布式锁办理的问题

   1> 锁无法释放?
  

  • 使用Zookeeper可以有用的办理锁无法释放的问题,由于在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session毗连断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次得到锁。
  2> 互斥壅闭锁?
  

  • 使用Zookeeper可以实现壅闭的锁,客户端可以通过在ZK中创建顺序节点,而且在节点上绑定监听器,一旦节点有变革,Zookeeper会关照客户端,客户端可以查抄本身创建的节点是不是当前全部节点中序号最小的,假如是,那么本身就获取到锁,便可以执行业务逻辑了。
  3> 不可重入?
  

  • 使用Zookeeper也可以有用的办理不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。假如和本身的信息一样,那么本身直接获取到锁,假如不一样就再创建一个临时的顺序节点,参与列队。
  4> 单点问题?
  

  • 使用Zookeeper可以有用的办理单点问题,ZK是集群摆设的,只要集群中有半数以上的机器存活,就可以对外提供服务。
  3、Zookeeper分布式锁优缺点?

   1> 长处?
  

  • 有用的办理单点问题,不可重入问题,非壅闭问题以及锁无法释放的问题。实现起来较为简单。
  • zookeeper的锁天生是公平锁 根据创建临时节点的顺序。
  2> 缺点?
  

  • 性能上不如使用缓存实现分布式锁。

    • 由于每次在创建锁和释放锁的过程中,都要动态创建、烧毁瞬时节点来实现锁功能。
    • ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到全部的Follower机器上。

  • 加锁不管成功还是失败的第一步是先创建临时节点 这样假如加锁的过多 会对zookeeper的存储压力过大。
  四、InterProcessMute实现分布式锁原理

InterProcessMute首先是一个互斥锁,其次是依靠Zookeeper临时顺序节点实现的分布式锁;对于锁而言,最重要的是保护临界区,让多个线程对临界区的访问互斥;InterProcessMute依靠Zookeeper临时顺序节点的有序性实现分布式情况下互斥,依靠JVM层面的synchronized实现节点监听的互斥(防止羊群效应)。
InterProcessMute的acquire()方法用于获取锁,release()方法用于释放锁。
以如下测试类为例,展开源码分析:
  1. public class LockTest {
  2.     public static void main(String[] args) {
  3.         //重试策略,定义初试时间3s,重试3次
  4.         ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);
  5.         //初始化客户端
  6.         CuratorFramework client = CuratorFrameworkFactory.builder()
  7.                 .connectString("127.0.0.1:2181")
  8.                 .sessionTimeoutMs(3000)
  9.                 .connectionTimeoutMs(3000)
  10.                 .retryPolicy(exponentialBackoffRetry)
  11.                 .build();
  12.         // start()开始连接,没有此会报错
  13.         client.start();
  14.         //利用zookeeper的类似于文件系统的特性进行加锁 第二个参数指定锁的路径
  15.         InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock");
  16.         try {
  17.             //加锁
  18.             interProcessMutex.acquire();
  19.             System.out.println(Thread.currentThread().getName() + "获取锁成功");
  20.             Thread.sleep(60\_000);
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.         } finally {
  24.             try {
  25.                 //释放锁
  26.                 interProcessMutex.release();
  27.                 System.out.println(Thread.currentThread().getName() + "释放锁成功");
  28.             } catch (Exception e) {
  29.                 e.printStackTrace();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
1、加锁流程(acquire()方法)

InterProcessMutex#acquire()方法:

acquire()方法中直接调用internalLock()方法以不加锁成功就一直等候的方式加锁;
假如加锁出现非常,则直接抛出IOException。
0)加锁流程图


1)internalLock()

  1. private boolean internalLock(long time, TimeUnit unit) throws Exception
  2. {
  3.     /\*
  4. Note on concurrency: a given lockData instance
  5. can be only acted on by a single thread so locking isn't necessary
  6. \*/
  7.     // 当前线程
  8.     Thread currentThread = Thread.currentThread();
  9.     // 当前线程持有的锁信息
  10.     LockData lockData = threadData.get(currentThread);
  11.     if ( lockData != null )
  12.     {
  13.         // 可重入,lockCount +1;
  14.         // 此处只在本地变量变化了,没发生任何网络请求;对比redisson的分布式锁可重入的实现是需要操作redis的
  15.         lockData.lockCount.incrementAndGet();
  16.         return true;
  17.     }
  18.     // 进行加锁,继续往里跟
  19.     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  20.     if ( lockPath != null )
  21.     {
  22.         // 加锁成功
  23.         LockData newLockData = new LockData(currentThread, lockPath);
  24.         // 放入map
  25.         threadData.put(currentThread, newLockData);
  26.         return true;
  27.     }
  28.     return false;
  29. }
复制代码
internalLock()方法有两个入参:long类型的time 和 TimeUnit类型的 unit 共同表示加锁的超时时间。
一个InterProcessMutex在同一个JVM中可以由多个线程共同操纵,由于其可重入性体如今JVM的线程层面,以是其维护了一个Map类型的变量threadData:
  1. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
复制代码
用于记录每个线程持有的锁信息;锁信息采用LockData表示;
LockData

LockData是InterProcessMutex的静态内部类,其仅有三个变量:持有锁的线程、锁路径、锁重入的次数;
  1. private static class LockData {
  2.     // 持有锁的线程
  3.     final Thread owningThread;
  4.     // 锁的路径
  5.     final String lockPath;
  6.     // 重入锁的次数
  7.     final AtomicInteger lockCount = new AtomicInteger(1);
  8.     private LockData(Thread owningThread, String lockPath)
  9.     {
  10.         this.owningThread = owningThread;
  11.         this.lockPath = lockPath;
  12.     }
  13. }
复制代码
internalLock()方法逻辑

   

  • 根据当火线程从InterProcessMutex的threadData变量中获取当火线程持有的锁信息;
  

  • 假如已经持有锁,阐明是JVM层面的锁重入,则直接对LockData.lockCount + 1,然后返回加锁成功。
  • 锁重入的过程是没有产生任何网络请求的;而Redisson分布式锁可重入的实现是需要每次都操纵Redis的。
  

  • 假如未持有锁,则实验加锁;
  

  • 加锁逻辑体如今LockInternals#attemptLock()方法中;
  • 加锁成功,则将加锁的路径和当火线程一起封装为锁数据LockData,以线程为key,LockData为value,作为键值对加入到threadData中;并返回加锁成功
  • 加锁失败,则直接返回加锁失败。
  2)LockInternals#attemptLock() --> 实验加锁

  1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
  2.     final long startMillis = System.currentTimeMillis();
  3.     // 将时间统一格式化ms
  4.     final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
  5.     final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
  6.     int retryCount = 0;
  7.     String ourPath = null;
  8.     boolean hasTheLock = false;
  9.     boolean isDone = false;
  10.     while (!isDone) {
  11.         isDone = true;
  12.         try {
  13.             // 创建临时有序节点
  14.             ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
  15.             // 判断是否为第一个节点 如果是表明加锁成功。跟进去
  16.             hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
  17.         } catch (KeeperException.NoNodeException e) {
  18.             // 重试机制
  19.             // gets thrown by StandardLockInternalsDriver when it can't find the lock node
  20.             // this can happen when the session expires, etc. So, if the retry allows, just try it all again
  21.             if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
  22.                 isDone = false;
  23.             } else {
  24.                 throw e;
  25.             }
  26.         }
  27.     }
  28.     if (hasTheLock) {
  29.         return ourPath;
  30.     }
  31.     return null;
  32. }
复制代码
attemptLock()方法有三个入参:long类型的time 和 TimeUnit类型的 unit 共同表示实验加锁的超时时间,字节数组类型的lockNodeBytes表示锁路径对应的节点值。
通过InterProcessMutex#internalLock()方法进入到attemptLock()方法时,lockNodeBytes为null,即:不给锁路径对应的节点赋值。

实验加锁逻辑:
   

  • 首先实验加锁是支持重试机制的;实验加锁的返回值为加锁成功的锁路径,假如加锁未成功则返回null。
  • 通过锁驱动器LockInternalsDriver直接创建Zookeeper的临时有序节点,并返回节点路径;
  

  • 具体逻辑体如今StandardLockInternalsDriver#createsTheLock()方法中;
  

  • 判断节点路径是否为第一个节点,假如是,表明加锁成功;否则等候Watcher唤醒。
  

  • 具体逻辑体如今internalLockLoop()方法中;
  1> StandardLockInternalsDriver#createsTheLock() --> 创建临时有序节点

为什么LockInternalsDriver接口的实现是StandardLockInternalsDriver?


  • 由于在LockInternals构造器被调用时,传入的LockInternalsDriver是StandardLockInternalsDriver。

createsTheLock()方法:
  1. @Override
  2. public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
  3. {
  4.     String ourPath;
  5.     if ( lockNodeBytes != null )
  6. ![img](https://img-blog.csdnimg.cn/img_convert/ee04d0700449119b0524e360f3c77b70.png)
  7. ![img](https://img-blog.csdnimg.cn/img_convert/80735d878637df0b4a5cc4f1be2a07eb.png)
  8. **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
  9. **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/forums/4f45ff00ff254613a03fab5e56a57acb)**
  10. **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
  11. yte[] lockNodeBytes) throws Exception
  12. {
  13.     String ourPath;
  14.     if ( lockNodeBytes != null )
  15. [外链图片转存中...(img-5EJEgcmU-1715428831906)]
  16. [外链图片转存中...(img-ZIBZWhKz-1715428831907)]
  17. **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
  18. **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/forums/4f45ff00ff254613a03fab5e56a57acb)**
  19. **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

兜兜零元

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表