ZooKeeper实现分布式锁

打印 上一主题 下一主题

主题 1585|帖子 1585|积分 4755

基础

ZooKeeper的4个节点


  • 持久节点:默认的节点类型,不停存在于ZooKeeper中
  • 持久顺序节点:在创建节点时,ZooKeeper根据节点创建的时间顺序对节点举行编号
  • 临时节点:当客户端与ZooKeeper断开毗连后,该历程创建的临时节点就会被删除
  • 临时顺序节点:按时间顺序编号的临时节点
ZK分布式锁相干基础知识


  • zk分布式锁一样平常由多个节点构成(单数),采用 zab 一致性协议。因此可以将 zk 看成一个单点结构,对其修改数据其内部自动将全部节点数据举行修改而后才提供查询服务。
  • zk 的数据以目次树的形式,每个目次称为 znode, znode 中可存储数据(一样平常不凌驾 1M),还可以在其中增长子节点。
  • 子节点有三种类型。序列化节点,每在该节点下增长一个节点自动给该节点的名称上自增。临时节点,一旦创建这个 znode 的客户端与服务器失去接洽,这个 znode 也将自动删除。最后就是普通节点。
  • Watch 机制,client 可以监控每个节点的变革,当产生变革会给 client 产生一个事件。
实现原理

焦点思想

焦点思想就是基于 临时顺序节点 和 Watcher(事件监听器) 实现的。
当客户端要获取锁,则创建节点,利用完锁,则删除该节点。

  • 客户端获取锁时,在lock节点下创建临时顺序节点。

    • 临时是防止客户端宕机后,无法正常删除锁的环境
    • 利用顺序节点,是因为全部尝试获取锁的客户端都会对持有锁的子节点加监听器。当该锁被释放之后,势必会造成全部尝试获取锁的客户端来争夺锁,这样对性能不友好。利用顺序节点之后,只需要监听前一个节点就好了,对性能更友好

  • 然后获取lock下面的全部子节点,客户端获取到全部的子节点之后,如果发现自己创建的子节点序号最小,那么就以为该客户端获取到了锁。利用完锁后,将该节点删除。
  • 如果发现自己创建的节点并非lock全部子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  • 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应关照,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

获取锁步骤:


  • 在 /lock 节点下创建一个有序临时节点 (EPHEMERAL_SEQUENTIAL)。
  • 判断创建的节点序号是否最小,如果是最小则获取锁成功。不是则取锁失败,然后 watch 序号比本身小的前一个节点。
  • 当取锁失败,设置 watch 后则等待 watch 事件到来后,再次判断是否序号最小。
  • 取锁成功则实行代码,最后释放锁(删除该节点)
释放锁步骤:


  • 成功获取锁的客户端在实行完业务流程之后,会将对应的子节点删除。
  • 成功获取锁的客户端在出现故障之后,对应的子节点由于是临时顺序节点,也会被自动删除,避免了锁无法被释放。
  • 事件监听器实在监听的就是这个子节点删除事件,子节点删除就意味着锁被释放。
羊群效应息争决方法


  • 羊群效应

    • 在整个分布式锁的竞争过程中,大量的「Watcher关照」和「子节点列表的获取」操纵重复运行,而且大多数节点的运行结果都是判断出自己当前并不是编号最小的节点,继续等待下一次关照,而不是实行业务逻辑
    • 这就会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击。更甚的是,如果同一时间多个节点对应的客户端完成事务或事务中断引起节点消失,ZooKeeper 服务器就会在短时间内向其他客户端发送大量的事件关照

  • 解决方法

    • 在与该方法对应的持久节点的目次下,为每个历程创建一个临时顺序节点
    • 每个历程获取全部临时节点列表,对比自己的编号是否最小,若最小,则获得锁。
    • 若本历程对应的临时节点编号不是最小的,则继续判断

      • 若本历程为读哀求,则向比自己序号小的最后一个写哀求节点注册watch监听,当监听到该节点释放锁后,则获取锁
      • 若本历程为写哀求,则向比自己序号小的最后一个读哀求节点注册watch监听,当监听到该节点释放锁后,获取锁


实现

实际项目中,推荐利用 Curator 来实现 ZooKeeper 分布式锁。Curator 是 Netflix 公司开源的一套 ZooKeeper Java 客户端框架,相比于 ZooKeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地利用。
原生API实现
  1. /**
  2. * 自己本身就是一个 watcher,可以得到通知
  3. * AutoCloseable 实现自动关闭,资源不使用的时候
  4. */
  5. @Slf4j
  6. public class ZkLock implements AutoCloseable, Watcher {
  7.     private ZooKeeper zooKeeper;
  8.     /**
  9.      * 记录当前锁的名字
  10.      */
  11.     private String znode;
  12.     public ZkLock() throws IOException {
  13.         this.zooKeeper = new ZooKeeper("localhost:2181",
  14.                 10000,this);
  15.     }
  16.     public boolean getLock(String businessCode) {
  17.         try {
  18.             //创建业务 根节点
  19.             Stat stat = zooKeeper.exists("/" + businessCode, false);
  20.             if (stat==null){
  21.                 zooKeeper.create("/" + businessCode,businessCode.getBytes(),
  22.                         ZooDefs.Ids.OPEN_ACL_UNSAFE,
  23.                         CreateMode.PERSISTENT);
  24.             }
  25.             //创建瞬时有序节点  /order/order_00000001
  26.             znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
  27.                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
  28.                     CreateMode.EPHEMERAL_SEQUENTIAL);
  29.             //获取业务节点下 所有的子节点
  30.             List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
  31.             //获取序号最小的(第一个)子节点
  32.             Collections.sort(childrenNodes);
  33.             String firstNode = childrenNodes.get(0);
  34.             //如果创建的节点是第一个子节点,则获得锁
  35.             if (znode.endsWith(firstNode)){
  36.                 return true;
  37.             }
  38.             //如果不是第一个子节点,则监听前一个节点
  39.             String lastNode = firstNode;
  40.             for (String node:childrenNodes){
  41.                 if (znode.endsWith(node)){
  42.                     zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
  43.                     break;
  44.                 }else {
  45.                     lastNode = node;
  46.                 }
  47.             }
  48.             synchronized (this){
  49.                 wait();
  50.             }
  51.             return true;
  52.         } catch (Exception e) {
  53.             e.printStackTrace();
  54.         }
  55.         return false;
  56.     }
  57.     @Override
  58.     public void close() throws Exception {
  59.         zooKeeper.delete(znode,-1);
  60.         zooKeeper.close();
  61.         log.info("我已经释放了锁!");
  62.     }
  63.     @Override
  64.     public void process(WatchedEvent event) {
  65.         if (event.getType() == Event.EventType.NodeDeleted){
  66.             synchronized (this){
  67.                 notify();
  68.             }
  69.         }
  70.     }
  71. }
复制代码
Curator实现

Curator有五种锁:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量
  1. CuratorFramework client = ZKUtils.getClient();
  2. client.start();
  3. // 分布式可重入排它锁
  4. InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);
  5. // 分布式不可重入排它锁
  6. InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);
  7. // 将多个锁作为一个整体
  8. InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
  9. if (!lock.acquire(10, TimeUnit.SECONDS)) {
  10.    throw new IllegalStateException("不能获取多锁");
  11. }
  12. System.out.println("已获取多锁");
  13. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  14. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  15. try {
  16.     // 资源操作
  17.     resource.use();
  18. } finally {
  19.     System.out.println("释放多个锁");
  20.     lock.release();
  21. }
  22. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  23. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  24. client.close();
复制代码
Curator实现可重入锁


当调用 InterProcessMutex#acquire方法获取锁的时间,会调用InterProcessMutex#internalLock方法。
  1. // 获取可重入互斥锁,直到获取成功为止
  2. @Override
  3. public void acquire() throws Exception {
  4.   if (!internalLock(-1, null)) {
  5.     throw new IOException("Lost connection while trying to acquire lock: " + basePath);
  6.   }
  7. }
复制代码
internalLock 方法会先获取当前哀求锁的线程,然后从 threadData( ConcurrentMap 类型)中获取当前线程对应的 lockData 。 lockData 包含锁的信息和加锁的次数,是实现可重入锁的关键。
第一次获取锁的时间,lockData为 null。获取锁成功之后,会将当前线程和对应的 lockData 放到 threadData 中
  1. private boolean internalLock(long time, TimeUnit unit) throws Exception {
  2.   // 获取当前请求锁的线程
  3.   Thread currentThread = Thread.currentThread();
  4.   // 拿对应的 lockData
  5.   LockData lockData = threadData.get(currentThread);
  6.   // 第一次获取锁的话,lockData 为 null
  7.   if (lockData != null) {
  8.     // 当前线程获取过一次锁之后
  9.     // 因为当前线程的锁存在, lockCount 自增后返回,实现锁重入.
  10.     lockData.lockCount.incrementAndGet();
  11.     return true;
  12.   }
  13.   // 尝试获取锁
  14.   String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  15.   if (lockPath != null) {
  16.     LockData newLockData = new LockData(currentThread, lockPath);
  17.      // 获取锁成功之后,将当前线程和对应的 lockData 放到 threadData 中
  18.     threadData.put(currentThread, newLockData);
  19.     return true;
  20.   }
  21.   return false;
  22. }
复制代码
LockData是 InterProcessMutex中的一个静态内部类。
  1. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  2. private static class LockData
  3. {
  4.     // 当前持有锁的线程
  5.     final Thread owningThread;
  6.     // 锁对应的子节点
  7.     final String lockPath;
  8.     // 加锁的次数
  9.     final AtomicInteger lockCount = new AtomicInteger(1);
  10.     private LockData(Thread owningThread, String lockPath)
  11.     {
  12.       this.owningThread = owningThread;
  13.       this.lockPath = lockPath;
  14.     }
  15. }
复制代码
如果已经获取过一次锁,后面再来获取锁的话,直接就会在 if (lockData != null) 这里被拦下了,然后就会实行lockData.lockCount.incrementAndGet(); 将加锁次数加 1。
整个可重入锁的实现逻辑非常简单,直接在客户端判断当前线程有没有获取锁,有的话直接将加锁次数加 1 就可以了。
案例-模拟12306售票

  1. import org.apache.curator.RetryPolicy;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import java.util.concurrent.TimeUnit;
  7. public class Ticket12306 implements Runnable{
  8.     private int tickets = 10;//数据库的票数
  9.     private InterProcessMutex lock ;
  10.     public Ticket12306(){
  11.         //重试策略
  12.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  13.         //2.第二种方式
  14.         //CuratorFrameworkFactory.builder();
  15.         CuratorFramework client = CuratorFrameworkFactory.builder()
  16.                 .connectString("192.168.149.135:2181")
  17.                 .sessionTimeoutMs(60 * 1000)
  18.                 .connectionTimeoutMs(15 * 1000)
  19.                 .retryPolicy(retryPolicy)
  20.                 .build();
  21.         //开启连接
  22.         client.start();
  23.         lock = new InterProcessMutex(client,"/lock");
  24.     }
  25.     @Override
  26.     public void run() {
  27.         while(true){
  28.             //获取锁
  29.             try {
  30.                 lock.acquire(3, TimeUnit.SECONDS);
  31.                 if(tickets > 0){
  32.                     System.out.println(Thread.currentThread()+":"+tickets);
  33.                     Thread.sleep(100);
  34.                     tickets--;
  35.                 }
  36.             } catch (Exception e) {
  37.                 e.printStackTrace();
  38.             }finally {
  39.                 //释放锁
  40.                 try {
  41.                     lock.release();
  42.                 } catch (Exception e) {
  43.                     e.printStackTrace();
  44.                 }
  45.             }
  46.         }
  47.     }
  48. }
复制代码
测试方法:
  1. import org.apache.curator.RetryPolicy;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.*;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import org.junit.After;
  7. import org.junit.Before;
  8. import org.junit.Test;
  9. public class LockTest {
  10.     public static void main(String[] args) {
  11.         Ticket12306 ticket12306 = new Ticket12306();
  12.         //创建客户端
  13.         Thread t1 = new Thread(ticket12306,"去哪儿");
  14.         Thread t2 = new Thread(ticket12306,"飞猪");
  15.         t1.start();
  16.         t2.start();
  17.     }
  18. }
复制代码
优缺点

优点:

  • 可靠性:ZooKeeper 是一个高可用的分布式协调服务,基于它的分布式锁具有较高的可靠性和稳定性。
  • 顺序性: ZooKeeper 的有序临时节点包管了锁的获取顺序,避免了死锁和竞争标题。
  • 避免死锁:在锁的持有者释放锁之前,其他节点无法获取锁,从而避免了死锁标题。
  • 容错性:即使部分节点发生故障,其他节点仍然可以正常获取锁,包管了系统的稳定性。
缺点:

  • 性能:ZooKeeper 是一个中心化的协调服务,大概在高并发场景下成为性能瓶颈。
  • 复杂性:ZooKeeper 的部署和维护相对复杂,需要一定的运维工作。
  • 单点故障:只管 ZooKeeper 本身是高可用的,但如果 ZooKeeper 集群出现标题,大概会影响到基于它的分布式锁。
有序临时节点的机制确保了获取锁的顺序,避免了循环等待,从而有用地避免了死锁标题。因为任何一个客户端在释放锁之前都会删除自己的节点,从而触发下一个等待的客户端获取锁。
需要注意的是,这种机制虽然能够有用避免死锁,但也大概带来性能标题。当某个客户端释放锁时,需要触发全部等待的客户端获取锁,大概会导致较多的网络通讯和监听事件。因此,在高并发环境下,需要综合考虑性能和锁的可靠性。
总的来说,基于 ZooKeeper 的分布式锁能够确保数据一致性和锁的可靠性,但需要权衡性能和复杂性。在选择时,需要根据具体场景来决定是否利用该种锁机制。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表