Zookeeper是什么?基于zookeeper实现分布式锁

打印 上一主题 下一主题

主题 1494|帖子 1494|积分 4482

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中心件配合使用的,比方:Kafka。
相识zk首先必要知道它的数据结构,可以想象为树、文件夹目次。每个节点有基本的信息,比方:创建时间、修改时间、版本,数据长度等。别的节点可以设置data,也就是数据,以字节的方式进行插入/获取,别的节点还拥有权限和状态。
状态很关键,有持久、临时(会话级别)、持久+次序、临时+次序、持久+TTL、临时+TTL。
次序是给同一个节点增加一个编号,比方:path:/distributed_locks/lock
插入多个,在zk中是:/distributed_locks/lock0000000001和/distributed_locks/lock0000000002、、。
到这里数据结构已经大致清晰了,那么zk存在的意义是什么?
首先,zk的定义:是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。
关键点:集中、分布式。
在步调进行分布式、多节点摆设时,传统步调内存中的变量大概锁机制等都不能在多节点中进行通用。此时,就必要一个集中式的一个中心件,在中心件上存储我们必要同时方案的变量大概其他定义。
那么,我们为什么不直接使用db数据库呢,可能是因为重?也可能是一些特殊的功能db中并不能实现?(临时会话、TTL?)。
作为目前很火热的一个中心件,存在它的意义肯定是有的。为什么说呢,zk是Java实现的,与 Hadoop、Kafka 等 Java 生态项目无缝集成。同理,可以想象,每个语言的特性不一致,都会有不同的中心件大概包。
上述,基本都是个人的一些理解,希望能给大家带来点启发。
zookeeper,咱们的扩展功能到分布式锁这里。通过节点的特性,我们接纳会话级别、次序性子的节点进行实现。
当我们的线程必要去尝试获取锁时,毗连zk肯定是个会话,同时zk会根据次序将不同的线程进行排序,线程内部只必要轮询、wait/notify等方式判定是否轮到本身得到锁了。获取到锁后,实行业务逻辑之后,随之可以将锁进行开释,以便让别的一个线程得到锁。
   代码实现用2种方式实现:
  原生zookeeper方法实现

  1. package com.fahe.testdistrubutedlock.zk;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.zookeeper.*;
  4. import org.apache.zookeeper.data.Stat;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. /**
  8. * @program: test-distrubuted-lock
  9. * @description: client
  10. * @author: <linfahe-694204477@qq.com>
  11. * @create: 2025-04-23 14:05
  12. **/
  13. @Slf4j
  14. public class ZkClient implements Watcher {
  15.    public static final String ZK_ADDR = "127.0.0.1:32181";
  16.    public ZooKeeper zk;
  17.    public CountDownLatch connectedSignal = new CountDownLatch(1);
  18.    public ZkClient() {
  19.        try {
  20.            zk = new ZooKeeper(ZK_ADDR, 3000, this);
  21.            connectedSignal.await(); // 等待连接成功
  22.        } catch (Exception e) {
  23.            throw new RuntimeException(e);
  24.        }
  25.    }
  26.    @Override
  27.    public void process(WatchedEvent watchedEvent) {
  28.        log.info("process WatchedEvent : {}", watchedEvent);
  29.        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
  30.            connectedSignal.countDown();
  31.        }
  32.    }
  33.    // 创建持久节点
  34.    public void createNode() throws KeeperException, InterruptedException {
  35.        Stat existsed = zk.exists("/my-node", false);
  36.        if (existsed != null) {
  37. //            zk.delete("/my-node", -1);
  38.            return;
  39.        }
  40.        String path = zk.create("/my-node", "data".getBytes(),
  41.                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  42.        System.out.println("创建节点:" + path);
  43.    }
  44.    // 获取节点数据
  45.    public void getData() throws KeeperException, InterruptedException {
  46.        byte[] data = zk.getData("/my-node", false, null);
  47.        System.out.println("节点数据:" + new String(data));
  48.    }
  49.    public static void main(String[] args) throws InterruptedException, KeeperException {
  50.        ZkClient zkClient = new ZkClient();
  51.        List<String> children = zkClient.zk.getChildren("/", true);
  52.        for (String child : children) {
  53.            log.info("child : {}", child);
  54.        }
  55.        zkClient.createNode();
  56.        zkClient.getData();
  57.    }
  58.    public void close() {
  59.        try {
  60.            if (zk != null) {
  61.                zk.close();
  62.            }
  63.        } catch (InterruptedException e) {
  64.            throw new RuntimeException(e);
  65.        }
  66.    }
  67. }
  68. package com.fahe.testdistrubutedlock.zk;
  69. import org.apache.zookeeper.*;
  70. import org.apache.zookeeper.data.Stat;
  71. import java.util.Collections;
  72. import java.util.List;
  73. public class DistributedLock {
  74.    private static final String LOCK_ROOT = "/locks";
  75.    private static final String LOCK_NODE = LOCK_ROOT + "/lock_";
  76.    private ZooKeeper zooKeeper;
  77.    private String lockPath;
  78.    public DistributedLock(ZooKeeper zooKeeper) throws Exception {
  79.        this.zooKeeper = zooKeeper;
  80.        Stat stat = zooKeeper.exists(LOCK_ROOT, false);
  81.        if (stat == null) {
  82.            zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  83.        }
  84.    }
  85.    public void acquireLock() throws Exception {
  86.        lockPath = zooKeeper.create(LOCK_NODE, "new byte[0]".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
  87.                , CreateMode.EPHEMERAL_SEQUENTIAL);
  88.        System.out.println("Lock path: " + lockPath);
  89.        while (true) {
  90.            List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
  91.            Collections.sort(children);
  92.            String smallestChild = LOCK_ROOT + "/" + children.get(0);
  93.            if (lockPath.equals(smallestChild)) {
  94.                System.out.println("Acquired lock: " + lockPath);
  95.                return;
  96.            }
  97.            System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild);
  98.            String watchNode = null;
  99.            for (int i = children.size() - 1; i >= 0; i--) {
  100.                String child = LOCK_ROOT + "/" + children.get(i);
  101.                if (child.compareTo(lockPath) < 0) {
  102.                    watchNode = child;
  103.                    break;
  104.                }
  105.            }
  106.            System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild + " ; watchNode = " + watchNode);
  107.            if (watchNode != null) {
  108.                final Object lock = new Object();
  109.                Watcher watcher = new Watcher() {
  110.                    @Override
  111.                    public void process(WatchedEvent event) {
  112.                        synchronized (lock) {
  113.                            lock.notifyAll();
  114.                        }
  115.                    }
  116.                };
  117.                Stat stat = zooKeeper.exists(watchNode, watcher);
  118.                if (stat != null) {
  119.                    synchronized (lock) {
  120.                        lock.wait();
  121.                    }
  122.                }
  123.            }
  124.        }
  125.    }
  126.    public void releaseLock() throws Exception {
  127.        if (lockPath != null) {
  128.            zooKeeper.delete(lockPath, -1);
  129.            System.out.println("Released lock: " + lockPath);
  130.            lockPath = null;
  131.        }
  132.    }
  133.    public static void main(String[] args) {
  134.        ZkClient client = new ZkClient();
  135.        // 模拟多线程。
  136.        for (int i = 0; i < 30; i++) {
  137.            new Thread(() -> {
  138.                try {
  139.                    mainTest(client);
  140.                } catch (Exception e) {
  141.                    e.printStackTrace();
  142.                }
  143.            }).start();
  144.        }
  145.        // 模拟多实例。
  146.        ZkClient client2 = new ZkClient();
  147.        for (int i = 0; i < 30; i++) {
  148.            new Thread(() -> {
  149.                try {
  150.                    mainTest(client2);
  151.                } catch (Exception e) {
  152.                    e.printStackTrace();
  153.                }
  154.            }).start();
  155.        }
  156.    }
  157.    public static void mainTest(ZkClient client) {
  158. //         = new ZkClient();
  159.        try {
  160.            ZooKeeper zooKeeper = client.zk;
  161.            DistributedLock lock = new DistributedLock(zooKeeper);
  162.            lock.acquireLock();
  163.            System.out.println("Lock acquired");
  164.            // 模拟业务逻辑
  165.            int randomSleepTime = (int) (Math.random() * 100);
  166.            System.out.println("randomSleepTime = " + randomSleepTime);
  167.            Thread.sleep(randomSleepTime);
  168.            System.out.println("Business logic completed");
  169.            lock.releaseLock();
  170. //            client.close();
  171.        } catch (Exception e) {
  172.            e.printStackTrace();
  173.        }
  174.    }
  175. }
复制代码
使用Curator三方包实现:

  1. package com.fahe.testdistrubutedlock.zk;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. /**
  7. * @program: test-distrubuted-lock
  8. * @description: curator 测试
  9. * @author: <linfahe-694204477@qq.com>
  10. * @create: 2025-04-23 15:04
  11. **/
  12. public class CuratorMain {
  13.    private final InterProcessMutex lock;
  14.    private static final String LOCK_PATH = "/distributed_lock/my_lock";
  15.    private static final String ZK_ADDR = "127.0.0.1:32181";
  16.    public CuratorMain() {
  17.        CuratorFramework client = CuratorFrameworkFactory.newClient(
  18.                ZK_ADDR,
  19.                new ExponentialBackoffRetry(200, 2));
  20.        client.start();
  21.        this.lock = new InterProcessMutex(client, LOCK_PATH);
  22.    }
  23.    public boolean acquireLock() {
  24.        try {
  25.            lock.acquire();
  26.            return true;
  27.        } catch (Exception e) {
  28.            e.printStackTrace();
  29.            return false;
  30.        }
  31.    }
  32.    public void releaseLock() {
  33.        try {
  34.            if (lock.isAcquiredInThisProcess()) {
  35.                lock.release();
  36.            }
  37.        } catch (Exception e) {
  38.            e.printStackTrace();
  39.        }
  40.    }
  41.    public static void main(String[] args) {
  42.        CuratorMain curatorMain = new CuratorMain();
  43.        for (int i = 0; i < 100; i++) {
  44.            new Thread(() -> {
  45.                boolean acquireLock = curatorMain.acquireLock();
  46.                System.out.println("thread-" + Thread.currentThread().getName() + " is running");
  47.                System.out.println("acquireLock = " + acquireLock);
  48.                if (acquireLock) {
  49.                    curatorMain.releaseLock();
  50.                }
  51.            }, "thread-" + i).start();
  52.        }
  53.        CuratorMain curatorMain2 = new CuratorMain();
  54.        for (int i = 100; i < 200; i++) {
  55.            new Thread(() -> {
  56.                boolean acquireLock = curatorMain2.acquireLock();
  57.                System.out.println("thread-" + Thread.currentThread().getName() + " is running");
  58.                System.out.println("acquireLock = " + acquireLock);
  59.                if (acquireLock) {
  60.                    curatorMain2.releaseLock();
  61.                }
  62.            }, "thread-" + i).start();
  63.        }
  64.        try {
  65.            Thread.sleep(3000);
  66.        } catch (InterruptedException e) {
  67.            e.printStackTrace();
  68.        }
  69.    }
  70. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表