IT评测·应用市场-qidao123.com技术社区

标题: ZooKeeper实现分布式锁 [打印本页]

作者: 风雨同行    时间: 2025-4-15 07:55
标题: ZooKeeper实现分布式锁
基础

ZooKeeper的4个节点

ZK分布式锁相干基础知识

实现原理

焦点思想

焦点思想就是基于 临时顺序节点 和 Watcher(事件监听器) 实现的。
当客户端要获取锁,则创建节点,利用完锁,则删除该节点。

获取锁步骤:

释放锁步骤:

羊群效应息争决方法

实现

实际项目中,推荐利用 Curator 来实现 ZooKeeper 分布式锁。Curator 是 Netflix 公司开源的一套 ZooKeeper Java 客户端框架,相比于 ZooKeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地利用。
原生API实现
  1. /**
  2. * 自己本身就是一个 watcher,可以得到通知
  3. * AutoCloseable 实现自动关闭,资源不使用的时候
  4. */
  5. @Slf4j
  6. public class ZkLock implements AutoCloseable, Watcher {
  7.     private ZooKeeper zooKeeper;
  8.     /**
  9.      * 记录当前锁的名字
  10.      */
  11.     private String znode;
  12.     public ZkLock() throws IOException {
  13.         this.zooKeeper = new ZooKeeper("localhost:2181",
  14.                 10000,this);
  15.     }
  16.     public boolean getLock(String businessCode) {
  17.         try {
  18.             //创建业务 根节点
  19.             Stat stat = zooKeeper.exists("/" + businessCode, false);
  20.             if (stat==null){
  21.                 zooKeeper.create("/" + businessCode,businessCode.getBytes(),
  22.                         ZooDefs.Ids.OPEN_ACL_UNSAFE,
  23.                         CreateMode.PERSISTENT);
  24.             }
  25.             //创建瞬时有序节点  /order/order_00000001
  26.             znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
  27.                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
  28.                     CreateMode.EPHEMERAL_SEQUENTIAL);
  29.             //获取业务节点下 所有的子节点
  30.             List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
  31.             //获取序号最小的(第一个)子节点
  32.             Collections.sort(childrenNodes);
  33.             String firstNode = childrenNodes.get(0);
  34.             //如果创建的节点是第一个子节点,则获得锁
  35.             if (znode.endsWith(firstNode)){
  36.                 return true;
  37.             }
  38.             //如果不是第一个子节点,则监听前一个节点
  39.             String lastNode = firstNode;
  40.             for (String node:childrenNodes){
  41.                 if (znode.endsWith(node)){
  42.                     zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
  43.                     break;
  44.                 }else {
  45.                     lastNode = node;
  46.                 }
  47.             }
  48.             synchronized (this){
  49.                 wait();
  50.             }
  51.             return true;
  52.         } catch (Exception e) {
  53.             e.printStackTrace();
  54.         }
  55.         return false;
  56.     }
  57.     @Override
  58.     public void close() throws Exception {
  59.         zooKeeper.delete(znode,-1);
  60.         zooKeeper.close();
  61.         log.info("我已经释放了锁!");
  62.     }
  63.     @Override
  64.     public void process(WatchedEvent event) {
  65.         if (event.getType() == Event.EventType.NodeDeleted){
  66.             synchronized (this){
  67.                 notify();
  68.             }
  69.         }
  70.     }
  71. }
复制代码
Curator实现

Curator有五种锁:
  1. CuratorFramework client = ZKUtils.getClient();
  2. client.start();
  3. // 分布式可重入排它锁
  4. InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);
  5. // 分布式不可重入排它锁
  6. InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);
  7. // 将多个锁作为一个整体
  8. InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
  9. if (!lock.acquire(10, TimeUnit.SECONDS)) {
  10.    throw new IllegalStateException("不能获取多锁");
  11. }
  12. System.out.println("已获取多锁");
  13. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  14. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  15. try {
  16.     // 资源操作
  17.     resource.use();
  18. } finally {
  19.     System.out.println("释放多个锁");
  20.     lock.release();
  21. }
  22. System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
  23. System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
  24. client.close();
复制代码
Curator实现可重入锁


当调用 InterProcessMutex#acquire方法获取锁的时间,会调用InterProcessMutex#internalLock方法。
  1. // 获取可重入互斥锁,直到获取成功为止
  2. @Override
  3. public void acquire() throws Exception {
  4.   if (!internalLock(-1, null)) {
  5.     throw new IOException("Lost connection while trying to acquire lock: " + basePath);
  6.   }
  7. }
复制代码
internalLock 方法会先获取当前哀求锁的线程,然后从 threadData( ConcurrentMap 类型)中获取当前线程对应的 lockData 。 lockData 包含锁的信息和加锁的次数,是实现可重入锁的关键。
第一次获取锁的时间,lockData为 null。获取锁成功之后,会将当前线程和对应的 lockData 放到 threadData 中
  1. private boolean internalLock(long time, TimeUnit unit) throws Exception {
  2.   // 获取当前请求锁的线程
  3.   Thread currentThread = Thread.currentThread();
  4.   // 拿对应的 lockData
  5.   LockData lockData = threadData.get(currentThread);
  6.   // 第一次获取锁的话,lockData 为 null
  7.   if (lockData != null) {
  8.     // 当前线程获取过一次锁之后
  9.     // 因为当前线程的锁存在, lockCount 自增后返回,实现锁重入.
  10.     lockData.lockCount.incrementAndGet();
  11.     return true;
  12.   }
  13.   // 尝试获取锁
  14.   String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  15.   if (lockPath != null) {
  16.     LockData newLockData = new LockData(currentThread, lockPath);
  17.      // 获取锁成功之后,将当前线程和对应的 lockData 放到 threadData 中
  18.     threadData.put(currentThread, newLockData);
  19.     return true;
  20.   }
  21.   return false;
  22. }
复制代码
LockData是 InterProcessMutex中的一个静态内部类。
  1. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  2. private static class LockData
  3. {
  4.     // 当前持有锁的线程
  5.     final Thread owningThread;
  6.     // 锁对应的子节点
  7.     final String lockPath;
  8.     // 加锁的次数
  9.     final AtomicInteger lockCount = new AtomicInteger(1);
  10.     private LockData(Thread owningThread, String lockPath)
  11.     {
  12.       this.owningThread = owningThread;
  13.       this.lockPath = lockPath;
  14.     }
  15. }
复制代码
如果已经获取过一次锁,后面再来获取锁的话,直接就会在 if (lockData != null) 这里被拦下了,然后就会实行lockData.lockCount.incrementAndGet(); 将加锁次数加 1。
整个可重入锁的实现逻辑非常简单,直接在客户端判断当前线程有没有获取锁,有的话直接将加锁次数加 1 就可以了。
案例-模拟12306售票

  1. import org.apache.curator.RetryPolicy;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import java.util.concurrent.TimeUnit;
  7. public class Ticket12306 implements Runnable{
  8.     private int tickets = 10;//数据库的票数
  9.     private InterProcessMutex lock ;
  10.     public Ticket12306(){
  11.         //重试策略
  12.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  13.         //2.第二种方式
  14.         //CuratorFrameworkFactory.builder();
  15.         CuratorFramework client = CuratorFrameworkFactory.builder()
  16.                 .connectString("192.168.149.135:2181")
  17.                 .sessionTimeoutMs(60 * 1000)
  18.                 .connectionTimeoutMs(15 * 1000)
  19.                 .retryPolicy(retryPolicy)
  20.                 .build();
  21.         //开启连接
  22.         client.start();
  23.         lock = new InterProcessMutex(client,"/lock");
  24.     }
  25.     @Override
  26.     public void run() {
  27.         while(true){
  28.             //获取锁
  29.             try {
  30.                 lock.acquire(3, TimeUnit.SECONDS);
  31.                 if(tickets > 0){
  32.                     System.out.println(Thread.currentThread()+":"+tickets);
  33.                     Thread.sleep(100);
  34.                     tickets--;
  35.                 }
  36.             } catch (Exception e) {
  37.                 e.printStackTrace();
  38.             }finally {
  39.                 //释放锁
  40.                 try {
  41.                     lock.release();
  42.                 } catch (Exception e) {
  43.                     e.printStackTrace();
  44.                 }
  45.             }
  46.         }
  47.     }
  48. }
复制代码
测试方法:
  1. import org.apache.curator.RetryPolicy;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.*;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import org.junit.After;
  7. import org.junit.Before;
  8. import org.junit.Test;
  9. public class LockTest {
  10.     public static void main(String[] args) {
  11.         Ticket12306 ticket12306 = new Ticket12306();
  12.         //创建客户端
  13.         Thread t1 = new Thread(ticket12306,"去哪儿");
  14.         Thread t2 = new Thread(ticket12306,"飞猪");
  15.         t1.start();
  16.         t2.start();
  17.     }
  18. }
复制代码
优缺点

优点:
缺点:
有序临时节点的机制确保了获取锁的顺序,避免了循环等待,从而有用地避免了死锁标题。因为任何一个客户端在释放锁之前都会删除自己的节点,从而触发下一个等待的客户端获取锁。
需要注意的是,这种机制虽然能够有用避免死锁,但也大概带来性能标题。当某个客户端释放锁时,需要触发全部等待的客户端获取锁,大概会导致较多的网络通讯和监听事件。因此,在高并发环境下,需要综合考虑性能和锁的可靠性。
总的来说,基于 ZooKeeper 的分布式锁能够确保数据一致性和锁的可靠性,但需要权衡性能和复杂性。在选择时,需要根据具体场景来决定是否利用该种锁机制。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4