【Nacos】Nacos服务注册与发现 心跳检测机制源码剖析

打印 上一主题 下一主题

主题 948|帖子 948|积分 2844

在前两篇文章,介绍了springboot的主动设置原理,而nacos的服务注册就依赖主动设置原理。
Nacos


Nacos核心功能点

服务注册 :Nacos Client会通过发送REST哀求的方式向Nacos Server注册本身的服务,提供自身的元数据,好比ip地址、端口等信息。Nacos Server接收到注册哀求后,就会把这些元数据信息存储在一个双层的内存Map中。
服务心跳: 在服务注册后,Nacos Client会维护一个定时心跳来持续关照Nacos Server,说明服务不停处于可用状态,防止被剔除。默认5s发送一次心跳。
服务健康查抄: Nacos Server会开启一个定时使命用来查抄注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它 的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实比方果恢复 发送心跳则会重新注册)
服务发现: 服务消耗者(Nacos Client)在调用服务提供者的服务时,会发送一个REST哀求给Nacos Server,获取上面注册的服务清 单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时使命定时拉取服务端最新的注册表信息更新到本地缓存
服务同步: Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。
客户端 服务注册&心跳发送

在客户端中,也就是开发的应用,包含引入有 nacos-discovery 而路径下包含有一个spring.factories 在主动设置的时候,会加载
如下设置类。
  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2.   com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  3.   com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  4.   com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  5.   com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
复制代码
其中 NacosServiceRegistryAutoConfiguration 中引入了三个类 NacosServiceRegistry、NacosRegistration、NacosAutoServiceRegistration 其中 NacosAutoServiceRegistration 继承了 AbstractAutoServiceRegistration
  1.         public void onApplicationEvent(WebServerInitializedEvent event) {
  2.                 bind(event);
  3.         }
  4.         // 启动
  5.         this.start();
  6.         //注册服务
  7.         register();
复制代码
组装实例信息,ip 端口 服务权重 集群名字 源信息 以及是否
  1.         private Instance getNacosInstanceFromRegistration(Registration registration) {
  2.                 Instance instance = new Instance();
  3.                 instance.setIp(registration.getHost());
  4.                 instance.setPort(registration.getPort());
  5.                 instance.setWeight(nacosDiscoveryProperties.getWeight());
  6.                 instance.setClusterName(nacosDiscoveryProperties.getClusterName());
  7.                 instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
  8.                 instance.setMetadata(registration.getMetadata());
  9.                 instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
  10.                 return instance;
  11.         }
复制代码
  1.         // 注册实例
  2.         namingService.registerInstance(serviceId, group, instance);
  3.         // 服务名 组名 实例
  4.    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  5.        // 实例非法校验
  6.        NamingUtils.checkInstanceIsLegal(instance);
  7.        // 获取组服务名
  8.        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  9.       
  10.        if (instance.isEphemeral()) {
  11.                        // 构建心跳信息
  12.            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  13.            // 启动一个心跳发送线程
  14.            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  15.        }
  16.        // 实际就是调用 instance
  17.        serverProxy.registerService(groupedServiceName, groupName, instance);
  18.    }
  19.         // 心跳发送线程
  20.         executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
复制代码
发送心跳线程

所谓的客户端心跳,其实就是启动一个线程,然后定时给一个接口发送调用。
  1.         class BeatTask implements Runnable {
  2.         
  3.         BeatInfo beatInfo;
  4.         
  5.         public BeatTask(BeatInfo beatInfo) {
  6.             this.beatInfo = beatInfo;
  7.         }
  8.         
  9.         @Override
  10.         public void run() {
  11.                 // 如果停止 直接返回
  12.             if (beatInfo.isStopped()) {
  13.                 return;
  14.             }
  15.             // 获取下次时间
  16.             long nextTime = beatInfo.getPeriod();
  17.             // 实际就是调用服务端的一个心跳接口  /instance/beat
  18.             JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
  19.             // 如果结束,启动另外一个 开始下次的心跳线程发送
  20.             executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
  21.         }
  22.     }
复制代码

nacos 服务注册

对于服务端来说,就是一个API接口
  1.         public String register(HttpServletRequest request) throws Exception {
  2.         // 准备服务实例
  3.         final Instance instance = parseInstance(request);
  4.         serviceManager.registerInstance(namespaceId, serviceName, instance);
  5.         return "ok";
  6.     }
复制代码
注册实例其实就是三步、创建服务、获取服务、添加实例
  1.     public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
  2.         createEmptyService(namespaceId, serviceName, instance.isEphemeral());
  3.         Service service = getService(namespaceId, serviceName);
  4.         addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
  5.     }
复制代码
创建服务
  1.     public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
  2.             throws NacosException {
  3.         // 命名空间 服务名称
  4.         Service service = getService(namespaceId, serviceName);
  5.         if (service == null) {
  6.             service = new Service();
  7.             service.setName(serviceName);
  8.             service.setNamespaceId(namespaceId);
  9.             service.setGroupName(NamingUtils.getGroupName(serviceName));
  10.             // now validate the service. if failed, exception will be thrown
  11.             service.setLastModifiedMillis(System.currentTimeMillis());
  12.             service.recalculateChecksum();
  13.             if (cluster != null) {
  14.                 cluster.setService(service);
  15.                 service.getClusterMap().put(cluster.getName(), cluster);
  16.             }
  17.             service.validate();
  18.                         // 这里是重点
  19.             putServiceAndInit(service);
  20.             if (!local) {
  21.                 addOrReplaceService(service);
  22.             }
  23.         }
  24.     }
  25.     private void putServiceAndInit(Service service) throws NacosException {
  26.              // 添加服务
  27.         putService(service);
  28.         // 下面说心跳检测机制
  29.         service.init();
  30.     }
复制代码

这里其实就是底层的注册表的数据结构了,这里使用双查抄锁。分别是nameSpace、group、service、实例。
这里简单思索下,为什么要筹划这么复杂。一个服务可能对应多个实例。没有问题。一个分组group 可能是在同一个公司有差别的组,好比订单、支付,每个组都有本身的服务。namespace则可以分为dev\test\prod 三个差别的组。
  1.         // 注册表  如何提升
  2.     private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  3.         // 这里的机制其实就是 判断当前是否存在该nameSpaceId , 如果不存在的话,则创建一个CSLM
  4.     public void putService(Service service) {
  5.         if (!serviceMap.containsKey(service.getNamespaceId())) {
  6.             // 添加锁 lock
  7.             synchronized (putServiceLock) {
  8.                 if (!serviceMap.containsKey(service.getNamespaceId())) {
  9.                     serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
  10.                 }
  11.             }
  12.         }
  13.         // 将添加服务添加到nameSpaceId 添加服务
  14.         serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
  15.     }
复制代码
  1.     // 添加实例
  2.     public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
  3.             throws NacosException {
  4.         // 构建一个key
  5.         String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
  6.         Service service = getService(namespaceId, serviceName);
  7.         synchronized (service) {
  8.             List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
  9.             Instances instances = new Instances();
  10.             instances.setInstanceList(instanceList);
  11.             // 持久化服务
  12.             consistencyService.put(key, instances);
  13.         }
  14.     }
  15.         // 这里根据判断是否ephemeral 走保存内存还是磁盘持久化
  16.     private ConsistencyService mapConsistencyService(String key) {
  17.         // AP CP
  18.         return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
  19.     }
复制代码
这里其实选择的是 DistroConsistencyServiceImpl 另一个就是 RaftConsistencyServiceImpl 使用raft实现的数据持久化,这里先不介绍。
DistroConsistencyServiceImpl

  1.     public void put(String key, Record value) throws NacosException {
  2.         onPut(key, value);
  3.         distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
  4.                 globalConfig.getTaskDispatchPeriod() / 2);
  5.     }
  6.     // 这里调用put
  7.         public void onPut(String key, Record value) {
  8.         if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
  9.             Datum<Instances> datum = new Datum<>();
  10.             datum.value = (Instances) value;
  11.             datum.key = key;
  12.             datum.timestamp.incrementAndGet();
  13.             //将数据保存到内存中
  14.             dataStore.put(key, datum);
  15.         }
  16.         if (!listeners.containsKey(key)) {
  17.             return;
  18.         }
  19.         // 但是这里调用了一个任务
  20.         notifier.addTask(key, DataOperation.CHANGE);
  21.     }
  22.         // 其实就是一个map
  23.     private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
  24.    
  25.     public void put(String key, Datum value) {
  26.         dataMap.put(key, value);
  27.     }
  28.     @PostConstruct
  29.     public void init() {
  30.         // 初始化 构造方法执行的时候 进行处理
  31.         GlobalExecutor.submitDistroNotifyTask(notifier);
  32.     }
复制代码
从源码中可以看到,在类初始化的时候,创建一个使命异步进行执行。 其实就是将当前服务进行异步使命注册,可以提拔性能。添加和获取使命。
源码英华:很多开源框架为了提拔操作性能会大量使用这种异步使命及内存队列操作,这些操作本省不需要写入立即返回乐成,用这种方式可以提拔操作性能很大资助
  1.    public class Notifier implements Runnable {
  2.                 // 保存服务
  3.         private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
  4.                 // 阻塞队列
  5.         private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
  6.                 // 初始化的时候 添加一个任务到阻塞队列中
  7.         public void addTask(String datumKey, DataOperation action) {
  8.             if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
  9.                 return;
  10.             }
  11.             if (action == DataOperation.CHANGE) {
  12.                 services.put(datumKey, StringUtils.EMPTY);
  13.             }
  14.             tasks.offer(Pair.with(datumKey, action));
  15.         }
  16.         @Override
  17.         public void run() {
  18.             // 为什么异步设计 : 提升性能
  19.             // 阻塞队列 ,会线程等待 wait
  20.             // 并发、反射、网络、IO
  21.             for (; ; ) {
  22.                       // 异步处理
  23.                Pair<String, DataOperation> pair = tasks.take();
  24.                handle(pair);
  25.             }
  26.         }
  27.     }
复制代码

心跳检测机制

其实就是服务注册的时候 启动一个线程,然后查抄所有实例的心跳检测,对于超过15s没有收到客户端心跳的实例会将它 的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实比方果恢复 发送心跳则会重新注册)
  1.     /**
  2.      * Init service.
  3.      */
  4.     public void init() {
  5.         // 心跳检查线程
  6.         HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
  7.         for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
  8.             entry.getValue().setService(this);
  9.             entry.getValue().init();
  10.         }
  11.     }
  12.         // 初始化5S后执行,每5S执行一次
  13.     public static void scheduleCheck(ClientBeatCheckTask task) {
  14.         futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
  15.     }
  16.     //
  17.         public void run() {
  18.         List<Instance> instances = service.allIPs(true);
  19.             for (Instance instance : instances) {
  20.                 // 当前时间 减去 心跳超时时间
  21.                 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
  22.                         if (instance.isHealthy()) {
  23.                             // 健康状态
  24.                             instance.setHealthy(false);
  25.                             getPushService().serviceChanged(service);
  26.                             ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
  27.                         }
  28.                     }
  29.                 }
  30.             }
  31.             // then remove obsolete instances:
  32.             for (Instance instance : instances) {
  33.                 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
  34.                     // delete instance
  35.                     Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
  36.                             JacksonUtils.toJson(instance));
  37.                     deleteIp(instance);
  38.                 }
  39.             }
  40.         }
  41.     }
复制代码
至此,我们就根本上过了一遍,服务的注册 以及心跳检测机制,本篇主要是针对nacos1.4.1 的源码学习,关于后续的服务发现,以及2.X版本的源码 讲解,后续在继续。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

温锦文欧普厨电及净水器总代理

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表