愛在花開的季節 发表于 2023-7-15 19:05:43

java操作zookeeper

java操作zookeeper


[*]创建一个maven项目在pom文件里引入如下依赖:
<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
   
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
   
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>
</dependencies>
[*]创建一个测试类进行相关操作的测试


[*]连接客户端
@Before
public void testConnect(){
      ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
      client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                .connectString("192.168.223.131:2181") // 指定连接zookeeper的服务器地址
                .retryPolicy(retryPolicy) // 指定重试策略
                .namespace("test") // 指定命名空间
                .build(); // 建造客户端实例对象
      client.start(); // 启动客户端
    }

[*]关闭客户端
@After
public void testClose(){
      // 关闭客户端
      if (client != null){
            client.close();
      }
    }

[*]创建节点
@Test
public void testCreateNode() throws Exception {
      // 如果没有指定命名空间,那么节点的完整路径为 /node2,如果指定了命名空间,那么节点的完整路径为 /test/node2
      // 如果没有数据,那么节点的数据为当前客户端的ip地址
      // 如果没有指定节点类型,那么节点类型为持久节点
      // CreateMode.EPHEMERAL 临时节点
      // creatingParentsIfNeeded() 如果父节点不存在,那么自动创建父节点
      String path = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/node3/min", "node3min".getBytes());
      System.out.println(path);
      // 让线程阻塞,不让程序结束,这样可以在zookeeper中看到创建的临时节点,因为临时节点的生命周期是和客户端绑定的
      Thread.sleep(100000);
    }

[*]获取节点数据
@Test
public void testGetData() throws Exception {
      byte[] bytes = client.getData().forPath("/node2");
      System.out.println(new String(bytes));
    }

[*]查询子节点
@Test
public void testGetChildren() throws Exception {
      List<String> childrenList = client.getChildren().forPath("/");
      for (String child : childrenList) {
            System.out.println(child);
      }
    }

[*]查询节点状态信息
@Test
public void testGetStat() throws Exception {
      // Stat类用于存储节点状态信息
      Stat stat = new Stat();
      // storingStatIn(stat) 将节点状态信息存储到stat对象中
      byte[] data = client.getData().storingStatIn(stat).forPath("/node2");
      System.out.println(new String(data));
      System.out.println(stat);
    }

[*]更新节点数据
@Test
public void testSetData() throws Exception {
      client.setData().forPath("/node2/min1", "minqiliang".getBytes());
    }

[*]更新节点数据,带版本号
@Test
public void testSetDataWithVersion() throws Exception {
      Stat stat = new Stat();
      client.getData().storingStatIn(stat).forPath("/node3");
      System.out.println(stat.getVersion());
      client.setData().withVersion(stat.getVersion()).forPath("/node3", "minqiliang".getBytes());
    }

[*]删除节点
@Test
public void testDeleteNode() throws Exception {
      // deletingChildrenIfNeeded() 如果存在子节点,那么先删除子节点,再删除父节点
      client.delete().deletingChildrenIfNeeded().forPath("/node2");
    }

[*]删除节点,必须成功
@Test
public void testDeleteNodeWithVersion() throws Exception {
      // guaranteed() 如果删除失败,那么会在后台一直尝试删除,直到删除成功为止
      client.delete().guaranteed().forPath("/node3");
    }

[*]删除节点,回调函数
@Test
public void testDeleteNodeWithCallback() throws Exception {
      // inBackground() 指定回调函数
      client.delete().guaranteed().inBackground((client, event) -> System.out.println(event)).forPath("/node3");
      Thread.sleep(100000);
    }

[*]监听节点的创建、修改、删除
@Test
public void testNodeCache() throws Exception {
      // 创建一个nodeCache对象
      NodeCache nodeCache = new NodeCache(client, "/node3");
      // 注册监听器
      nodeCache.getListenable().addListener(() -> {
            System.out.println("节点数据发生变化");
            byte[] bytes = nodeCache.getCurrentData().getData();
            System.out.println(new String(bytes));
      });
      // 启动监听器
      nodeCache.start(true);
      while (true){

      }
    }

[*]监听子节点的创建、修改、删除
@Test
public void testpathChildrenCache() throws Exception {
      // 创建一个nodeCache对象
      PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node3",true);
      // 注册监听器
      pathChildrenCache.getListenable().addListener((client,event) -> {
            System.out.println("节点数据发生变化");
            System.out.println(event);
            PathChildrenCacheEvent.Type type = event.getType();
            if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                System.out.println("子节点数据发生变化");
                byte[] data = event.getData().getData();
                System.out.println(new String(data));
            }
      });
      // 启动监听器
      pathChildrenCache.start(true);
      while (true){

      }
    }

[*]树形监听器
@Test
public void testTreeCache() throws Exception {
      // 创建一个nodeCache对象
            TreeCache treeCache = new TreeCache(client, "/node3");
      // 注册监听器
      treeCache.getListenable().addListener((client,event) -> {
            System.out.println("节点数据发生变化");
            System.out.println(event);
            TreeCacheEvent.Type type = event.getType();
            if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){
                System.out.println("子节点数据发生变化");
                byte[] data = event.getData().getData();
                System.out.println(new String(data));
            }
      });

      // 启动监听器
      treeCache.start();
      while (true){

      }
    }
[*]分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
// 获取锁
try {
    // 获取锁
    boolean acquire = lock.acquire(3, TimeUnit.SECONDS);
}catch (Exception e) {
   e.printStackTrace();
}finally {
// 释放锁
try {
      lock.release();
   } catch (Exception e) {
      e.printStackTrace();
   }            
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: java操作zookeeper