java操作zookeeper

打印 上一主题 下一主题

主题 915|帖子 915|积分 2745

java操作zookeeper


  • 创建一个maven项目在pom文件里引入如下依赖:
  1. <dependencies>
  2.     <dependency>
  3.       <groupId>junit</groupId>
  4.       <artifactId>junit</artifactId>
  5.       <version>4.10</version>
  6.       <scope>test</scope>
  7.     </dependency>
  8.    
  9.     <dependency>
  10.       <groupId>org.apache.curator</groupId>
  11.       <artifactId>curator-framework</artifactId>
  12.       <version>4.0.0</version>
  13.     </dependency>
  14.     <dependency>
  15.       <groupId>org.apache.curator</groupId>
  16.       <artifactId>curator-recipes</artifactId>
  17.       <version>4.0.0</version>
  18.     </dependency>
  19.    
  20.     <dependency>
  21.       <groupId>org.slf4j</groupId>
  22.       <artifactId>slf4j-api</artifactId>
  23.       <version>1.7.21</version>
  24.     </dependency>
  25.     <dependency>
  26.       <groupId>org.slf4j</groupId>
  27.       <artifactId>slf4j-log4j12</artifactId>
  28.       <version>1.7.21</version>
  29.     </dependency>
  30. </dependencies>
复制代码

  • 创建一个测试类进行相关操作的测试


  • 连接客户端
  1. @Before
  2. public void testConnect(){
  3.         ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
  4.         client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
  5.                 .connectString("192.168.223.131:2181") // 指定连接zookeeper的服务器地址
  6.                 .retryPolicy(retryPolicy) // 指定重试策略
  7.                 .namespace("test") // 指定命名空间
  8.                 .build(); // 建造客户端实例对象
  9.         client.start(); // 启动客户端
  10.     }
复制代码

  • 关闭客户端
  1. @After
  2. public void testClose(){
  3.         // 关闭客户端
  4.         if (client != null){
  5.             client.close();
  6.         }
  7.     }
复制代码

  • 创建节点
  1. @Test
  2. public void testCreateNode() throws Exception {
  3.         // 如果没有指定命名空间,那么节点的完整路径为 /node2,如果指定了命名空间,那么节点的完整路径为 /test/node2
  4.         // 如果没有数据,那么节点的数据为当前客户端的ip地址
  5.         // 如果没有指定节点类型,那么节点类型为持久节点
  6.         // CreateMode.EPHEMERAL 临时节点
  7.         // creatingParentsIfNeeded() 如果父节点不存在,那么自动创建父节点
  8.         String path = client.create()
  9.                 .creatingParentsIfNeeded()
  10.                 .withMode(CreateMode.EPHEMERAL)
  11.                 .forPath("/node3/min", "node3min".getBytes());
  12.         System.out.println(path);
  13.         // 让线程阻塞,不让程序结束,这样可以在zookeeper中看到创建的临时节点,因为临时节点的生命周期是和客户端绑定的
  14.         Thread.sleep(100000);
  15.     }
复制代码

  • 获取节点数据
  1. @Test
  2. public void testGetData() throws Exception {
  3.         byte[] bytes = client.getData().forPath("/node2");
  4.         System.out.println(new String(bytes));
  5.     }
复制代码

  • 查询子节点
  1. @Test
  2. public void testGetChildren() throws Exception {
  3.         List<String> childrenList = client.getChildren().forPath("/");
  4.         for (String child : childrenList) {
  5.             System.out.println(child);
  6.         }
  7.     }
复制代码

  • 查询节点状态信息
  1. @Test
  2. public void testGetStat() throws Exception {
  3.         // Stat类用于存储节点状态信息
  4.         Stat stat = new Stat();
  5.         // storingStatIn(stat) 将节点状态信息存储到stat对象中
  6.         byte[] data = client.getData().storingStatIn(stat).forPath("/node2");
  7.         System.out.println(new String(data));
  8.         System.out.println(stat);
  9.     }
复制代码

  • 更新节点数据
  1. @Test
  2. public void testSetData() throws Exception {
  3.         client.setData().forPath("/node2/min1", "minqiliang".getBytes());
  4.     }
复制代码

  • 更新节点数据,带版本号
  1. @Test
  2. public void testSetDataWithVersion() throws Exception {
  3.         Stat stat = new Stat();
  4.         client.getData().storingStatIn(stat).forPath("/node3");
  5.         System.out.println(stat.getVersion());
  6.         client.setData().withVersion(stat.getVersion()).forPath("/node3", "minqiliang".getBytes());
  7.     }
复制代码

  • 删除节点
  1. @Test
  2. public void testDeleteNode() throws Exception {
  3.         // deletingChildrenIfNeeded() 如果存在子节点,那么先删除子节点,再删除父节点
  4.         client.delete().deletingChildrenIfNeeded().forPath("/node2");
  5.     }
复制代码

  • 删除节点,必须成功
  1. @Test
  2. public void testDeleteNodeWithVersion() throws Exception {
  3.         // guaranteed() 如果删除失败,那么会在后台一直尝试删除,直到删除成功为止
  4.         client.delete().guaranteed().forPath("/node3");
  5.     }
复制代码

  • 删除节点,回调函数
  1. @Test
  2. public void testDeleteNodeWithCallback() throws Exception {
  3.         // inBackground() 指定回调函数
  4.         client.delete().guaranteed().inBackground((client, event) -> System.out.println(event)).forPath("/node3");
  5.         Thread.sleep(100000);
  6.     }
复制代码

  • 监听节点的创建、修改、删除
  1. @Test
  2. public void testNodeCache() throws Exception {
  3.         // 创建一个nodeCache对象
  4.         NodeCache nodeCache = new NodeCache(client, "/node3");
  5.         // 注册监听器
  6.         nodeCache.getListenable().addListener(() -> {
  7.             System.out.println("节点数据发生变化");
  8.             byte[] bytes = nodeCache.getCurrentData().getData();
  9.             System.out.println(new String(bytes));
  10.         });
  11.         // 启动监听器
  12.         nodeCache.start(true);
  13.         while (true){
  14.         }
  15.     }
复制代码

  • 监听子节点的创建、修改、删除
  1. @Test
  2. public void testpathChildrenCache() throws Exception {
  3.         // 创建一个nodeCache对象
  4.         PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node3",true);
  5.         // 注册监听器
  6.         pathChildrenCache.getListenable().addListener((client,event) -> {
  7.             System.out.println("节点数据发生变化");
  8.             System.out.println(event);
  9.             PathChildrenCacheEvent.Type type = event.getType();
  10.             if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  11.                 System.out.println("子节点数据发生变化");
  12.                 byte[] data = event.getData().getData();
  13.                 System.out.println(new String(data));
  14.             }
  15.         });
  16.         // 启动监听器
  17.         pathChildrenCache.start(true);
  18.         while (true){
  19.         }
  20.     }
复制代码

  • 树形监听器
  1. @Test
  2. public void testTreeCache() throws Exception {
  3.         // 创建一个nodeCache对象
  4.             TreeCache treeCache = new TreeCache(client, "/node3");
  5.         // 注册监听器
  6.         treeCache.getListenable().addListener((client,event) -> {
  7.             System.out.println("节点数据发生变化");
  8.             System.out.println(event);
  9.             TreeCacheEvent.Type type = event.getType();
  10.             if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){
  11.                 System.out.println("子节点数据发生变化");
  12.                 byte[] data = event.getData().getData();
  13.                 System.out.println(new String(data));
  14.             }
  15.         });
  16.         // 启动监听器
  17.         treeCache.start();
  18.         while (true){
  19.         }
  20.     }
复制代码

  • 分布式锁
  1. InterProcessMutex lock = new InterProcessMutex(client, "/lock");
  2. // 获取锁
  3. try {
  4.     // 获取锁
  5.     boolean acquire = lock.acquire(3, TimeUnit.SECONDS);
  6. }catch (Exception e) {
  7.    e.printStackTrace();
  8. }finally {
  9.   // 释放锁
  10.   try {
  11.       lock.release();
  12.    } catch (Exception e) {
  13.       e.printStackTrace();
  14.    }              
  15. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

愛在花開的季節

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表