IT评测·应用市场-qidao123.com

标题: 大数据最全图解curator怎样实现zookeeper分布式锁_curator 锁,非科班程序 [打印本页]

作者: 兜兜零元    时间: 2024-7-16 02:49
标题: 大数据最全图解curator怎样实现zookeeper分布式锁_curator 锁,非科班程序



既有得当小白学习的零底子资料,也有得当3年以上履历的小伙伴深入学习提拔的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比力多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲门路、讲授视频,而且后续会持续更新
需要这份系统化资料的朋侪,可以戳这里获取

一、媒介

更多内容见Zookeeper专栏:https://blog.csdn.net/saintmm/category_11579394.html
至此,Zookeeper系列的内容已出:
   1.zookeeper集群搭建
2. Zookeeper集群推举机制
3. Paxos算法解析
4. Zookeeper(curator)实现分布式锁案例
  紧接着上一篇的内容,从源码层面来看curator是怎样实现zookeeper分布式锁的?
二、curator分布式锁种类

curator提供了四种分布式锁,都实现自接口InterProcessLock;
JAVA-doc:https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/package-summary.html
   1> InterProcessMutex
  
  2> InterProcessSemaphoreMutex
  
  3> InterProcessReadWriteLock
  
  4> InterProcessMultiLock
  
  下面以可重入排他锁InterProcessMutex为例,展开讨论;
三、Zookeeper分布式锁概述

1、Zookeeper分布式锁实现思绪

Zookeeper实现排他锁的设计思绪如下:

2、Zookeeper分布式锁办理的问题

   1> 锁无法释放?
  
  2> 互斥壅闭锁?
  
  3> 不可重入?
  
  4> 单点问题?
  
  3、Zookeeper分布式锁优缺点?

   1> 长处?
  
  2> 缺点?
  
  四、InterProcessMute实现分布式锁原理

InterProcessMute首先是一个互斥锁,其次是依靠Zookeeper临时顺序节点实现的分布式锁;对于锁而言,最重要的是保护临界区,让多个线程对临界区的访问互斥;InterProcessMute依靠Zookeeper临时顺序节点的有序性实现分布式情况下互斥,依靠JVM层面的synchronized实现节点监听的互斥(防止羊群效应)。
InterProcessMute的acquire()方法用于获取锁,release()方法用于释放锁。
以如下测试类为例,展开源码分析:
  1. public class LockTest {
  2.     public static void main(String[] args) {
  3.         //重试策略,定义初试时间3s,重试3次
  4.         ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);
  5.         //初始化客户端
  6.         CuratorFramework client = CuratorFrameworkFactory.builder()
  7.                 .connectString("127.0.0.1:2181")
  8.                 .sessionTimeoutMs(3000)
  9.                 .connectionTimeoutMs(3000)
  10.                 .retryPolicy(exponentialBackoffRetry)
  11.                 .build();
  12.         // start()开始连接,没有此会报错
  13.         client.start();
  14.         //利用zookeeper的类似于文件系统的特性进行加锁 第二个参数指定锁的路径
  15.         InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock");
  16.         try {
  17.             //加锁
  18.             interProcessMutex.acquire();
  19.             System.out.println(Thread.currentThread().getName() + "获取锁成功");
  20.             Thread.sleep(60\_000);
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.         } finally {
  24.             try {
  25.                 //释放锁
  26.                 interProcessMutex.release();
  27.                 System.out.println(Thread.currentThread().getName() + "释放锁成功");
  28.             } catch (Exception e) {
  29.                 e.printStackTrace();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
1、加锁流程(acquire()方法)

InterProcessMutex#acquire()方法:

acquire()方法中直接调用internalLock()方法以不加锁成功就一直等候的方式加锁;
假如加锁出现非常,则直接抛出IOException。
0)加锁流程图


1)internalLock()

  1. private boolean internalLock(long time, TimeUnit unit) throws Exception
  2. {
  3.     /\*
  4. Note on concurrency: a given lockData instance
  5. can be only acted on by a single thread so locking isn't necessary
  6. \*/
  7.     // 当前线程
  8.     Thread currentThread = Thread.currentThread();
  9.     // 当前线程持有的锁信息
  10.     LockData lockData = threadData.get(currentThread);
  11.     if ( lockData != null )
  12.     {
  13.         // 可重入,lockCount +1;
  14.         // 此处只在本地变量变化了,没发生任何网络请求;对比redisson的分布式锁可重入的实现是需要操作redis的
  15.         lockData.lockCount.incrementAndGet();
  16.         return true;
  17.     }
  18.     // 进行加锁,继续往里跟
  19.     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  20.     if ( lockPath != null )
  21.     {
  22.         // 加锁成功
  23.         LockData newLockData = new LockData(currentThread, lockPath);
  24.         // 放入map
  25.         threadData.put(currentThread, newLockData);
  26.         return true;
  27.     }
  28.     return false;
  29. }
复制代码
internalLock()方法有两个入参:long类型的time 和 TimeUnit类型的 unit 共同表示加锁的超时时间。
一个InterProcessMutex在同一个JVM中可以由多个线程共同操纵,由于其可重入性体如今JVM的线程层面,以是其维护了一个Map类型的变量threadData:
  1. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
复制代码
用于记录每个线程持有的锁信息;锁信息采用LockData表示;
LockData

LockData是InterProcessMutex的静态内部类,其仅有三个变量:持有锁的线程、锁路径、锁重入的次数;
  1. private static class LockData {
  2.     // 持有锁的线程
  3.     final Thread owningThread;
  4.     // 锁的路径
  5.     final String lockPath;
  6.     // 重入锁的次数
  7.     final AtomicInteger lockCount = new AtomicInteger(1);
  8.     private LockData(Thread owningThread, String lockPath)
  9.     {
  10.         this.owningThread = owningThread;
  11.         this.lockPath = lockPath;
  12.     }
  13. }
复制代码
internalLock()方法逻辑

     
    
  2)LockInternals#attemptLock() --> 实验加锁

  1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
  2.     final long startMillis = System.currentTimeMillis();
  3.     // 将时间统一格式化ms
  4.     final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
  5.     final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
  6.     int retryCount = 0;
  7.     String ourPath = null;
  8.     boolean hasTheLock = false;
  9.     boolean isDone = false;
  10.     while (!isDone) {
  11.         isDone = true;
  12.         try {
  13.             // 创建临时有序节点
  14.             ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
  15.             // 判断是否为第一个节点 如果是表明加锁成功。跟进去
  16.             hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
  17.         } catch (KeeperException.NoNodeException e) {
  18.             // 重试机制
  19.             // gets thrown by StandardLockInternalsDriver when it can't find the lock node
  20.             // this can happen when the session expires, etc. So, if the retry allows, just try it all again
  21.             if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
  22.                 isDone = false;
  23.             } else {
  24.                 throw e;
  25.             }
  26.         }
  27.     }
  28.     if (hasTheLock) {
  29.         return ourPath;
  30.     }
  31.     return null;
  32. }
复制代码
attemptLock()方法有三个入参:long类型的time 和 TimeUnit类型的 unit 共同表示实验加锁的超时时间,字节数组类型的lockNodeBytes表示锁路径对应的节点值。
通过InterProcessMutex#internalLock()方法进入到attemptLock()方法时,lockNodeBytes为null,即:不给锁路径对应的节点赋值。

实验加锁逻辑:
     
    
  1> StandardLockInternalsDriver#createsTheLock() --> 创建临时有序节点

为什么LockInternalsDriver接口的实现是StandardLockInternalsDriver?

createsTheLock()方法:
  1. @Override
  2. public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
  3. {
  4.     String ourPath;
  5.     if ( lockNodeBytes != null )
  6. ![img](https://img-blog.csdnimg.cn/img_convert/ee04d0700449119b0524e360f3c77b70.png)
  7. ![img](https://img-blog.csdnimg.cn/img_convert/80735d878637df0b4a5cc4f1be2a07eb.png)
  8. **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
  9. **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/forums/4f45ff00ff254613a03fab5e56a57acb)**
  10. **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
  11. yte[] lockNodeBytes) throws Exception
  12. {
  13.     String ourPath;
  14.     if ( lockNodeBytes != null )
  15. [外链图片转存中...(img-5EJEgcmU-1715428831906)]
  16. [外链图片转存中...(img-ZIBZWhKz-1715428831907)]
  17. **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
  18. **[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/forums/4f45ff00ff254613a03fab5e56a57acb)**
  19. **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4