Nacos源码 (4) 配置中心

打印 上一主题 下一主题

主题 875|帖子 875|积分 2625

本文阅读nacos-2.0.2的config源码,编写示例,分析推送配置、监听配置的原理。
客户端

创建NacosConfigService对象
  1. Properties properties = new Properties();
  2. properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);
  3. NacosConfigService configService = new NacosConfigService(properties);
复制代码
构造方法:
  1. public NacosConfigService(Properties properties) throws NacosException {
  2.     ValidatorUtils.checkInitParam(properties);
  3.    
  4.     initNamespace(properties);
  5.     this.configFilterChainManager = new ConfigFilterChainManager(properties);
  6.     ServerListManager serverListManager = new ServerListManager(properties);
  7.     serverListManager.start();
  8.    
  9.     this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
  10.     // will be deleted in 2.0 later versions
  11.     agent = new ServerHttpAgent(serverListManager);
  12. }
复制代码

  • 创建ConfigFilterChainManager - 过滤器链
  • 创建ServerListManager - 服务器列表管理
  • 创建ClientWorker - 用来发送请求,内部封装了一个ConfigRpcTransportClient类型对象agent,它能够获取到RpcClient与服务端进行通信
推送配置
  1. Properties properties = new Properties();
  2. properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);
  3. NacosConfigService configService = new NacosConfigService(properties);StringWriter out = new StringWriter();properties.store(out, "test config");// 推送配置到nacos服务器configService.publishConfig(    ORDER_SERVICE, Constants.DEFAULT_GROUP, out.toString(), "properties");
复制代码
推送配置到nacos服务器:
  1. public boolean publishConfig(String dataId,
  2.                              String group,
  3.                              String content,
  4.                              String type) throws NacosException {
  5.     return publishConfigInner(namespace, dataId, group, null, null, null, content, type, null);
  6. }
  7. private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName,
  8.         String betaIps, String content, String type, String casMd5) throws NacosException {
  9.     group = blank2defaultGroup(group);
  10.     ParamUtils.checkParam(dataId, group, content);
  11.     ConfigRequest cr = new ConfigRequest();
  12.     cr.setDataId(dataId);
  13.     cr.setTenant(tenant);
  14.     cr.setGroup(group);
  15.     cr.setContent(content);
  16.     cr.setType(type);
  17.     configFilterChainManager.doFilter(cr, null);
  18.     content = cr.getContent();
  19.     String encryptedDataKey = (String) cr.getParameter("encryptedDataKey");
  20.     return worker.publishConfig(
  21.         dataId, group, tenant, appName, tag, betaIps, content, encryptedDataKey, casMd5, type);
  22. }
复制代码
worker使用agent推送配置:
  1. // 1. 封装ConfigPublishRequest对象
  2. ConfigPublishRequest request = new ConfigPublishRequest(dataId, group, tenant, content);
  3. request.setCasMd5(casMd5);
  4. request.putAdditionalParam(TAG_PARAM, tag);
  5. request.putAdditionalParam(APP_NAME_PARAM, appName);
  6. request.putAdditionalParam(BETAIPS_PARAM, betaIps);
  7. request.putAdditionalParam(TYPE_PARAM, type);
  8. request.putAdditionalParam(ENCRYPTED_DATA_KEY_PARAM, encryptedDataKey);
  9. // 2. 获取RpcClient对象
  10. // 3. 使用RpcClient发请求
  11. ConfigPublishResponse response = (ConfigPublishResponse) requestProxy(getOneRunningClient(), request);
复制代码
监听配置
  1. Properties properties = new Properties();
  2. properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);
  3. NacosConfigService configService = new NacosConfigService(properties);// 添加监听器configService.addListener(ORDER_SERVICE, Constants.DEFAULT_GROUP, new AbstractListener() {  @Override  public void receiveConfigInfo(String configInfo) {    System.out.printf(">> config: \n%s\n\n", configInfo);  }});
复制代码

  • 将监听器注册到本地,本地使用CacheData作为监听器管理器,封装配置名、dataId和监听器集合,内部使用CopyOnWriteArrayList保存监听器,如果是第一次监听,会先拉取一次配置。本地注册表为Map结构,使用dataId+group+tenant作为key,value是CacheData对象
  • 在client初始化阶段,会注册一个ServerRequestHandler,专门处理服务器端的ConfigChangeNotifyRequest请求,该请求只会推送变化了的配置的基本信息,而不包括内容,所以此处还会触发一次ConfigBatchListenRequest请求
  • 之后查找到变化的配置后,再发一个查询请求拉取配置
服务端

推送配置处理器

ConfigPublishRequestHandler处理器

配置中心使用ConfigPublishRequestHandler类处理客户端配置推送请求:

  • 验证请求参数
  • 封装configAdvanceInfo配置扩展信息
  • 创建ConfigInfo封装配置信息
  • 使用持久层对象insert或者update配置
  • 使用ConfigChangePublisher推送一个ConfigDataChangeEvent事件
    1. ConfigChangePublisher.notifyConfigChange(
    2.     new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
    复制代码
ConfigDataChangeEvent事件处理

ConfigDataChangeEvent事件会触发数据dump操作和集群同步操作。
此处先介绍数据dump操作:
  1. dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
  2.     syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
复制代码
dumpService是Dump data service用于备份数据:
  1. // Add DumpTask to TaskManager, it will execute asynchronously.
  2. public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
  3.         boolean isBeta) {
  4.     String groupKey = GroupKey2.getKey(dataId, group, tenant);
  5.     String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
  6.     // 推送一个DumpTask
  7.     dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
  8. }
复制代码
DumpTask将使用DumpProcessor类处理:

  • 把数据写到磁盘
  • 推送LocalDataChangeEvent事件
LocalDataChangeEvent事件会被以下几个类处理:

  • LongPollingService$1会触发DataChangeTask,响应长轮询订阅者
  • RpcConfigChangeNotifier将数据变化推送给rpc订阅者
  • InternalConfigChangeNotifier处理内部配置文件变化
监听配置处理器

ConfigChangeBatchListenRequestHandler处理器

处理客户端的配置监听请求:
  1. public class ConfigChangeBatchListenRequestHandler
  2.         extends RequestHandler<ConfigBatchListenRequest, ConfigChangeBatchListenResponse> {
  3.     @Autowired
  4.     private ConfigChangeListenContext configChangeListenContext;
  5.     @TpsControl(pointName = "ConfigListen")
  6.     @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
  7.     public ConfigChangeBatchListenResponse handle(
  8.             ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta)
  9.             throws NacosException {
  10.         String connectionId = StringPool.get(meta.getConnectionId());
  11.         String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);
  12.         ConfigChangeBatchListenResponse configChangeBatchListenResponse =
  13.             new ConfigChangeBatchListenResponse();
  14.         for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest
  15.                 .getConfigListenContexts()) {
  16.             String groupKey = GroupKey2
  17.                     .getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
  18.             groupKey = StringPool.get(groupKey);
  19.             String md5 = StringPool.get(listenContext.getMd5());
  20.             // 监听
  21.             if (configChangeListenRequest.isListen()) {
  22.                 // 注册监听器,维护groupKey->connectionId集关系和groupKey->md5关系
  23.                 configChangeListenContext.addListen(groupKey, md5, connectionId);
  24.                 // 判断变化
  25.                 boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
  26.                 if (!isUptoDate) {
  27.                     // 把变化的配置基本信息添加到响应
  28.                     configChangeBatchListenResponse
  29.                         .addChangeConfig(listenContext.getDataId(), listenContext.getGroup(),
  30.                             listenContext.getTenant());
  31.                 }
  32.             } else {
  33.                 // 取消监听
  34.                 configChangeListenContext.removeListen(groupKey, connectionId);
  35.             }
  36.         }
  37.         return configChangeBatchListenResponse;
  38.     }
  39. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

八卦阵

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表