java操作zookeeper
- 创建一个maven项目在pom文件里引入如下依赖:
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.21</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.21</version>
- </dependency>
- </dependencies>
复制代码
- @Before
- public void testConnect(){
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
- client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
- .connectString("192.168.223.131:2181") // 指定连接zookeeper的服务器地址
- .retryPolicy(retryPolicy) // 指定重试策略
- .namespace("test") // 指定命名空间
- .build(); // 建造客户端实例对象
- client.start(); // 启动客户端
- }
复制代码- @After
- public void testClose(){
- // 关闭客户端
- if (client != null){
- client.close();
- }
- }
复制代码- @Test
- public void testCreateNode() throws Exception {
- // 如果没有指定命名空间,那么节点的完整路径为 /node2,如果指定了命名空间,那么节点的完整路径为 /test/node2
- // 如果没有数据,那么节点的数据为当前客户端的ip地址
- // 如果没有指定节点类型,那么节点类型为持久节点
- // CreateMode.EPHEMERAL 临时节点
- // creatingParentsIfNeeded() 如果父节点不存在,那么自动创建父节点
- String path = client.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL)
- .forPath("/node3/min", "node3min".getBytes());
- System.out.println(path);
- // 让线程阻塞,不让程序结束,这样可以在zookeeper中看到创建的临时节点,因为临时节点的生命周期是和客户端绑定的
- Thread.sleep(100000);
- }
复制代码- @Test
- public void testGetData() throws Exception {
- byte[] bytes = client.getData().forPath("/node2");
- System.out.println(new String(bytes));
- }
复制代码- @Test
- public void testGetChildren() throws Exception {
- List<String> childrenList = client.getChildren().forPath("/");
- for (String child : childrenList) {
- System.out.println(child);
- }
- }
复制代码- @Test
- public void testGetStat() throws Exception {
- // Stat类用于存储节点状态信息
- Stat stat = new Stat();
- // storingStatIn(stat) 将节点状态信息存储到stat对象中
- byte[] data = client.getData().storingStatIn(stat).forPath("/node2");
- System.out.println(new String(data));
- System.out.println(stat);
- }
复制代码- @Test
- public void testSetData() throws Exception {
- client.setData().forPath("/node2/min1", "minqiliang".getBytes());
- }
复制代码- @Test
- public void testSetDataWithVersion() throws Exception {
- Stat stat = new Stat();
- client.getData().storingStatIn(stat).forPath("/node3");
- System.out.println(stat.getVersion());
- client.setData().withVersion(stat.getVersion()).forPath("/node3", "minqiliang".getBytes());
- }
复制代码- @Test
- public void testDeleteNode() throws Exception {
- // deletingChildrenIfNeeded() 如果存在子节点,那么先删除子节点,再删除父节点
- client.delete().deletingChildrenIfNeeded().forPath("/node2");
- }
复制代码- @Test
- public void testDeleteNodeWithVersion() throws Exception {
- // guaranteed() 如果删除失败,那么会在后台一直尝试删除,直到删除成功为止
- client.delete().guaranteed().forPath("/node3");
- }
复制代码- @Test
- public void testDeleteNodeWithCallback() throws Exception {
- // inBackground() 指定回调函数
- client.delete().guaranteed().inBackground((client, event) -> System.out.println(event)).forPath("/node3");
- Thread.sleep(100000);
- }
复制代码- @Test
- public void testNodeCache() throws Exception {
- // 创建一个nodeCache对象
- NodeCache nodeCache = new NodeCache(client, "/node3");
- // 注册监听器
- nodeCache.getListenable().addListener(() -> {
- System.out.println("节点数据发生变化");
- byte[] bytes = nodeCache.getCurrentData().getData();
- System.out.println(new String(bytes));
- });
- // 启动监听器
- nodeCache.start(true);
- while (true){
- }
- }
复制代码- @Test
- public void testpathChildrenCache() throws Exception {
- // 创建一个nodeCache对象
- PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node3",true);
- // 注册监听器
- pathChildrenCache.getListenable().addListener((client,event) -> {
- System.out.println("节点数据发生变化");
- System.out.println(event);
- PathChildrenCacheEvent.Type type = event.getType();
- if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
- System.out.println("子节点数据发生变化");
- byte[] data = event.getData().getData();
- System.out.println(new String(data));
- }
- });
- // 启动监听器
- pathChildrenCache.start(true);
- while (true){
- }
- }
复制代码- @Test
- public void testTreeCache() throws Exception {
- // 创建一个nodeCache对象
- TreeCache treeCache = new TreeCache(client, "/node3");
- // 注册监听器
- treeCache.getListenable().addListener((client,event) -> {
- System.out.println("节点数据发生变化");
- System.out.println(event);
- TreeCacheEvent.Type type = event.getType();
- if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){
- System.out.println("子节点数据发生变化");
- byte[] data = event.getData().getData();
- System.out.println(new String(data));
- }
- });
- // 启动监听器
- treeCache.start();
- while (true){
- }
- }
复制代码- InterProcessMutex lock = new InterProcessMutex(client, "/lock");
- // 获取锁
- try {
- // 获取锁
- boolean acquire = lock.acquire(3, TimeUnit.SECONDS);
- }catch (Exception e) {
- e.printStackTrace();
- }finally {
- // 释放锁
- try {
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |