Zookeeper应用场景实战一

打印 上一主题 下一主题

主题 555|帖子 555|积分 1665

目录
1. Zookeeper Java客户端实战
1.1 Zookeeper 原生Java客户端利用
ZooKeeper常用构造器
示例代码:
Zookeeper主要方法
1.2 Curator开源客户端利用
引入依赖
示例代码:
创建一个客户端实例
创建节点
一次性创建带层级布局的节点

获取数据

更新节点

删除节点
异步接口
指定线程池
Curator 监听器
Curator Caches
2. Zookeeper在分布式定名服务中的实战

2.1 分布式API目录
2.2 分布式节点的定名
2.3 分布式的ID生成器
基于Zookeeper实现分布式ID生成器
基于Zookeeper实现SnowFlakeID算法
3. zookeeper实现分布式队列

3.1 计划思绪
3.2 利用Apache Curator实现分布式队列
3.3 留意事项


1. Zookeeper Java客户端实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:


  • ZooKeeper官方的Java客户端API。
  • 第三方的Java客户端API,比如Curator。
ZooKeeper官方的客户端API提供了根本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于现实开发来说,ZooKeeper官方API有一些不敷之处,具体如下:


  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新举行注册。
  • 会话超时之后没有实现重连机制。
  • 非常处理烦琐,ZooKeeper提供了很多非常,对于开发职员来说大概根本不知道应该怎样处理这些抛出的非常。
  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
  • 创建节点时假如抛出非常,需要自行检查节点是否存在。
  • 无法实现级联删除。
总之,ZooKeeper官方API功能比力简单,在现实开发过程中比力笨重,一般不保举利用。

1.1 Zookeeper 原生Java客户端利用

引入zookeeper client依赖
  1. <!-- zookeeper client -->
  2. <dependency>
  3.     <groupId>org.apache.zookeeper</groupId>
  4.     <artifactId>zookeeper</artifactId>
  5.     <version>3.8.0</version>
  6. </dependency>
复制代码
留意:保持与服务端版本同等,不然会有很多兼容性的题目

ZooKeeper原生客户端主要利用org.apache.zookeeper.ZooKeeper这个类来利用ZooKeeper服务。

ZooKeeper常用构造器

  1. ZooKeeper (connectString, sessionTimeout, watcher)
复制代码


  • connectString:利用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端标语。客户端会任意选取connectString 中的一个节点建立连接。
  • sessionTimeout : session timeout时间。
  • watcher:用于接收到来自ZooKeeper集群的变乱。
利用 zookeeper 原生 API,连接zookeeper集群

示例代码:

  1. public class ZkClientDemo {
  2.     private static final  String  CONNECT_STR="192.168.189.131";
  3.     private final static  String CLUSTER_CONNECT_STR="192.168.65.163:2181,192.168.65.184:2181,192.168.65.186:2181";
  4.     public static void main(String[] args) throws Exception {
  5.         //获取zookeeper对象
  6.         ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);
  7.         //CONNECTED
  8.         System.out.println(zooKeeper.getState());
  9.         Stat stat = zooKeeper.exists("/user",false);
  10.         if(null ==stat){
  11.             //创建持久节点
  12.             zooKeeper.create("/user","bubble".getBytes(),
  13.                     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  14.         }
  15.         //永久监听  addWatch -m mode  /user
  16.         zooKeeper.addWatch("/user",new Watcher() {
  17.             @Override
  18.             public void process(WatchedEvent event) {
  19.                 System.out.println("event:=> " + event);
  20.                 //TODO
  21.             }
  22.         },AddWatchMode.PERSISTENT);
  23.         stat = new Stat();
  24.         byte[] data = zooKeeper.getData("/user", false, stat);
  25.         System.out.println(" data:=> "+new String(data));
  26.         // -1: 无条件更新
  27.         //zooKeeper.setData("/user", "third".getBytes(), -1);
  28.         // 带版本条件更新
  29.         int version = stat.getVersion();
  30.         zooKeeper.setData("/user", "bubble".getBytes(), version);
  31.         Thread.sleep(Integer.MAX_VALUE);
  32.     }
  33. }
复制代码
  1. public class ZooKeeperFacotry {
  2.     private static final int SESSION_TIMEOUT = 5000;
  3.     public static ZooKeeper create(String connectionString) throws Exception {
  4.         final CountDownLatch connectionLatch = new CountDownLatch(1);
  5.         ZooKeeper zooKeeper = new ZooKeeper(connectionString, SESSION_TIMEOUT, new Watcher() {
  6.             @Override
  7.             public void process(WatchedEvent event) {
  8.                 if (event.getType()== Event.EventType.None
  9.                         && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
  10.                     connectionLatch.countDown();
  11.                     System.out.println("连接建立");
  12.                 }
  13.             }
  14.         });
  15.         System.out.println("等待连接建立...");
  16.         connectionLatch.await();
  17.         return zooKeeper;
  18.     }
  19. }
复制代码
运行效果: 
  1. 等待连接建立...
  2. // ...省略
  3. 连接建立
  4. CONNECTED
  5. data:=> bubble
  6. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//第一次运行触发的监听
  7. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//操作/user触发的监听,set /user bubble
  8. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//操作一次触发一次
复制代码

Zookeeper主要方法



  • create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 生存 data[]的 数据,createMode指定 znode 的类型。
  • delete(path, version):假如给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
  • exists(path, watch):判定给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
  • setData(path, data, version):假如给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
  • sync(path):把客户端 session 连接节点和 leader 节点举行同步。
方法特点:


  • 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变革。
  • 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。假如 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会举行更新,这样的更新是条件更新。
  • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并期待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来自服务端的相应。

同步创建节点:
  1. @Test
  2. public void createTest() throws KeeperException, InterruptedException {
  3.     String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4.     log.info("created path: {}",path);
  5. }
复制代码
异步创建节点:
  1. @Test
  2. public void createAsycTest() throws InterruptedException {
  3.      zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
  4.              CreateMode.PERSISTENT,
  5.              (rc, path, ctx, name) -> log.info("rc  {},path {},ctx {},name {}",rc,path,ctx,name),"context");
  6.     TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  7. }
复制代码
修改节点数据:
  1. @Test
  2. public void setTest() throws KeeperException, InterruptedException {
  3.     Stat stat = new Stat();
  4.     byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
  5.     log.info("修改前: {}",new String(data));
  6.     zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
  7.     byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
  8.     log.info("修改后: {}",new String(dataAfter));
  9. }
复制代码

1.2 Curator开源客户端利用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的题目以及NodeExistsException非常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比力普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式盘算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
   Guava is to Java that Curator to ZooKeeper
  在现实的开发场景中,利用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
   官网:Welcome to Apache Curator | Apache Curator
  
引入依赖

Curator 包含了几个包:


  • curator-framework是对ZooKeeper的底层API的一些封装。
  • curator-client提供了一些客户端的操作,例如重试计谋等。
  • curator-recipes封装了一些高级特性,如:Cache变乱监听、选举、分布式锁、分布式计数器、分布式Barrier等。
  1. <!-- zookeeper client -->
  2. <dependency>
  3.   <groupId>org.apache.zookeeper</groupId>
  4.   <artifactId>zookeeper</artifactId>
  5.   <version>3.8.0</version>
  6. </dependency>
  7. <!--curator-->
  8. <dependency>
  9.   <groupId>org.apache.curator</groupId>
  10.   <artifactId>curator-recipes</artifactId>
  11.   <version>5.1.0</version>
  12.   <exclusions>
  13.     <exclusion>
  14.       <groupId>org.apache.zookeeper</groupId>
  15.       <artifactId>zookeeper</artifactId>
  16.     </exclusion>
  17.   </exclusions>
  18. </dependency>
复制代码
示例代码:

  1. public class CuratorDemo {
  2.     // ZooKeeper集群连接字符串,包括多个ZooKeeper服务器的地址和端口
  3.     //private final static  String CLUSTER_CONNECT_STR="192.168.65.163:2181,192.168.65.184:2181,192.168.65.186:2181";
  4.     private final static  String CLUSTER_CONNECT_STR="192.168.189.131";
  5.     public static void main(String[] args) throws Exception {
  6.         //构建客户端实例
  7.         CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
  8.                 .connectString(CLUSTER_CONNECT_STR)
  9.                 .retryPolicy(new ExponentialBackoffRetry(1000,3)) // 设置重试策略
  10.                 .build();
  11.         //启动客户端
  12.         curatorFramework.start();
  13.         String path = "/user";
  14.         // 检查节点是否存在
  15.         Stat stat = curatorFramework.checkExists().forPath(path);
  16.         if (stat != null) {
  17.             // 删除节点
  18.             curatorFramework.delete()
  19.                     .deletingChildrenIfNeeded()  // 如果存在子节点,则删除所有子节点
  20.                     .forPath(path);  // 删除指定节点
  21.         }
  22.         // 创建节点
  23.         curatorFramework.create()
  24.                 .creatingParentsIfNeeded()  // 如果父节点不存在,则创建父节点
  25.                 .withMode(CreateMode.PERSISTENT)
  26.                 .forPath(path, "Init Data".getBytes());
  27.         // 注册节点监听
  28.         curatorFramework.getData()
  29.                 .usingWatcher(new CuratorWatcher() {
  30.                     @Override
  31.                     public void process(WatchedEvent event) throws Exception {
  32.                         byte[] bytes = curatorFramework.getData().forPath(path);
  33.                         System.out.println("Node data changed: " + new String(bytes));
  34.                     }
  35.                 })
  36.                 .forPath(path);
  37.         // 更新节点数据    set /user  Update Data
  38.         curatorFramework.setData()
  39.                 .forPath(path, "Update Data".getBytes());
  40.        stat=new Stat();
  41.         //查询节点数据
  42.         byte[] bytes = curatorFramework.getData().storingStatIn(stat)
  43.                 .forPath("/user");
  44.         System.out.println(new String(bytes));
  45.         ExecutorService executorService = Executors.newSingleThreadExecutor();
  46.         //异步处理,可以指定线程池
  47.         curatorFramework.getData().inBackground((item1, item2) -> {
  48.             System.out.println("background:"+item1+" <---> "+item2);
  49.             System.out.println("item2.getStat()=> " + item2.getStat());
  50.         },executorService).forPath(path);
  51.         // 创建节点缓存,用于监听指定节点的变化
  52.         final NodeCache nodeCache = new NodeCache(curatorFramework, path);
  53.         // 启动NodeCache并立即从服务端获取最新数据
  54.         nodeCache.start(true);
  55.         // 注册节点变化监听器
  56.         nodeCache.getListenable().addListener(new NodeCacheListener() {
  57.             @Override
  58.             public void nodeChanged() throws Exception {
  59.                 byte[] newData = nodeCache.getCurrentData().getData();
  60.                 System.out.println("Node data changed: " + new String(newData));
  61.             }
  62.         });
  63.         // 创建PathChildrenCache
  64.         PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
  65.         pathChildrenCache.start();
  66.         // 注册子节点变化监听器
  67.         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  68.             @Override
  69.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  70.                 if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
  71.                     ChildData childData = event.getData();
  72.                     System.out.println("Child added: " + childData.getPath());
  73.                 } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
  74.                     ChildData childData = event.getData();
  75.                     System.out.println("Child removed: " + childData.getPath());
  76.                 } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
  77.                     ChildData childData = event.getData();
  78.                     System.out.println("Child updated: " + childData.getPath());
  79.                 }
  80.             }
  81.         });
  82.         Thread.sleep(Integer.MAX_VALUE);
  83.     }
  84. }
复制代码
运行效果
  1. .....
  2. Node data changed: Update Data
  3. Update Data
  4. background:org.apache.curator.framework.imps.CuratorFrameworkImpl@3c5f290a <---> CuratorEventImpl{type=GET_DATA, resultCode=0, path='/user', name='null', children=null, context=null, stat=40,41,1696611808183,1696611808193,1,0,0,0,11,0,40
  5. , data=[85, 112, 100, 97, 116, 101, 32, 68, 97, 116, 97], watchedEvent=null, aclList=null, opResults=null}
  6. item2.getStat()=> 40,41,1696611808183,1696611808193,1,0,0,0,11,0,40
  7. Child added: /user/bubble//修改子节点触发监听器的打印
  8. Child removed: /user/bubble
复制代码

创建一个客户端实例

在利用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有两种方法:


  • 利用工厂类CuratorFrameworkFactory的静态newClient()方法。
  1. // 重试策略
  2. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
  3. //创建客户端实例
  4. CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
  5. //启动客户端
  6. client.start();
复制代码


  • 利用工厂类CuratorFrameworkFactory的静态builder构造者方法。
  1. //随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
  2. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  3. CuratorFramework client = CuratorFrameworkFactory.builder()
  4.                 .connectString("192.168.128.129:2181")
  5.                 .sessionTimeoutMs(5000)  // 会话超时时间
  6.                 .connectionTimeoutMs(5000) // 连接超时时间
  7.                 .retryPolicy(retryPolicy)
  8.                 .namespace("base") // 包含隔离名称
  9.                 .build();
  10. client.start();
复制代码


  • connectionString:服务器地址列表,在指定服务器地址列表的时间可以是一个地址,也可以是多个地址。假如是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
  • retryPolicy:重试计谋,当客户端非常退出或者与服务端失去连接的时间,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判定服务器返回的 keeperException 的状态代码来判定是否举行重试处理,假如返回的是 OK 表示一切操作都没有题目,而 SYSTEMERROR 表示系统或服务端错误。
计谋名称
描述
ExponentialBackoffRetry
重试一组次数,重试之间的睡眠时间增加
RetryNTimes
重试最大次数
RetryOneTime
只重试一次
RetryUntilElapsed
在给定的时间结束之前重试


  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

创建节点

创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类型,即暂时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。
  1. @Test
  2. public void testCreate() throws Exception {
  3.     String path = curatorFramework.create().forPath("/curator-node");
  4.     curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
  5.     log.info("curator create node :{}  successfully.",path);
  6. }
复制代码
在 Curator 中,可以利用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,暂时节点,序次节点,暂时序次节点,持久化序次节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。

一次性创建带层级布局的节点

  1. @Test
  2. public void testCreateWithParent() throws Exception {
  3.     String pathWithParent="/node-parent/sub-node-1";
  4.     String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
  5.     log.info("curator create node :{}  successfully.",path);
  6. }
复制代码


获取数据

  1. @Test
  2. public void testGetData() throws Exception {
  3.     byte[] bytes = curatorFramework.getData().forPath("/curator-node");
  4.     log.info("get data from  node :{}  successfully.",new String(bytes));
  5. }
复制代码


更新节点

我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
  1. @Test
  2. public void testSetData() throws Exception {
  3.     curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
  4.     byte[] bytes = curatorFramework.setData().forPath("/curator-node");
  5.     log.info("get data from  node /curator-node :{}  successfully.",new String(bytes));
  6. }
复制代码


删除节点

  1. @Test
  2. public void testDelete() throws Exception {
  3.     String pathWithParent="/node-parent";
  4.     curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
  5. }
复制代码
guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除乐成的作用,其底层工作方式是:只要该客户端的会话有效,就会在配景连续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时间会以递归的方式直接删除其子节点,以及子节点的子节点。

异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
  1. public interface BackgroundCallback
  2. {
  3.     /**
  4.      * Called when the async background operation completes
  5.      *
  6.      * @param client the client
  7.      * @param event operation result details
  8.      * @throws Exception errors
  9.      */
  10.     public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
  11. }
复制代码
如上接口,主要参数为 client 客户端, 和 服务端变乱 event。
inBackground 异步处理默认在EventThread中执行
  1. @Test
  2. public void test() throws Exception {
  3.     curatorFramework.getData().inBackground((item1, item2) -> {
  4.         log.info(" background: {}", item2);
  5.     }).forPath(ZK_NODE);
  6.     TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  7. }
复制代码

指定线程池

  1. @Test
  2. public void test() throws Exception {
  3.     ExecutorService executorService = Executors.newSingleThreadExecutor();
  4.    
  5.     curatorFramework.getData().inBackground((item1, item2) -> {
  6.         log.info(" background: {}", item2);
  7.     },executorService).forPath(ZK_NODE);
  8.     TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  9. }
复制代码

Curator 监听器

  1. /**
  2. * Receives notifications about errors and background events
  3. */
  4. public interface CuratorListener
  5. {
  6.     /**
  7.      * Called when a background task has completed or a watch has triggered
  8.      *
  9.      * @param client client
  10.      * @param event the event
  11.      * @throws Exception any errors
  12.      */
  13.     public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
  14. }
复制代码
针对 background 关照和错误关照。利用此监听器之后,调用inBackground 方法会异步获得监听

Curator Caches

Curator 引入了 Cache 来实现对 Zookeeper 服务端变乱监听,Cache 变乱监听可以明白为一个当地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

node cache:
NodeCache 对某一个节点举行监听
  1. public NodeCache(CuratorFramework client,
  2.                          String path)
  3. Parameters:
  4. client - the client
  5. path - path to cache
复制代码
可以通过注册监听器来实现,对当前节点数据变革的处理
  1. public void addListener(NodeCacheListener listener)
  2.      Add a change listener
  3. Parameters:
  4. listener - the listener
复制代码
  1. @Slf4j
  2. public class NodeCacheTest extends AbstractCuratorTest{
  3.     public static final String NODE_CACHE="/node-cache";
  4.     @Test
  5.     public void testNodeCacheTest() throws Exception {
  6.         createIfNeed(NODE_CACHE);
  7.         NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
  8.         nodeCache.getListenable().addListener(new NodeCacheListener() {
  9.             @Override
  10.             public void nodeChanged() throws Exception {
  11.                 log.info("{} path nodeChanged: ",NODE_CACHE);
  12.                 printNodeData();
  13.             }
  14.         });
  15.         nodeCache.start();
  16.     }
  17.     public void printNodeData() throws Exception {
  18.         byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
  19.         log.info("data: {}",new String(bytes));
  20.     }
  21. }
复制代码

path cache:
PathChildrenCache 会对子节点举行监听,但是不会对二级子节点举行监听
  1. public PathChildrenCache(CuratorFramework client,
  2.                          String path,
  3.                          boolean cacheData)
  4. Parameters:
  5. client - the client
  6. path - path to watch
  7. cacheData - if true, node contents are cached in addition to the stat
复制代码
可以通过注册监听器来实现,对当前节点的子节点数据变革的处理
  1. public void addListener(PathChildrenCacheListener listener)
  2.      Add a change listener
  3. Parameters:
  4. listener - the listener
复制代码
  1. @Slf4j
  2. public class PathCacheTest extends AbstractCuratorTest{
  3.     public static final String PATH="/path-cache";
  4.     @Test
  5.     public void testPathCache() throws Exception {
  6.         createIfNeed(PATH);
  7.         PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
  8.         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  9.             @Override
  10.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  11.                 log.info("event:  {}",event);
  12.             }
  13.         });
  14.         // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
  15.         pathChildrenCache.start(true);
  16.     }
  17. }
复制代码

tree cache:
TreeCache 利用一个内部类TreeNode来维护这个一个树布局。并将这个树布局与ZK节点举行了映射。所以TreeCache 可以监听当前节点下所有节点的变乱。
  1. public TreeCache(CuratorFramework client,
  2.                          String path,
  3.                          boolean cacheData)
  4. Parameters:
  5. client - the client
  6. path - path to watch
  7. cacheData - if true, node contents are cached in addition to the stat
复制代码
可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变革的处理
  1. public void addListener(TreeCacheListener listener)
  2.      Add a change listener
  3. Parameters:
  4. listener - the listener
复制代码
  1. @Slf4j
  2. public class TreeCacheTest extends CuratorBaseOperations {
  3.     public static final String TREE_CACHE="/tree-path";
  4.     @Test
  5.     public void testTreeCache() throws Exception {
  6.         CuratorFramework curatorFramework = getCuratorFramework();
  7.         createIfNeed(TREE_CACHE);
  8.         TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
  9.         treeCache.getListenable().addListener(new TreeCacheListener() {
  10.             @Override
  11.             public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  12.                 log.info(" tree cache: {}",event);
  13.                 Map<String, ChildData> currentChildren = treeCache.getCurrentChildren(TREE_CACHE);
  14.                 log.info("currentChildren: {}",currentChildren);
  15.             }
  16.         });
  17.         treeCache.start();
  18.     }
  19. }
复制代码
运行效果
  1. 2023-10-07 01:03:44.105 [main-EventThread] INFO  o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  2. 2023-10-07 01:03:44.106 [Curator-ConnectionStateManager-0] INFO  com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  3. 2023-10-07 01:03:44.113 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  4. 2023-10-07 01:03:44.113 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:03:44.119 [main] INFO  com.bubble.zk_demo.curator.CuratorStandaloneBase --- path /tree-path created!
  6. 2023-10-07 01:03:44.133 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest ---  tree cache: TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/tree-path', stat=117,117,1696615424405,1696615424405,0,0,0,0,13,0,117
  7. , data=[49, 57, 50, 46, 49, 54, 56, 46, 49, 56, 57, 46, 49]}}
  8. 2023-10-07 01:03:44.134 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}
  9. 2023-10-07 01:03:44.136 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest ---  tree cache: TreeCacheEvent{type=INITIALIZED, data=null}
  10. 2023-10-07 01:03:44.136 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}
  11. //修改节点值触发的监听
  12. 2023-10-07 01:04:45.104 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest ---  tree cache: TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/tree-path', stat=117,118,1696615424405,1696615485384,1,0,0,0,13,0,117
  13. , data=[49, 57, 50, 46, 49, 54, 56, 46, 49, 56, 57, 46, 50]}}
  14. 2023-10-07 01:04:45.104 [Curator-TreeCache-0] INFO  com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}
复制代码

2. Zookeeper在分布式定名服务中的实战

定名服务是为系统中的资源提供标识能力。ZooKeeper的定名服务主要是利用ZooKeeper节点的树形分层布局和子节点的序次维护能力,来为分布式系统中的资源定名。
哪些应用场景需要用到分布式定名服务呢?典型的有:


  • 分布式API目录
  • 分布式节点定名
  • 分布式ID生成器


2.1 分布式API目录

为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java定名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层布局就能提供分布式的API调用功能。
著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,利用ZooKeeper维护的全局服务接口API的地址列表。大致的思绪为:


  • 服务提供者(Service Provider)在启动的时间,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相称于服务的公开。
  • 服务消费者(Consumer)启动的时间,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。


2.2 分布式节点的定名

一个分布式系统通常会由很多的节点构成,节点的数目不是固定的,而是不断动态变革的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点大概会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。

怎样为大量的动态节点定名呢?
一种简单的办法是可以通过配置文件,手动为每一个节点定名。但是,假如节点数据量太大,或者说变动频仍,手动定名则是不现实的,这就需要用到分布式节点的定名服务。

可用于生成集群节点的编号的方案:
(1)利用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。
(2)利用ZooKeeper持久序次节点的序次特性来维护节点的NodeId编号。
在第2种方案中,集群节点定名服务的根本流程是:


  • 启动节点服务,连接ZooKeeper,检查定名服务根节点是否存在,假如不存在,就创建系统的根节点。
  • 在根节点下创建一个暂时序次ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。
  • 假如暂时节点太多,可以根据需要删除暂时序次ZNode节点。

2.3 分布式的ID生成器

在分布式系统中,分布式ID生成器的利用场景非常之多:


  • 大量的数据记录,需要分布式ID。
  • 大量的系统消息,需要分布式ID。
  • 大量的请求日志,如restful的操作记录,需要唯一标识,以便举行后续的用户行为分析和调用链路分析。
  • 分布式节点的定名服务,往往也需要分布式ID。
  • 。。。
传统的数据库自增主键已经不能满意需求。在分布式系统情况中,迫切需要一种全新的唯一ID系统,这种系统需要满意以下需求:
(1)全局唯一:不能出现重复ID。
(2)高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严峻影响。

有哪些分布式的ID生成器方案呢?大致如下:

  • Java的UUID。(不保举)
  • 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生玉成局唯一的ID。
  • Twitter的SnowFlake算法。
  • ZooKeeper生成ID:利用ZooKeeper的序次节点,生玉成局唯一的ID。
  • MongoDb的ObjectId:MongoDB是一个分布式的非布局化NoSQL数据库,每插入一条记录会主动生玉成局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。

基于Zookeeper实现分布式ID生成器

在ZooKeeper节点的四种类型中,其中有以下两种类型具备主动编号的能力


  • PERSISTENT_SEQUENTIAL持久化序次节点。
  • EPHEMERAL_SEQUENTIAL暂时序次节点。
ZooKeeper的每一个节点都会为它的第一级子节点维护一份序次编号,会记录每个子节点创建的先后序次,这个序次编号是分布式同步的,也是全局唯一的。
可以通过创建ZooKeeper的暂时序次节点的方法,生玉成局唯一的ID
  1. @Slf4j
  2. public class IDMaker extends CuratorBaseOperations {
  3.     private String createSeqNode(String pathPefix) throws Exception {
  4.         CuratorFramework curatorFramework = getCuratorFramework();
  5.         //创建一个临时顺序节点
  6.         String destPath = curatorFramework.create()
  7.                 .creatingParentsIfNeeded()
  8.                 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
  9.                 .forPath(pathPefix);
  10.         return destPath;
  11.     }
  12.     public String  makeId(String path) throws Exception {
  13.         String str = createSeqNode(path);
  14.         if(null != str){
  15.             //获取末尾的序号
  16.             int index = str.lastIndexOf(path);
  17.             if(index>=0){
  18.                 index+=path.length();
  19.                 return index<=str.length() ? str.substring(index):"";
  20.             }
  21.         }
  22.         return str;
  23.     }
  24. }
复制代码

测试
  1. @Slf4j
  2. public class IDMakerTest {
  3.     @Test
  4.     public void testMarkId() throws Exception {
  5.         IDMaker idMaker = new IDMaker();
  6.         idMaker.init();
  7.         String pathPrefix = "/idmarker/id-";
  8.         //模拟5个线程创建id
  9.         for(int i=0;i<5;i++){
  10.             new Thread(()->{
  11.                 for (int j=0;j<10;j++){
  12.                     String id = null;
  13.                     try {
  14.                         id = idMaker.makeId(pathPrefix);
  15.                         log.info("线程{}第{}次创建id为{}",Thread.currentThread().getName(),
  16.                                 j,id);
  17.                     } catch (Exception e) {
  18.                         e.printStackTrace();
  19.                     }
  20.                 }
  21.             },"thread"+i).start();
  22.         }
  23.         Thread.sleep(Integer.MAX_VALUE);
  24.     }
  25. }
复制代码
运行效果
  1. ........
  2. 2023-10-07 01:00:17.459 [main-EventThread] INFO  o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  3. 2023-10-07 01:00:17.459 [Curator-ConnectionStateManager-0] INFO  com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  4. 2023-10-07 01:00:17.468 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:00:17.468 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  6. 2023-10-07 01:00:17.478 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第0次创建id为0000000001
  7. 2023-10-07 01:00:17.478 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第0次创建id为0000000002
  8. 2023-10-07 01:00:17.478 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第0次创建id为0000000000
  9. 2023-10-07 01:00:17.478 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第0次创建id为0000000003
  10. 2023-10-07 01:00:17.478 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第0次创建id为0000000004
  11. 2023-10-07 01:00:17.478 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第1次创建id为0000000005
  12. 2023-10-07 01:00:17.478 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第1次创建id为0000000006
  13. 2023-10-07 01:00:17.478 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第1次创建id为0000000007
  14. 2023-10-07 01:00:17.478 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第1次创建id为0000000008
  15. 2023-10-07 01:00:17.478 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第1次创建id为0000000009
  16. 2023-10-07 01:00:17.478 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第2次创建id为0000000010
  17. 2023-10-07 01:00:17.478 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第2次创建id为0000000011
  18. 2023-10-07 01:00:17.478 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第2次创建id为0000000012
  19. 2023-10-07 01:00:17.478 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第2次创建id为0000000013
  20. 2023-10-07 01:00:17.478 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第2次创建id为0000000014
  21. 2023-10-07 01:00:17.478 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第3次创建id为0000000016
  22. 2023-10-07 01:00:17.478 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第3次创建id为0000000015
  23. 2023-10-07 01:00:17.490 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第3次创建id为0000000017
  24. 2023-10-07 01:00:17.490 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第3次创建id为0000000018
  25. 2023-10-07 01:00:17.490 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第3次创建id为0000000019
  26. 2023-10-07 01:00:17.490 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第4次创建id为0000000020
  27. 2023-10-07 01:00:17.490 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第4次创建id为0000000021
  28. 2023-10-07 01:00:17.490 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第4次创建id为0000000023
  29. 2023-10-07 01:00:17.490 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第4次创建id为0000000022
  30. 2023-10-07 01:00:17.490 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第5次创建id为0000000025
  31. 2023-10-07 01:00:17.490 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第4次创建id为0000000024
  32. 2023-10-07 01:00:17.490 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第5次创建id为0000000026
  33. 2023-10-07 01:00:17.498 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第5次创建id为0000000027
  34. 2023-10-07 01:00:17.498 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第5次创建id为0000000028
  35. 2023-10-07 01:00:17.498 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第6次创建id为0000000029
  36. 2023-10-07 01:00:17.498 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第6次创建id为0000000030
  37. 2023-10-07 01:00:17.498 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第5次创建id为0000000031
  38. 2023-10-07 01:00:17.498 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第6次创建id为0000000032
  39. 2023-10-07 01:00:17.498 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第6次创建id为0000000033
  40. 2023-10-07 01:00:17.498 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第7次创建id为0000000034
  41. 2023-10-07 01:00:17.498 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第7次创建id为0000000035
  42. 2023-10-07 01:00:17.498 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第6次创建id为0000000036
  43. 2023-10-07 01:00:17.506 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第7次创建id为0000000037
  44. 2023-10-07 01:00:17.506 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第7次创建id为0000000038
  45. 2023-10-07 01:00:17.508 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第8次创建id为0000000039
  46. 2023-10-07 01:00:17.508 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第8次创建id为0000000040
  47. 2023-10-07 01:00:17.508 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第7次创建id为0000000041
  48. 2023-10-07 01:00:17.508 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第8次创建id为0000000042
  49. 2023-10-07 01:00:17.508 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第8次创建id为0000000043
  50. 2023-10-07 01:00:17.508 [thread0] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread0第9次创建id为0000000044
  51. 2023-10-07 01:00:17.508 [thread1] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread1第9次创建id为0000000045
  52. 2023-10-07 01:00:17.508 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第8次创建id为0000000046
  53. 2023-10-07 01:00:17.508 [thread4] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread4第9次创建id为0000000047
  54. 2023-10-07 01:00:17.508 [thread2] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread2第9次创建id为0000000048
  55. 2023-10-07 01:00:17.518 [thread3] INFO  c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread3第9次创建id为0000000049
复制代码

基于Zookeeper实现SnowFlakeID算法

Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。


SnowFlakeID的四个部分,具体介绍如下:
(1)第一位 占用1 bit,其值始终是0,没有现实作用。
(2)时间戳 占用41 bit,准确到毫秒,统共可以容纳约69年的时间。
(3)工作机器id占用10 bit,最多可以容纳1024个节点。
(4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。
在工作节点到达1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数目为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

SnowFlake算法的优点:


  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
  • 容量大,每秒可生成几百万个ID。
  • ID呈趋势递增,后续插入数据库的索引树时,性能较高。

SnowFlake算法的缺点:


  • 依赖于系统时钟的同等性,假如某台机器的系统时钟回拨了,有大概造成ID冲突,或者ID乱序。
  • 在启动之前,假如这台机器的系统时间回拨过,那么有大概出现ID重复的伤害。

基于zookeeper实现雪花算法:
  1. public class SnowflakeIdGenerator {
  2.     /**
  3.      * 单例
  4.      */
  5.     public static SnowflakeIdGenerator instance =
  6.             new SnowflakeIdGenerator();
  7.     /**
  8.      * 初始化单例
  9.      *
  10.      * @param workerId 节点Id,最大8091
  11.      * @return the 单例
  12.      */
  13.     public synchronized void init(long workerId) {
  14.         if (workerId > MAX_WORKER_ID) {
  15.             // zk分配的workerId过大
  16.             throw new IllegalArgumentException("woker Id wrong: " + workerId);
  17.         }
  18.         instance.workerId = workerId;
  19.     }
  20.     private SnowflakeIdGenerator() {
  21.     }
  22.     /**
  23.      * 开始使用该算法的时间为: 2017-01-01 00:00:00
  24.      */
  25.     private static final long START_TIME = 1483200000000L;
  26.     /**
  27.      * worker id 的bit数,最多支持8192个节点
  28.      */
  29.     private static final int WORKER_ID_BITS = 13;
  30.     /**
  31.      * 序列号,支持单节点最高每毫秒的最大ID数1024
  32.      */
  33.     private final static int SEQUENCE_BITS = 10;
  34.     /**
  35.      * 最大的 worker id ,8091
  36.      * -1 的补码(二进制全1)右移13位, 然后取反
  37.      */
  38.     private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
  39.     /**
  40.      * 最大的序列号,1023
  41.      * -1 的补码(二进制全1)右移10位, 然后取反
  42.      */
  43.     private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);
  44.     /**
  45.      * worker 节点编号的移位
  46.      */
  47.     private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;
  48.     /**
  49.      * 时间戳的移位
  50.      */
  51.     private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;
  52.     /**
  53.      * 该项目的worker 节点 id
  54.      */
  55.     private long workerId;
  56.     /**
  57.      * 上次生成ID的时间戳
  58.      */
  59.     private long lastTimestamp = -1L;
  60.     /**
  61.      * 当前毫秒生成的序列
  62.      */
  63.     private long sequence = 0L;
  64.     /**
  65.      * Next id long.
  66.      *
  67.      * @return the nextId
  68.      */
  69.     public Long nextId() {
  70.         return generateId();
  71.     }
  72.     /**
  73.      * 生成唯一id的具体实现
  74.      */
  75.     private synchronized long generateId() {
  76.         long current = System.currentTimeMillis();
  77.         if (current < lastTimestamp) {
  78.             // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1
  79.             return -1;
  80.         }
  81.         if (current == lastTimestamp) {
  82.             // 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
  83.             sequence = (sequence + 1) & MAX_SEQUENCE;
  84.             if (sequence == MAX_SEQUENCE) {
  85.                 // 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
  86.                 current = this.nextMs(lastTimestamp);
  87.             }
  88.         } else {
  89.             // 当前的时间戳已经是下一个毫秒
  90.             sequence = 0L;
  91.         }
  92.         // 更新上次生成id的时间戳
  93.         lastTimestamp = current;
  94.         // 进行移位操作生成int64的唯一ID
  95.         //时间戳右移动23位
  96.         long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;
  97.         //workerId 右移动10位
  98.         long workerId = this.workerId << WORKER_ID_SHIFT;
  99.         return time | workerId | sequence;
  100.     }
  101.     /**
  102.      * 阻塞到下一个毫秒
  103.      */
  104.     private long nextMs(long timeStamp) {
  105.         long current = System.currentTimeMillis();
  106.         while (current <= timeStamp) {
  107.             current = System.currentTimeMillis();
  108.         }
  109.         return current;
  110.     }
  111. }
复制代码
测试
  1. @Slf4j
  2. public class SnowflakeIdTest {
  3.     /**
  4.      * The entry point of application.
  5.      *
  6.      * @param args the input arguments
  7.      * @throws InterruptedException the interrupted exception
  8.      */
  9.     public static void main(String[] args) throws InterruptedException
  10.     {
  11.         //创建worker节点
  12.         long workerId = SnowflakeIdWorker.instance.getId();
  13.         SnowflakeIdGenerator.instance.init(workerId);
  14.         ExecutorService threadPool = Executors.newFixedThreadPool(10);
  15.         final HashSet idSet = new HashSet();
  16.         Collections.synchronizedCollection(idSet);
  17.         long start = System.currentTimeMillis();
  18.         log.info(" start generate id *");
  19.         int threadCount = 10;
  20.         int turn = 50000;
  21.         CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  22.         for (int i = 0; i < threadCount; i++)
  23.             threadPool.execute(() ->
  24.             {
  25.                 for (long j = 0; j < turn; j++)
  26.                 {
  27.                     long id = SnowflakeIdGenerator.instance.nextId();
  28.                     synchronized (idSet)
  29.                     {
  30.                         if (j % 10000 == 0)
  31.                         {
  32.                             log.info("线程{}生成第{}个 id 为:{}",
  33.                                     Thread.currentThread().getName(),j,id);
  34.                         }
  35.                         idSet.add(id);
  36.                     }
  37.                 }
  38.                 countDownLatch.countDown();
  39.             });
  40.         countDownLatch.await(50000, TimeUnit.MICROSECONDS);
  41.         threadPool.shutdown();
  42.         threadPool.awaitTermination(10, TimeUnit.SECONDS);
  43.         long end = System.currentTimeMillis();
  44.         log.info(" end generate id ");
  45.         log.info("* cost " + (end - start) + " ms!");
  46.     }
  47. }
复制代码
运行效果
  1. 2023-10-07 01:12:30.441 [main-EventThread] INFO  o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  2. 2023-10-07 01:12:30.442 [Curator-ConnectionStateManager-0] INFO  com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  3. 2023-10-07 01:12:30.448 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  4. 2023-10-07 01:12:30.449 [main-EventThread] INFO  org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:12:30.474 [main] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest ---  start generate id *
  6. 2023-10-07 01:12:30.476 [pool-4-thread-1] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第0个 id 为:1790262749490577408
  7. 2023-10-07 01:12:30.476 [pool-4-thread-3] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第0个 id 为:1790262749490577410
  8. 2023-10-07 01:12:30.476 [pool-4-thread-6] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第0个 id 为:1790262749490577412
  9. 2023-10-07 01:12:30.476 [pool-4-thread-4] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第0个 id 为:1790262749490577453
  10. 2023-10-07 01:12:30.476 [pool-4-thread-2] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第0个 id 为:1790262749490577409
  11. 2023-10-07 01:12:30.476 [pool-4-thread-8] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第0个 id 为:1790262749490577477
  12. 2023-10-07 01:12:30.476 [pool-4-thread-5] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第0个 id 为:1790262749490577468
  13. 2023-10-07 01:12:30.477 [pool-4-thread-7] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第0个 id 为:1790262749490577416
  14. 2023-10-07 01:12:30.478 [pool-4-thread-10] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第0个 id 为:1790262749490577482
  15. 2023-10-07 01:12:30.478 [pool-4-thread-9] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第0个 id 为:1790262749490577481
  16. 2023-10-07 01:12:30.547 [pool-4-thread-6] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第10000个 id 为:1790262750086168838
  17. 2023-10-07 01:12:30.556 [pool-4-thread-4] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第10000个 id 为:1790262750161666780
  18. 2023-10-07 01:12:30.578 [pool-4-thread-7] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第10000个 id 为:1790262750346216432
  19. 2023-10-07 01:12:30.594 [pool-4-thread-2] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第10000个 id 为:1790262750480433418
  20. 2023-10-07 01:12:30.597 [pool-4-thread-8] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第10000个 id 为:1790262750505599648
  21. 2023-10-07 01:12:30.608 [pool-4-thread-8] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第20000个 id 为:1790262750597874351
  22. 2023-10-07 01:12:30.633 [pool-4-thread-6] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第20000个 id 为:1790262750807589505
  23. 2023-10-07 01:12:30.649 [pool-4-thread-4] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第20000个 id 为:1790262750941807275
  24. 2023-10-07 01:12:30.666 [pool-4-thread-9] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第10000个 id 为:1790262751084413200
  25. 2023-10-07 01:12:30.670 [pool-4-thread-1] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第10000个 id 为:1790262751117967610
  26. 2023-10-07 01:12:30.683 [pool-4-thread-3] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第10000个 id 为:1790262751227020199
  27. 2023-10-07 01:12:30.687 [pool-4-thread-2] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第20000个 id 为:1790262751260574361
  28. 2023-10-07 01:12:30.694 [pool-4-thread-10] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第10000个 id 为:1790262751319293986
  29. 2023-10-07 01:12:30.699 [pool-4-thread-5] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第10000个 id 为:1790262751361237527
  30. 2023-10-07 01:12:30.699 [pool-4-thread-8] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第30000个 id 为:1790262751361237589
  31. 2023-10-07 01:12:30.720 [pool-4-thread-4] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第30000个 id 为:1790262751537398236
  32. 2023-10-07 01:12:30.738 [pool-4-thread-6] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第30000个 id 为:1790262751688393287
  33. 2023-10-07 01:12:30.751 [pool-4-thread-7] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第20000个 id 为:1790262751797445477
  34. 2023-10-07 01:12:30.758 [pool-4-thread-2] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第30000个 id 为:1790262751856165768
  35. 2023-10-07 01:12:30.765 [pool-4-thread-1] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第20000个 id 为:1790262751914885757
  36. 2023-10-07 01:12:30.768 [pool-4-thread-3] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第20000个 id 为:1790262751940051228
  37. 2023-10-07 01:12:30.775 [pool-4-thread-9] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第20000个 id 为:1790262751998772139
  38. 2023-10-07 01:12:30.788 [pool-4-thread-10] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第20000个 id 为:1790262752107823274
  39. 2023-10-07 01:12:30.810 [pool-4-thread-4] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第40000个 id 为:1790262752292372664
  40. 2023-10-07 01:12:30.814 [pool-4-thread-3] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第30000个 id 为:1790262752325927641
  41. 2023-10-07 01:12:30.824 [pool-4-thread-3] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第40000个 id 为:1790262752409813500
  42. 2023-10-07 01:12:30.832 [pool-4-thread-7] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第30000个 id 为:1790262752476922621
  43. 2023-10-07 01:12:30.850 [pool-4-thread-2] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第40000个 id 为:1790262752627916964
  44. 2023-10-07 01:12:30.858 [pool-4-thread-6] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第40000个 id 为:1790262752695025715
  45. 2023-10-07 01:12:30.872 [pool-4-thread-9] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第30000个 id 为:1790262752812466932
  46. 2023-10-07 01:12:30.885 [pool-4-thread-10] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第30000个 id 为:1790262752921518631
  47. 2023-10-07 01:12:30.898 [pool-4-thread-8] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第40000个 id 为:1790262753030570681
  48. 2023-10-07 01:12:30.906 [pool-4-thread-7] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第40000个 id 为:1790262753097679390
  49. 2023-10-07 01:12:30.929 [pool-4-thread-1] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第30000个 id 为:1790262753290617713
  50. 2023-10-07 01:12:30.959 [pool-4-thread-5] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第20000个 id 为:1790262753542275827
  51. 2023-10-07 01:12:30.966 [pool-4-thread-1] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第40000个 id 为:1790262753600996220
  52. 2023-10-07 01:12:30.972 [pool-4-thread-9] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第40000个 id 为:1790262753651327184
  53. 2023-10-07 01:12:30.990 [pool-4-thread-10] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第40000个 id 为:1790262753802322148
  54. 2023-10-07 01:12:31.017 [pool-4-thread-5] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第30000个 id 为:1790262754028815348
  55. 2023-10-07 01:12:31.029 [pool-4-thread-5] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第40000个 id 为:1790262754129478187
  56. 2023-10-07 01:12:31.040 [main] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest ---  end generate id
  57. 2023-10-07 01:12:31.040 [main] INFO  c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- * cost 566 ms!
  58. Process finished with exit code 0
复制代码

3. zookeeper实现分布式队列

常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不恰当大数据量存储,官方并不保举作为队列利用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比力好用的。


3.1 计划思绪


1.创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个暂时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:


  • 获取根节点下的所有子节点。
  • 找到具有最小序号的子节点。
  • 获取该节点的数据。
  • 删除该节点。
  • 返回节点的数据。
  1. /**
  2. * 入队
  3. * @param data
  4. * @throws Exception
  5. */
  6. public void enqueue(String data) throws Exception {
  7.     // 创建临时有序子节点
  8.     zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
  9.             ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  10. }
  11. /**
  12. * 出队
  13. * @return
  14. * @throws Exception
  15. */
  16. public String dequeue() throws Exception {
  17.     while (true) {
  18.         List<String> children = zk.getChildren(QUEUE_ROOT, false);
  19.         if (children.isEmpty()) {
  20.             return null;
  21.         }
  22.         Collections.sort(children);
  23.         for (String child : children) {
  24.             String childPath = QUEUE_ROOT + "/" + child;
  25.             try {
  26.                 byte[] data = zk.getData(childPath, false, null);
  27.                 zk.delete(childPath, -1);
  28.                 return new String(data, StandardCharsets.UTF_8);
  29.             } catch (KeeperException.NoNodeException e) {
  30.                 // 节点已被其他消费者删除,尝试下一个节点
  31.             }
  32.         }
  33.     }
  34. }
复制代码

3.2 利用Apache Curator实现分布式队列

Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。
  1. public class CuratorDistributedQueueDemo {
  2.     private static final String QUEUE_ROOT = "/curator_distributed_queue";
  3.     public static void main(String[] args) throws Exception {
  4.         CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
  5.                 new ExponentialBackoffRetry(1000, 3));
  6.         client.start();
  7.         // 定义队列序列化和反序列化
  8.         QueueSerializer<String> serializer = new QueueSerializer<String>() {
  9.             @Override
  10.             public byte[] serialize(String item) {
  11.                 return item.getBytes();
  12.             }
  13.             @Override
  14.             public String deserialize(byte[] bytes) {
  15.                 return new String(bytes);
  16.             }
  17.         };
  18.         // 定义队列消费者
  19.         QueueConsumer<String> consumer = new QueueConsumer<String>() {
  20.             @Override
  21.             public void consumeMessage(String message) throws Exception {
  22.                 System.out.println("消费消息: " + message);
  23.             }
  24.             @Override
  25.             public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
  26.             }
  27.         };
  28.         // 创建分布式队列
  29.         DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT)
  30.                 .buildQueue();
  31.         queue.start();
  32.         // 生产消息
  33.         for (int i = 0; i < 5; i++) {
  34.             String message = "Task-" + i;
  35.             System.out.println("生产消息: " + message);
  36.             queue.put(message);
  37.             Thread.sleep(1000);
  38.         }
  39.         Thread.sleep(10000);
  40.         queue.close();
  41.         client.close();
  42.     }
  43. }
复制代码

3.3 留意事项

利用Curator的DistributedQueue时,默认情况下不利用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。假如不指定锁节点路径,那么队列操作大概会受到并发题目的影响。
在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和序次性。分布式情况中,多个消费者大概同时实验消费队列中的消息。假如不利用锁来同步这些操作,大概会导致消息被多次处理或者处理序次出现杂乱。当然,并非所有场景都需要指定锁节点路径。假如您的应用场景允许消息被多次处理,或者处理序次不是关键题目,那么可以不利用锁。这样可以进队伍列操作的性能,因为不再需要期待获取锁。
  1. // 创建分布式队列
  2. QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
  3. //指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
  4. queue = builder.lockPath("/orderlock").buildQueue();
  5. //启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
  6. queue.start();
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

星球的眼睛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表