ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【ZooKeeper学习条记】 [打印本页]

作者: 用户云卷云舒    时间: 2024-7-15 09:31
标题: 【ZooKeeper学习条记】
1. ZooKeeper基本概念

   Zookeeper官网:https://zookeeper.apache.org/index.html
  
2. ZooKeeper常用命令

2.1 ZooKeeper数据模型

在正式介绍Zookeeper的常用命令之前,我们先来了解一下Zookeeper的相干数据模型:


2.2 ZooKeeper常用命令

Zookeeper是一个常见的客户端-服务器模型,我们可以利用命令行大概JavaAPI的方式充当客户端进行访问,其架构如下图所示:


















2.3 ZooKeeper的JavaAPI操纵

2.3.1 Curator介绍

Curator:是一个Zookeeper的Java客户端库

   Curator官网:http://curator.apache.org/
  2.3.2 Curator API操纵

2.3.2.1 创建连接

我们可以利用CuratorFrameworkFactory静态工厂类进行创建,可以通过如下两种方式配置:
下面我们就给出对应两种代码的实现方式:
newClient:
  1. /**
  2. * ZooKeeper测试类
  3. */
  4. public class ZooKeeperTest {
  5.     private CuratorFramework client = null;
  6.     @Before
  7.     public void initByNewClient() {
  8.         CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
  9.                 3000,
  10.                 3000,
  11.                 new ExponentialBackoffRetry(3000, 1));
  12.         this.client = client;
  13.         this.client.start();
  14.     }
  15. }
复制代码
build:
  1. /**
  2. * ZooKeeper测试类
  3. */
  4. public class ZooKeeperTest {
  5.     private CuratorFramework client = null;
  6.     @Before
  7.     public void init() {
  8.         CuratorFramework client = CuratorFrameworkFactory
  9.                 .builder()
  10.                 .connectString("127.0.0.1:2181")
  11.                 .sessionTimeoutMs(3000)
  12.                 .connectionTimeoutMs(3000)
  13.                 .retryPolicy(new ExponentialBackoffRetry(3000, 1))
  14.                 .namespace("")
  15.                 .build();
  16.         client.start();
  17.         this.client = client;
  18.     }
  19. }
复制代码
其中各个配置项含义如下:

2.3.2.2 创建节点

创建节点有如下常见的四种方式:
Case1:创建节点(不携带数据)
  1. @Test
  2. public void testCreate1() throws Exception {
  3.     String path = client.create().forPath("/app1");
  4.     System.out.println(path);
  5. }
复制代码
Case2:创建节点(携带数据)
  1. @Test
  2. public void testCreate2() throws Exception {
  3.     String path = client.create().forPath("/app2", "curator java api".getBytes());
  4.     System.out.println(path);
  5. }
复制代码
Case3:创建多级节点
  1. @Test
  2. public void testCreate4() throws Exception {
  3.     client.create().creatingParentsIfNeeded().forPath("/test/test1/test2");
  4. }
复制代码
Case4:创建节点并指定类型
  1. @Test
  2. public void testCreate3() throws Exception {
  3.     client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  4.     client.create().withMode(CreateMode.PERSISTENT).forPath("/app4");
  5.     client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/app5");
  6.     client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app6");
  7. }
复制代码
2.3.2.3 删除节点

删除节点有如下常见的两种方式:
Case1:删除节点(不含子节点)
  1. @Test
  2. public void testDelete() throws Exception {
  3.     client.delete().forPath("/app1");
  4. }
复制代码
Case2:删除节点(递归删除子节点)
  1. @Test
  2. public void testDeleteAll() throws Exception {
  3.     client.delete().deletingChildrenIfNeeded().forPath("/test");
  4. }
复制代码
2.3.2.4 查询节点

查询节点有如下常见的三种方式:
Case1:查询子节点信息
  1. @Test
  2. public void testGetChildren() throws Exception {
  3.     List<String> childrenList = client.getChildren().forPath("/");
  4.     System.out.println(childrenList);
  5. }
复制代码
Case2:查询节点数据
  1. @Test
  2. public void testGetData() throws Exception {
  3.     byte[] bytes = client.getData().forPath("/app2");
  4.     System.out.println("data: " + new String(bytes));
  5. }
复制代码
Case3:查询节点具体信息
  1. @Test
  2. public void testGetData3() throws Exception {
  3.     Stat stat = new Stat();
  4.     client.getData().storingStatIn(stat).forPath("/app2");
  5.     System.out.println(stat);
  6. }
复制代码
2.3.2.5 修改节点

修改节点有如下常见的两种方式:
Case1:修改节点数据
  1. @Test
  2. public void testSetData() throws Exception {
  3.     Stat stat = client.setData().forPath("/app1", "some data".getBytes());
  4.     System.out.println(stat);
  5. }
复制代码
Case2:修改节点数据(带有版本号)
  1. @Test
  2. public void testSetData2() throws Exception {
  3.     Stat stat = new Stat();
  4.     client.getData().storingStatIn(stat).forPath("/app2");
  5.     int version = stat.getVersion();
  6.     System.out.println(version);
  7.     client.setData().withVersion(version).forPath("/app2", "set with version".getBytes());
  8. }
复制代码
3. ZooKeeper的变乱监听机制

Watcher变乱监听机制:

3.1 Node Cache

代码实现:
  1. @Test
  2. public void testCuratorCache() throws Exception {
  3.     NodeCache cache = new NodeCache(client, "/app1");
  4.     cache.getListenable().addListener(new NodeCacheListener() {
  5.         @Override
  6.         public void nodeChanged() throws Exception {
  7.             System.out.println("监听到节点变化...");
  8.         }
  9.     });
  10.     cache.start();
  11.     while (true) {
  12.     }
  13. }
复制代码
3.2 PathChildren Cache

代码实现:
  1. @Test
  2. public void testPathChildrenCache() throws Exception {
  3.     PathChildrenCache cache = new PathChildrenCache(client, "/app1", true);
  4.     cache.getListenable().addListener(new PathChildrenCacheListener() {
  5.         @Override
  6.         public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
  7.             System.out.println("监听到子节点变化...");
  8.             PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
  9.             if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
  10.                 System.out.println("监听到子节点数据变化...");
  11.                 System.out.println("更新后数据: " + pathChildrenCacheEvent.getData().getData());
  12.             }
  13.         }
  14.     });
  15.     cache.start();
  16.     while (true) {
  17.     }
  18. }
复制代码
3.3 Tree Cache

代码实现:
  1. @Test
  2.     public void testTreeCache() throws Exception {
  3.         TreeCache cache = new TreeCache(client, "/app1");
  4.         cache.getListenable().addListener(new TreeCacheListener() {
  5.             @Override
  6.             public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
  7.                 System.out.println("监听到节点发生变化...");
  8.                 System.out.println(treeCacheEvent);
  9.             }
  10.         });
  11.         cache.start();
  12.         while (true) {
  13.         }
  14.     }
复制代码
4. ZooKeeper分布式锁

4.1 ZooKeeper分布式锁原理


   注意:这里创建临时节点是因为防止获取到锁的客户端宕机了,进而导致锁永远不会被删的情况;这是创建次序节点是方便编号的排序
  Cutator提供了下面五种分布式锁的方式:

4.2 分布式锁实战(模拟12306抢票)

代码如下:
  1. package org.example;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import java.util.concurrent.TimeUnit;
  7. public class ZooKeeperLockTest {
  8.     private static int tickets = 10; // 票数
  9.     public static void main(String[] args) {
  10.         // 建立连接
  11.         CuratorFramework client = CuratorFrameworkFactory
  12.                 .builder()
  13.                 .connectString("127.0.0.1:2181")
  14.                 .sessionTimeoutMs(3000)
  15.                 .connectionTimeoutMs(3000)
  16.                 .retryPolicy(new ExponentialBackoffRetry(3000, 1))
  17.                 .namespace("")
  18.                 .build();
  19.         client.start();
  20.         // 获取分布式锁
  21.         InterProcessMutex lock = new InterProcessMutex(client, "/lock");
  22.         Thread t1 = new Thread(() -> {
  23.             while (true) {
  24.                 try {
  25.                     boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);
  26.                     if (hasLock && tickets > 0) {
  27.                         // 不断抢票
  28.                         System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票");
  29.                         tickets--;
  30.                         if (tickets <= 0) {
  31.                             break;
  32.                         }
  33.                     }
  34.                 } catch (Exception e) {
  35.                     throw new RuntimeException(e);
  36.                 } finally {
  37.                     try {
  38.                         lock.release();
  39.                     } catch (Exception e) {
  40.                         throw new RuntimeException(e);
  41.                     }
  42.                 }
  43.             }
  44.         }, "携程");
  45.         Thread t2 = new Thread(() -> {
  46.             while (true) {
  47.                 try {
  48.                     boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);
  49.                     if (hasLock && tickets > 0) {
  50.                         // 不断抢票
  51.                         System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票");
  52.                         tickets--;
  53.                         if (tickets <= 0) {
  54.                             break;
  55.                         }
  56.                     }
  57.                 } catch (Exception e) {
  58.                     throw new RuntimeException(e);
  59.                 } finally {
  60.                     try {
  61.                         lock.release();
  62.                     } catch (Exception e) {
  63.                         throw new RuntimeException(e);
  64.                     }
  65.                 }
  66.             }
  67.         }, "飞猪");
  68.         t1.start();
  69.         t2.start();
  70.     }
  71. }
复制代码
5. ZooKeeper集群管理

Leader推举过程:

比如有三台服务器,编号分别是1,2,3。则编号越大在选择算法中的权重就越大

服务器中存放的数据ID越大,值越大阐明更新的越频繁,则在选择算法中的权重就越大


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4