手撕Nacos源码,今日撕服务户端源码

打印 上一主题 下一主题

主题 888|帖子 888|积分 2664

紧接上文,我们分析了Nacos的客户端代码,
今天我们再来试一下服务端 ,至此就可以Nacos源码就告一段落,欢迎大家品鉴。
nacos服务端

注册中心服务端的主要功能包括,接收客户端的服务注册,服务发现,服务下线的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP使用的自己实现的Distro协议,而CP是采用raft协议实现的,这个过程中牵涉到心跳、选主等操作。
我们来学习一下注册中心服务端接收客户端服务注册的功能。
注册处理

我们先来学习一下Nacos的工具类WebUtils,该工具类在nacos-core工程下,该工具类是用于处理请求参数转化的,里面提供了2个常被用到的方法required()和optional():
  1. required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。
  2. optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。
复制代码
代码如下:
  1. /**
  2. * required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。
  3. */
  4. public static String required(final HttpServletRequest req, final String key) {
  5.     String value = req.getParameter(key);
  6.     if (StringUtils.isEmpty(value)) {
  7.         throw new IllegalArgumentException("Param '" + key + "' is required.");
  8.     }
  9.     String encoding = req.getParameter("encoding");
  10.     return resolveValue(value, encoding);
  11. }
  12. /**
  13. * optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。
  14. */
  15. public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
  16.     if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
  17.         return defaultValue;
  18.     }
  19.     String value = req.getParameter(key);
  20.     if (StringUtils.isBlank(value)) {
  21.         return defaultValue;
  22.     }
  23.     String encoding = req.getParameter("encoding");
  24.     return resolveValue(value, encoding);
  25. }
复制代码
nacos 的 server 与 client使用了http协议来交互,那么在server端必定提供了http接口的入口,并且在core模块看到其依赖了spring boot starter,所以它的http接口由集成了Spring的web服务器支持,简单地说就是像我们平时写的业务服务一样,有controller层和service层。
以OpenAPI作为入口来学习,我们找到/nacos/v1/ns/instance服务注册接口,在nacos-naming工程中我们可以看到InstanceController正是我们要找的对象,如下图:

处理服务注册,我们直接找对应的POST方法即可,代码如下:
  1. /**
  2. * Register new instance.
  3. * 接收客户端注册信息
  4. * @param request http request
  5. * @return 'ok' if success
  6. * @throws Exception any error during register
  7. */
  8. @CanDistro
  9. @PostMapping
  10. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  11. public String register(HttpServletRequest request) throws Exception {
  12.     //获取namespaceid,该参数是可选参数
  13.     final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  14.     //获取服务名字
  15.     final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  16.     //校验服务的名字,服务的名字格式为groupName@@serviceName
  17.     NamingUtils.checkServiceNameFormat(serviceName);
  18.     //创建实例
  19.     final Instance instance = parseInstance(request);
  20.     //注册服务
  21.     serviceManager.registerInstance(namespaceId, serviceName, instance);
  22.     return "ok";
  23. }
复制代码
如上图,该方法主要用于接收客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到Nacos中,注册的方法如下:
  1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
  2.     //判断本地缓存中是否存在该命名空间,如果不存在就创建,之后判断该命名空间下是否
  3.     //存在该服务,如果不存在就创建空的服务
  4.     //如果实例为空,则创建实例,并且会将创建的实例存入到serviceMap集合中
  5.     createEmptyService(namespaceId, serviceName, instance.isEphemeral());
  6.     //从serviceMap集合中获取创建的实例
  7.     Service service = getService(namespaceId, serviceName);
  8.     if (service == null) {
  9.         throw new NacosException(NacosException.INVALID_PARAM,
  10.                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
  11.     }
  12.     //服务注册,这一步才会把服务的实例信息和服务绑定起来
  13.     addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
  14. }
复制代码
注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起。
Distro协议介绍

Distro是阿里巴巴的私有协议, 是一种分布式一致性算法,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议:该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失
Distro 协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro 协议具有以下特点:
  1. 1:专门为了注册中心而创造出的协议;
  2. 2:客户端与服务端有两个重要的交互,服务注册与心跳发送;
  3. 3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;
  4. 4:客户端请求失败则换一个节点重新发送请求;
  5. 5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
  6. 6:每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;
  7. 7:服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;
  8. 8:服务端如果长时间未收到客户端心跳,则下线该服务;
  9. 9:负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他节点;
  10. 10:节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。
复制代码
Distro寻址

Distro协议主要用于nacos 服务端节点之间的相互发现,nacos使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:
  1. 单机模式(StandaloneMemberLookup)
  2. 文件模式(FileConfigMemberLookup)
  3. 服务器模式(AddressServerMemberLookup)
复制代码
三种寻址模式如下图:

1.2.3.1 单机模式

在com.alibaba.nacos.core.cluster.lookup.LookupFactory中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式,如果是集群启动,
  1. /**
  2. * Create the target addressing pattern.
  3. * 创建寻址模式
  4. * @param memberManager {@link ServerMemberManager}
  5. * @return {@link MemberLookup}
  6. * @throws NacosException NacosException
  7. */
  8. public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
  9.     //NacosServer 集群方式启动
  10.     if (!EnvUtil.getStandaloneMode()) {
  11.         String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
  12.         //由参数中传入的寻址方式得到LookupType对象
  13.         LookupType type = chooseLookup(lookupType);
  14.         //选择寻址方式
  15.         LOOK_UP = find(type);
  16.         //设置当前寻址方式
  17.         currentLookupType = type;
  18.     } else {
  19.         //NacosServer单机启动
  20.         LOOK_UP = new StandaloneMemberLookup();
  21.     }
  22.     LOOK_UP.injectMemberManager(memberManager);
  23.     Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
  24.     return LOOK_UP;
  25. }
  26. /***
  27. * 选择寻址方式
  28. * @param type
  29. * @return
  30. */
  31. private static MemberLookup find(LookupType type) {
  32.     //文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,
  33.     // 通过配置文件寻找其他节点,以达到和其他节点通信的目的
  34.     if (LookupType.FILE_CONFIG.equals(type)) {
  35.         LOOK_UP = new FileConfigMemberLookup();
  36.         return LOOK_UP;
  37.     }
  38.     //服务器模式
  39.     if (LookupType.ADDRESS_SERVER.equals(type)) {
  40.         LOOK_UP = new AddressServerMemberLookup();
  41.         return LOOK_UP;
  42.     }
  43.     // unpossible to run here
  44.     throw new IllegalArgumentException();
  45. }
复制代码
单节点寻址模式会直接创建StandaloneMemberLookup对象,而文件寻址模式会创建FileConfigMemberLookup对象,服务器寻址模式会创建AddressServerMemberLookup;
1.2.3.2 文件寻址模式


文件寻址模式主要在创建集群的时候,通过cluster.conf来配置集群,程序可以通过监听cluster.conf文件变化实现动态管理节点,FileConfigMemberLookup源码如下:
  1. public class FileConfigMemberLookup extends AbstractMemberLookup {
  2.     //创建文件监听器
  3.     private FileWatcher watcher = new FileWatcher() {
  4.         //文件发生变更事件
  5.         @Override
  6.         public void onChange(FileChangeEvent event) {
  7.             readClusterConfFromDisk();
  8.         }
  9.         //检查context是否包含cluster.conf
  10.         @Override
  11.         public boolean interest(String context) {
  12.             return StringUtils.contains(context, "cluster.conf");
  13.         }
  14.     };
  15.     @Override
  16.     public void start() throws NacosException {
  17.         if (start.compareAndSet(false, true)) {
  18.             readClusterConfFromDisk();
  19.             // 使用inotify机制来监视文件更改,并自动触发对cluster.conf的读取
  20.             try {
  21.                 WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
  22.             } catch (Throwable e) {
  23.                 Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
  24.             }
  25.         }
  26.     }
  27.     @Override
  28.     public void destroy() throws NacosException {
  29.         WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
  30.     }
  31.     private void readClusterConfFromDisk() {
  32.         Collection<Member> tmpMembers = new ArrayList<>();
  33.         try {
  34.             List<String> tmp = EnvUtil.readClusterConf();
  35.             tmpMembers = MemberUtil.readServerConf(tmp);
  36.         } catch (Throwable e) {
  37.             Loggers.CLUSTER
  38.                     .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
  39.         }
  40.         afterLookup(tmpMembers);
  41.     }
  42. }
复制代码
1.2.3.3 服务器寻址模式

使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;
  1. public class AddressServerMemberLookup extends AbstractMemberLookup {
  2.     private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
  3.     };
  4.     public String domainName;
  5.     public String addressPort;
  6.     public String addressUrl;
  7.     public String envIdUrl;
  8.     public String addressServerUrl;
  9.     private volatile boolean isAddressServerHealth = true;
  10.     private int addressServerFailCount = 0;
  11.     private int maxFailCount = 12;
  12.     private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);
  13.     private volatile boolean shutdown = false;
  14.     @Override
  15.     public void start() throws NacosException {
  16.         if (start.compareAndSet(false, true)) {
  17.             this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
  18.             initAddressSys();
  19.             run();
  20.         }
  21.     }
  22.     /***
  23.      * 获取服务器地址
  24.      */
  25.     private void initAddressSys() {
  26.         String envDomainName = System.getenv("address_server_domain");
  27.         if (StringUtils.isBlank(envDomainName)) {
  28.             domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
  29.         } else {
  30.             domainName = envDomainName;
  31.         }
  32.         String envAddressPort = System.getenv("address_server_port");
  33.         if (StringUtils.isBlank(envAddressPort)) {
  34.             addressPort = EnvUtil.getProperty("address.server.port", "8080");
  35.         } else {
  36.             addressPort = envAddressPort;
  37.         }
  38.         String envAddressUrl = System.getenv("address_server_url");
  39.         if (StringUtils.isBlank(envAddressUrl)) {
  40.             addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
  41.         } else {
  42.             addressUrl = envAddressUrl;
  43.         }
  44.         addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
  45.         envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
  46.         Loggers.CORE.info("ServerListService address-server port:" + addressPort);
  47.         Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
  48.     }
  49.     @SuppressWarnings("PMD.UndefineMagicConstantRule")
  50.     private void run() throws NacosException {
  51.         // With the address server, you need to perform a synchronous member node pull at startup
  52.         // Repeat three times, successfully jump out
  53.         boolean success = false;
  54.         Throwable ex = null;
  55.         int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
  56.         for (int i = 0; i < maxRetry; i++) {
  57.             try {
  58.                 //拉取集群节点信息
  59.                 syncFromAddressUrl();
  60.                 success = true;
  61.                 break;
  62.             } catch (Throwable e) {
  63.                 ex = e;
  64.                 Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
  65.             }
  66.         }
  67.         if (!success) {
  68.             throw new NacosException(NacosException.SERVER_ERROR, ex);
  69.         }
  70.         //创建定时任务
  71.         GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
  72.     }
  73.     @Override
  74.     public void destroy() throws NacosException {
  75.         shutdown = true;
  76.     }
  77.     @Override
  78.     public Map<String, Object> info() {
  79.         Map<String, Object> info = new HashMap<>(4);
  80.         info.put("addressServerHealth", isAddressServerHealth);
  81.         info.put("addressServerUrl", addressServerUrl);
  82.         info.put("envIdUrl", envIdUrl);
  83.         info.put("addressServerFailCount", addressServerFailCount);
  84.         return info;
  85.     }
  86.     private void syncFromAddressUrl() throws Exception {
  87.         RestResult<String> result = restTemplate
  88.                 .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
  89.         if (result.ok()) {
  90.             isAddressServerHealth = true;
  91.             Reader reader = new StringReader(result.getData());
  92.             try {
  93.                 afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
  94.             } catch (Throwable e) {
  95.                 Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
  96.                         ExceptionUtil.getAllExceptionMsg(e));
  97.             }
  98.             addressServerFailCount = 0;
  99.         } else {
  100.             addressServerFailCount++;
  101.             if (addressServerFailCount >= maxFailCount) {
  102.                 isAddressServerHealth = false;
  103.             }
  104.             Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
  105.         }
  106.     }
  107.     // 定时任务
  108.     class AddressServerSyncTask implements Runnable {
  109.         @Override
  110.         public void run() {
  111.             if (shutdown) {
  112.                 return;
  113.             }
  114.             try {
  115.                 //拉取服务列表
  116.                 syncFromAddressUrl();
  117.             } catch (Throwable ex) {
  118.                 addressServerFailCount++;
  119.                 if (addressServerFailCount >= maxFailCount) {
  120.                     isAddressServerHealth = false;
  121.                 }
  122.                 Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
  123.             } finally {
  124.                 GlobalExecutor.scheduleByCommon(this, 5_000L);
  125.             }
  126.         }
  127.     }
  128. }
复制代码
数据同步

Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据增加的时候,只同步增加的数据。
全量同步


全量同步流程比较复杂,流程如上图:
  1. 1:启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据
  2. 2:调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据
  3. 3:从namingProxy代理获取所有的数据data
  4. 4:构造http请求,调用httpGet方法从指定的server获取数据
  5. 5:从获取的结果result中获取数据bytes
  6. 6:处理数据processData
  7. 7:从data反序列化出datumMap
  8. 8:把数据存储到dataStore,也就是本地缓存dataMap
  9. 9:监听器不包括key,就创建一个空的service,并且绑定监听器
  10. 10:监听器listener执行成功后,就更新data store
复制代码
任务启动

在com.alibaba.nacos.core.distributed.distro.DistroProtocol的构造函数中调用startDistroTask()方法,该方法会执行startVerifyTask()和startLoadTask(),我们重点关注startLoadTask(),该方法代码如下:
  1. /***
  2. * 启动DistroTask
  3. */
  4. private void startDistroTask() {
  5.     if (EnvUtil.getStandaloneMode()) {
  6.         isInitialized = true;
  7.         return;
  8.     }
  9.     //启动startVerifyTask,做数据同步校验
  10.     startVerifyTask();
  11.     //启动DistroLoadDataTask,批量加载数据
  12.     startLoadTask();
  13. }
  14. //启动DistroLoadDataTask
  15. private void startLoadTask() {
  16.     //处理状态回调对象
  17.     DistroCallback loadCallback = new DistroCallback() {
  18.         //处理成功
  19.         @Override
  20.         public void onSuccess() {
  21.             isInitialized = true;
  22.         }
  23.         //处理失败
  24.         @Override
  25.         public void onFailed(Throwable throwable) {
  26.             isInitialized = false;
  27.         }
  28.     };
  29.     //执行DistroLoadDataTask,是一个多线程
  30.     GlobalExecutor.submitLoadDataTask(
  31.             new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
  32. }
  33. /***
  34. * 启动startVerifyTask
  35. * 数据校验
  36. */
  37. private void startVerifyTask() {
  38.     GlobalExecutor.schedulePartitionDataTimedSync(
  39.         new DistroVerifyTask(
  40.             memberManager,
  41.             distroComponentHolder),
  42.         distroConfig.getVerifyIntervalMillis());
  43. }
复制代码
数据如何执行加载

上面方法会调用DistroLoadDataTask对象,而该对象其实是个线程,因此会执行它的run方法,run方法会调用load()方法实现数据全量加载,代码如下:
  1. /***
  2. * 数据加载过程
  3. */
  4. @Override
  5. public void run() {
  6.     try {
  7.         //加载数据
  8.         load();
  9.         if (!checkCompleted()) {
  10.             GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
  11.         } else {
  12.             loadCallback.onSuccess();
  13.             Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
  14.         }
  15.     } catch (Exception e) {
  16.         loadCallback.onFailed(e);
  17.         Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
  18.     }
  19. }
  20. /***
  21. * 加载数据,并同步
  22. * @throws Exception
  23. */
  24. private void load() throws Exception {
  25.     while (memberManager.allMembersWithoutSelf().isEmpty()) {
  26.         Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
  27.         TimeUnit.SECONDS.sleep(1);
  28.     }
  29.     while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
  30.         Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
  31.         TimeUnit.SECONDS.sleep(1);
  32.     }
  33.     //同步数据
  34.     for (String each : distroComponentHolder.getDataStorageTypes()) {
  35.         if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
  36.             //从远程机器上同步所有数据
  37.             loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
  38.         }
  39.     }
  40. }
复制代码
数据同步

数据同步会通过Http请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:
  1. 1:loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步到本机
  2. 2:transportAgent.getDatumSnapshot()远程加载数据,通过Http请求执行远程加载
  3. 3:dataProcessor.processSnapshot()处理数据同步到本地
复制代码
数据处理完整逻辑代码如下:loadAllDataSnapshotFromRemote()方法
  1. /***
  2. * 从远程机器上同步所有数据
  3. */
  4. private boolean loadAllDataSnapshotFromRemote(String resourceType) {
  5.     DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
  6.     DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
  7.     if (null == transportAgent || null == dataProcessor) {
  8.         Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
  9.                 resourceType, transportAgent, dataProcessor);
  10.         return false;
  11.     }
  12.     //遍历集群成员节点,不包括自己
  13.     for (Member each : memberManager.allMembersWithoutSelf()) {
  14.         try {
  15.             Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
  16.             //从远程节点加载数据,调用http请求接口: distro/datums;
  17.             DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
  18.             //处理数据
  19.             boolean result = dataProcessor.processSnapshot(distroData);
  20.             Loggers.DISTRO
  21.                     .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
  22.                             result);
  23.             if (result) {
  24.                 return true;
  25.             }
  26.         } catch (Exception e) {
  27.             Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
  28.         }
  29.     }
  30.     return false;
  31. }
复制代码
远程加载数据代码如下:transportAgent.getDatumSnapshot()方法
  1. /***
  2. * 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
  3. * @param targetServer target server.
  4. * @return
  5. */
  6. @Override
  7. public DistroData getDatumSnapshot(String targetServer) {
  8.     try {
  9.         //从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
  10.         byte[] allDatum = NamingProxy.getAllData(targetServer);
  11.         //将数据封装成DistroData
  12.         return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
  13.     } catch (Exception e) {
  14.         throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
  15.     }
  16. }
  17. /**
  18. * Get all datum from target server.
  19. * NamingProxy.getAllData
  20. * 执行HttpGet请求,并获取返回数据
  21. * @param server target server address
  22. * @return all datum byte array
  23. * @throws Exception exception
  24. */
  25. public static byte[] getAllData(String server) throws Exception {
  26.     //参数封装
  27.     Map<String, String> params = new HashMap<>(8);
  28.     //组装URL,并执行HttpGet请求,获取结果集
  29.     RestResult<String> result = HttpClient.httpGet(
  30.             "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
  31.             new ArrayList<>(), params);
  32.     //返回数据
  33.     if (result.ok()) {
  34.         return result.getData().getBytes();
  35.     }
  36.     throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
  37.             + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
  38.             + result.getMessage());
  39. }
复制代码
处理数据同步到本地代码如下:dataProcessor.processSnapshot()
  1. /**
  2. * 数据处理并更新本地缓存
  3. * @param data
  4. * @return
  5. * @throws Exception
  6. */
  7. private boolean processData(byte[] data) throws Exception {
  8.     if (data.length > 0) {
  9.         //从data反序列化出datumMap
  10.         Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
  11.         // 把数据存储到dataStore,也就是本地缓存dataMap
  12.         for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
  13.             dataStore.put(entry.getKey(), entry.getValue());
  14.             //监听器不包括key,就创建一个空的service,并且绑定监听器
  15.             if (!listeners.containsKey(entry.getKey())) {
  16.                 // pretty sure the service not exist:
  17.                 if (switchDomain.isDefaultInstanceEphemeral()) {
  18.                     // create empty service
  19.                     //创建一个空的service
  20.                     Loggers.DISTRO.info("creating service {}", entry.getKey());
  21.                     Service service = new Service();
  22.                     String serviceName = KeyBuilder.getServiceName(entry.getKey());
  23.                     String namespaceId = KeyBuilder.getNamespace(entry.getKey());
  24.                     service.setName(serviceName);
  25.                     service.setNamespaceId(namespaceId);
  26.                     service.setGroupName(Constants.DEFAULT_GROUP);
  27.                     // now validate the service. if failed, exception will be thrown
  28.                     service.setLastModifiedMillis(System.currentTimeMillis());
  29.                     service.recalculateChecksum();
  30.                     // The Listener corresponding to the key value must not be empty
  31.                     // 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager
  32.                     RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
  33.                     if (Objects.isNull(listener)) {
  34.                         return false;
  35.                     }
  36.                     //为空的绑定监听器
  37.                     listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
  38.                 }
  39.             }
  40.         }
  41.         //循环所有datumMap
  42.         for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
  43.             if (!listeners.containsKey(entry.getKey())) {
  44.                 // Should not happen:
  45.                 Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
  46.                 continue;
  47.             }
  48.             try {
  49.                 //执行监听器的onChange监听方法
  50.                 for (RecordListener listener : listeners.get(entry.getKey())) {
  51.                     listener.onChange(entry.getKey(), entry.getValue().value);
  52.                 }
  53.             } catch (Exception e) {
  54.                 Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
  55.                 continue;
  56.             }
  57.             // Update data store if listener executed successfully:
  58.             // 监听器listener执行成功后,就更新dataStore
  59.             dataStore.put(entry.getKey(), entry.getValue());
  60.         }
  61.     }
  62.     return true;
  63. }
复制代码
到此实现数据全量同步,其实全量同步最终封装的协议还是Http。
增量同步

新增数据使用异步广播同步:
  1. 1:DistroProtocol 使用 sync() 方法接收增量数据
  2. 2:向其他节点发布广播任务
  3.   调用 distroTaskEngineHolder 发布延迟任务
  4.   
  5. 3:调用 DistroDelayTaskProcessor.process() 方法进行任务投递:将延迟任务转换为异步变更任务
  6. 4:执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送消息
  7.   调用 DistroHttpAgent.syncData() 方法发送数据
  8.   调用 NamingProxy.syncData() 方法发送数据
  9.   
  10. 5:异常任务调用 handleFailedTask() 方法进行处理
  11.   调用 DistroFailedTaskHandler 处理失败任务
  12.   调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新投递成延迟任务。
复制代码
增量数据入口

我们回到服务注册,服务注册的InstanceController.register()就是数据入口,它会调用ServiceManager.registerInstance(),执行数据同步的时候,调用addInstance(),在该方法中会执行DistroConsistencyServiceImpl.put(),该方法是增量同步的入口,会调用distroProtocol.sync()方法,代码如下:
  1. /***
  2. * 数据保存
  3. * @param key   key of data, this key should be globally unique
  4. * @param value value of data
  5. * @throws NacosException
  6. */
  7. @Override
  8. public void put(String key, Record value) throws NacosException {
  9.     //将数据存入到dataStore中
  10.     onPut(key, value);
  11.     //使用distroProtocol同步数据
  12.     distroProtocol.sync(
  13.         new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
  14.         DataOperation.CHANGE,
  15.         globalConfig.getTaskDispatchPeriod() / 2);
  16. }
复制代码
sync()方法会执行任务发布,代码如下:
  1. public void sync(DistroKey distroKey, DataOperation action, long delay) {
  2.     //向除了自己外的所有节点广播
  3.     for (Member each : memberManager.allMembersWithoutSelf()) {
  4.         DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
  5.                 each.getAddress());
  6.         DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
  7.         //从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来
  8.         //执行延时任务发布
  9.         distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
  10.         if (Loggers.DISTRO.isDebugEnabled()) {
  11.             Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
  12.         }
  13.     }
  14. }
复制代码
增量同步操作

延迟任务对象我们可以从DistroTaskEngineHolder构造函数中得知是DistroDelayTaskProcessor,代码如下:
  1. /***
  2. * 构造函数指定任务处理器
  3. * @param distroComponentHolder
  4. */
  5. public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
  6.     DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
  7.     //指定任务处理器defaultDelayTaskProcessor
  8.     delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
  9. }
复制代码
它延迟执行的时候会执行process方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:
  1. /***
  2. * 任务处理过程
  3. * @param task     task.
  4. * @return
  5. */
  6. @Override
  7. public boolean process(NacosTask task) {
  8.     if (!(task instanceof DistroDelayTask)) {
  9.         return true;
  10.     }
  11.     DistroDelayTask distroDelayTask = (DistroDelayTask) task;
  12.     DistroKey distroKey = distroDelayTask.getDistroKey();
  13.     if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
  14.         //将延迟任务变更成异步任务,异步任务对象是一个线程
  15.         DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
  16.         //将任务添加到NacosExecuteTaskExecuteEngine中,并执行
  17.         distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
  18.         return true;
  19.     }
  20.     return false;
  21. }
复制代码
DistroSyncChangeTask实质上是任务的开始,它自身是一个线程,所以会执行它的run方法,而run方法这是数据同步操作,代码如下:
  1. /***
  2. * 执行数据同步
  3. */
  4. @Override
  5. public void run() {
  6.     Loggers.DISTRO.info("[DISTRO-START] {}", toString());
  7.     try {
  8.         //获取本地缓存数据
  9.         String type = getDistroKey().getResourceType();
  10.         DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
  11.         distroData.setType(DataOperation.CHANGE);
  12.         //向其他节点同步数据
  13.         boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
  14.         if (!result) {
  15.             handleFailedTask();
  16.         }
  17.         Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
  18.     } catch (Exception e) {
  19.         Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
  20.         handleFailedTask();
  21.     }
  22. }
复制代码
数据同步会执行调用syncData,该方法其实就是通过Http协议将数据发送到其他节点实现数据同步,代码如下:
  1. /***
  2. * 向其他节点同步数据
  3. * @param data         data
  4. * @param targetServer target server
  5. * @return
  6. */
  7. @Override
  8. public boolean syncData(DistroData data, String targetServer) {
  9.     if (!memberManager.hasMember(targetServer)) {
  10.         return true;
  11.     }
  12.     //获取数据字节数组
  13.     byte[] dataContent = data.getContent();
  14.     //通过Http协议同步数据
  15.     return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
  16. }
复制代码
最后:一定要跟着讲师所给的源码自行走一遍!!!
本文由传智教育博学谷 - 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

熊熊出没

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表