圆咕噜咕噜 发表于 前天 03:00

Zookeeper是什么?基于zookeeper实现分布式锁

zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中心件配合使用的,比方:Kafka。
相识zk首先必要知道它的数据结构,可以想象为树、文件夹目次。每个节点有基本的信息,比方:创建时间、修改时间、版本,数据长度等。别的节点可以设置data,也就是数据,以字节的方式进行插入/获取,别的节点还拥有权限和状态。
状态很关键,有持久、临时(会话级别)、持久+次序、临时+次序、持久+TTL、临时+TTL。
次序是给同一个节点增加一个编号,比方:path:/distributed_locks/lock
插入多个,在zk中是:/distributed_locks/lock0000000001和/distributed_locks/lock0000000002、、。
到这里数据结构已经大致清晰了,那么zk存在的意义是什么?
首先,zk的定义:是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。
关键点:集中、分布式。
在步调进行分布式、多节点摆设时,传统步调内存中的变量大概锁机制等都不能在多节点中进行通用。此时,就必要一个集中式的一个中心件,在中心件上存储我们必要同时方案的变量大概其他定义。
那么,我们为什么不直接使用db数据库呢,可能是因为重?也可能是一些特殊的功能db中并不能实现?(临时会话、TTL?)。
作为目前很火热的一个中心件,存在它的意义肯定是有的。为什么说呢,zk是Java实现的,与 Hadoop、Kafka 等 Java 生态项目无缝集成。同理,可以想象,每个语言的特性不一致,都会有不同的中心件大概包。
上述,基本都是个人的一些理解,希望能给大家带来点启发。
zookeeper,咱们的扩展功能到分布式锁这里。通过节点的特性,我们接纳会话级别、次序性子的节点进行实现。
当我们的线程必要去尝试获取锁时,毗连zk肯定是个会话,同时zk会根据次序将不同的线程进行排序,线程内部只必要轮询、wait/notify等方式判定是否轮到本身得到锁了。获取到锁后,实行业务逻辑之后,随之可以将锁进行开释,以便让别的一个线程得到锁。
   代码实现用2种方式实现:
原生zookeeper方法实现

package com.fahe.testdistrubutedlock.zk;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* @program: test-distrubuted-lock
* @description: client
* @author: <linfahe-694204477@qq.com>
* @create: 2025-04-23 14:05
**/
@Slf4j
public class ZkClient implements Watcher {

 public static final String ZK_ADDR = "127.0.0.1:32181";
 public ZooKeeper zk;
 public CountDownLatch connectedSignal = new CountDownLatch(1);

 public ZkClient() {
     try {
         zk = new ZooKeeper(ZK_ADDR, 3000, this);
         connectedSignal.await(); // 等待连接成功
     } catch (Exception e) {
         throw new RuntimeException(e);
     }
 }

 @Override
 public void process(WatchedEvent watchedEvent) {
     log.info("process WatchedEvent : {}", watchedEvent);
     if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
         connectedSignal.countDown();
     }
 }


 // 创建持久节点
 public void createNode() throws KeeperException, InterruptedException {
     Stat existsed = zk.exists("/my-node", false);
     if (existsed != null) {
//          zk.delete("/my-node", -1);
         return;
     }
     String path = zk.create("/my-node", "data".getBytes(),
             ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     System.out.println("创建节点:" + path);
 }

 // 获取节点数据
 public void getData() throws KeeperException, InterruptedException {
     byte[] data = zk.getData("/my-node", false, null);
     System.out.println("节点数据:" + new String(data));
 }

 public static void main(String[] args) throws InterruptedException, KeeperException {
     ZkClient zkClient = new ZkClient();
     List<String> children = zkClient.zk.getChildren("/", true);
     for (String child : children) {
         log.info("child : {}", child);
     }
     zkClient.createNode();
     zkClient.getData();
 }

 public void close() {
     try {
         if (zk != null) {
             zk.close();
         }
     } catch (InterruptedException e) {
         throw new RuntimeException(e);
     }
 }
}

package com.fahe.testdistrubutedlock.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;

public class DistributedLock {
 private static final String LOCK_ROOT = "/locks";
 private static final String LOCK_NODE = LOCK_ROOT + "/lock_";
 private ZooKeeper zooKeeper;
 private String lockPath;

 public DistributedLock(ZooKeeper zooKeeper) throws Exception {
     this.zooKeeper = zooKeeper;
     Stat stat = zooKeeper.exists(LOCK_ROOT, false);
     if (stat == null) {
         zooKeeper.create(LOCK_ROOT, new byte, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 }

 public void acquireLock() throws Exception {
     lockPath = zooKeeper.create(LOCK_NODE, "new byte".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
             , CreateMode.EPHEMERAL_SEQUENTIAL);
     System.out.println("Lock path: " + lockPath);

     while (true) {
         List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
         Collections.sort(children);
         String smallestChild = LOCK_ROOT + "/" + children.get(0);

         if (lockPath.equals(smallestChild)) {
             System.out.println("Acquired lock: " + lockPath);
             return;
         }
         System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild);
         String watchNode = null;
         for (int i = children.size() - 1; i >= 0; i--) {
             String child = LOCK_ROOT + "/" + children.get(i);
             if (child.compareTo(lockPath) < 0) {
                 watchNode = child;
                 break;
             }
         }
         System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild + " ; watchNode = " + watchNode);

         if (watchNode != null) {
             final Object lock = new Object();
             Watcher watcher = new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                     synchronized (lock) {
                         lock.notifyAll();
                     }
                 }
             };

             Stat stat = zooKeeper.exists(watchNode, watcher);
             if (stat != null) {
                 synchronized (lock) {
                     lock.wait();
                 }
             }
         }
     }
 }

 public void releaseLock() throws Exception {
     if (lockPath != null) {
         zooKeeper.delete(lockPath, -1);
         System.out.println("Released lock: " + lockPath);
         lockPath = null;
     }
 }

 public static void main(String[] args) {
     ZkClient client = new ZkClient();
     // 模拟多线程。
     for (int i = 0; i < 30; i++) {
         new Thread(() -> {
             try {
                 mainTest(client);
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }).start();
     }
     // 模拟多实例。
     ZkClient client2 = new ZkClient();
     for (int i = 0; i < 30; i++) {
         new Thread(() -> {
             try {
                 mainTest(client2);
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }).start();
     }
 }

 public static void mainTest(ZkClient client) {
//         = new ZkClient();
     try {
         ZooKeeper zooKeeper = client.zk;

         DistributedLock lock = new DistributedLock(zooKeeper);
         lock.acquireLock();
         System.out.println("Lock acquired");
         // 模拟业务逻辑
         int randomSleepTime = (int) (Math.random() * 100);
         System.out.println("randomSleepTime = " + randomSleepTime);
         Thread.sleep(randomSleepTime);
         System.out.println("Business logic completed");
         lock.releaseLock();
//          client.close();
     } catch (Exception e) {
         e.printStackTrace();
     }
 }
}
​ 使用Curator三方包实现:

package com.fahe.testdistrubutedlock.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;


/**
* @program: test-distrubuted-lock
* @description: curator 测试
* @author: <linfahe-694204477@qq.com>
* @create: 2025-04-23 15:04
**/
public class CuratorMain {
   private final InterProcessMutex lock;
   private static final String LOCK_PATH = "/distributed_lock/my_lock";
   private static final String ZK_ADDR = "127.0.0.1:32181";

   public CuratorMain() {
       CuratorFramework client = CuratorFrameworkFactory.newClient(
               ZK_ADDR,
               new ExponentialBackoffRetry(200, 2));
       client.start();
       this.lock = new InterProcessMutex(client, LOCK_PATH);
 }

   public boolean acquireLock() {
       try {
           lock.acquire();
           return true;
     } catch (Exception e) {
           e.printStackTrace();
           return false;
     }
 }

   public void releaseLock() {
       try {
           if (lock.isAcquiredInThisProcess()) {
               lock.release();
         }
     } catch (Exception e) {
           e.printStackTrace();
     }
 }

   public static void main(String[] args) {
       CuratorMain curatorMain = new CuratorMain();
       for (int i = 0; i < 100; i++) {
           new Thread(() -> {
               boolean acquireLock = curatorMain.acquireLock();
               System.out.println("thread-" + Thread.currentThread().getName() + " is running");
               System.out.println("acquireLock = " + acquireLock);
               if (acquireLock) {
                   curatorMain.releaseLock();
             }
         }, "thread-" + i).start();
     }
       CuratorMain curatorMain2 = new CuratorMain();
       for (int i = 100; i < 200; i++) {
           new Thread(() -> {
               boolean acquireLock = curatorMain2.acquireLock();
               System.out.println("thread-" + Thread.currentThread().getName() + " is running");
               System.out.println("acquireLock = " + acquireLock);
               if (acquireLock) {
                   curatorMain2.releaseLock();
             }
         }, "thread-" + i).start();
     }
       try {
           Thread.sleep(3000);
     } catch (InterruptedException e) {
           e.printStackTrace();
     }
 }
}


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Zookeeper是什么?基于zookeeper实现分布式锁