微服务组件-----Spring Cloud Alibaba 注册中心 Nacos源码(1.4.x版本)分 ...

打印 上一主题 下一主题

主题 538|帖子 538|积分 1614

 

核心功能点

【1】服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。
【2】服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳
【3】服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。  
【4】服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存
【5】服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
 

源码精髓总结

【1】注册表的结构说明(这个仅是记录):
  1. //Map<namespaceId, Map<service_name, Service>【ConcurrentSkipListMap】>
  2. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  3. //再分析里面的Service,Map<clusterName, Cluster>
  4. private Map<String, Cluster> clusterMap = new HashMap<>();
  5. //再分析Cluster
  6. private Set<Instance> persistentInstances = new HashSet<>();
  7. private Set<Instance> ephemeralInstances = new HashSet<>();
复制代码
【2】分析注册表为何要这么设计
  1. 1.注册表是基于第一层ConcurrentHashMap,第二层ConcurrentSkipListMap,第三层HashMap,然后定位到对应的Cluster。
  2. 2.至于为什么要这样设计,一方面是将粒度划分的更细,通过源码分析可知,nacos更新注册表是进行小范围的更新,如定位到Cluster的临时列表ephemeralInstances或者持久列表persistentInstances【这两个都是set集合,所以排除了会有重复的数据】。因为粒度小所以更新速度会更快。
  3. 3.其次采用的是 写时复制思想,也就是说,不会影响读取的效率,因为是新开一个副本,将新旧的数据合并到一个新数据里面,然后将引用指向新数据。
  4. 4.其次是为了高扩展,对namespace进行划分【对开发环境隔离】,对service进行划分【对服务进行隔离】,对Cluster进行划分【多机房部署,加快访问速度】
  5. 5.为了解决并发读写问题,采用的是ConcurrentHashMap与ConcurrentSkipListMap的分段锁,加上Cluster里面的写时复制。其次Cluster里面是不加锁的,因为是单线程进行修改,不存在冲突。
  6. 6.虽说牺牲了,一定的实时性,但是大大提高了并发的性能。
复制代码
【3】分析AP架构下为什么高性能的原因
  1. 1.因为采用的是异步任务加队列的形式来实现注册的,所以响应很快,然后任务是慢慢做的。
  2. 2.Notifier 是在DistroConsistencyServiceImpl类中初始化,默认单线程,而且队列为ArrayBlockingQueue<>(1024 * 1024)。
  3. 3.缩小了变更数据的粒度,单线程避免了线程安全问题【不用加锁】。
  4. 4.这种方式毫无疑问是会存在问题的,就是响应了但是没有注册上。但是对于这个问题,在客户端里面做了心跳机制,如果检测不到会重新注册。
复制代码
 【4】分析Nacos为什么感知快的原因
  1. 采用的是客户端定时进行一次拉取,兼服务端采用异步的形式使用UDP发送更新的数据到客户端;
  2. 虽然UDP存在通知丢失的情况,但是每隔1s的拉取依旧能很好的保持数据的最终一致性。
复制代码
 
源码分析

验证服务端

【1】在启动的时候我们一般是调用shell脚本启动,查看startup.sh脚本
  从以下看实际上是调用了java命令启动了个java的项目(-jar ${BASE_DIR}/target/${SERVER}.jar 将参数对应替换后 -jar ${BASE_DIR}/target/nacos-server.jar)
  去寻找启动入口的时候会发现,它其实是SpringBoot搭建的一个WEB服务。
  1. cygwin=false
  2. darwin=false
  3. os400=false
  4. case "`uname`" in
  5. CYGWIN*) cygwin=true;;
  6. Darwin*) darwin=true;;
  7. OS400*) os400=true;;
  8. esac
  9. error_exit ()
  10. {
  11.     echo "ERROR: $1 !!"
  12.     exit 1
  13. }
  14. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
  15. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
  16. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
  17. [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME
  18. if [ -z "$JAVA_HOME" ]; then
  19.   if $darwin; then
  20.     if [ -x '/usr/libexec/java_home' ] ; then
  21.       export JAVA_HOME=`/usr/libexec/java_home`
  22.     elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then
  23.       export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home"
  24.     fi
  25.   else
  26.     JAVA_PATH=`dirname $(readlink -f $(which javac))`
  27.     if [ "x$JAVA_PATH" != "x" ]; then
  28.       export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null`
  29.     fi
  30.   fi
  31.   if [ -z "$JAVA_HOME" ]; then
  32.         error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
  33.   fi
  34. fi
  35. export SERVER="nacos-server"
  36. export MODE="cluster"
  37. export FUNCTION_MODE="all"
  38. export MEMBER_LIST=""
  39. export EMBEDDED_STORAGE=""
  40. while getopts ":m:f:s:c:p:" opt
  41. do
  42.     case $opt in
  43.         m)
  44.             MODE=$OPTARG;;
  45.         f)
  46.             FUNCTION_MODE=$OPTARG;;
  47.         s)
  48.             SERVER=$OPTARG;;
  49.         c)
  50.             MEMBER_LIST=$OPTARG;;
  51.         p)
  52.             EMBEDDED_STORAGE=$OPTARG;;
  53.         ?)
  54.         echo "Unknown parameter"
  55.         exit 1;;
  56.     esac
  57. done
  58. export JAVA_HOME
  59. export JAVA="$JAVA_HOME/bin/java"
  60. export BASE_DIR=`cd $(dirname $0)/..; pwd`
  61. export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/
  62. #===========================================================================================
  63. # JVM Configuration
  64. #===========================================================================================
  65. if [[ "${MODE}" == "standalone" ]]; then
  66.     JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m"
  67.     JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true"
  68. else
  69.     if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then
  70.         JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true"
  71.     fi
  72.     JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  73.     JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
  74.     JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
  75. fi
  76. if [[ "${FUNCTION_MODE}" == "config" ]]; then
  77.     JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config"
  78. elif [[ "${FUNCTION_MODE}" == "naming" ]]; then
  79.     JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming"
  80. fi
  81. JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}"
  82. JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
  83. if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
  84.   JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400"
  85. else
  86.   JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
  87.   JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  88. fi
  89. JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb"
  90. JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}"
  91. JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar"
  92. JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
  93. JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}"
  94. JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml"
  95. JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"
  96. if [ ! -d "${BASE_DIR}/logs" ]; then
  97.   mkdir ${BASE_DIR}/logs
  98. fi
  99. echo "$JAVA ${JAVA_OPT}"
  100. if [[ "${MODE}" == "standalone" ]]; then
  101.     echo "nacos is starting with standalone"
  102. else
  103.     echo "nacos is starting with cluster"
  104. fi
  105. # check the start.out log output file
  106. if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
  107.   touch "${BASE_DIR}/logs/start.out"
  108. fi
  109. # start
  110. echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
  111. nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
  112. echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"
复制代码
 
从客户端开始分析

【1】根据自动装配原理(寻找spring.factories文件配置)
  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2.   com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  3.   com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  4.   com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  5.   com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  6.   com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
  7. org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  8.   com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
复制代码
【2】分析NacosDiscoveryAutoConfiguration类自动装配了什么
  1. @Configuration
  2. @EnableConfigurationProperties
  3. @ConditionalOnNacosDiscoveryEnabled
  4. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
  5. @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class })
  6. public class NacosDiscoveryAutoConfiguration {
  7.     @Bean
  8.     public NacosServiceRegistry nacosServiceRegistry(
  9.             NacosDiscoveryProperties nacosDiscoveryProperties) {
  10.         return new NacosServiceRegistry(nacosDiscoveryProperties);
  11.     }
  12.     @Bean
  13.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  14.     public NacosRegistration nacosRegistration(
  15.             NacosDiscoveryProperties nacosDiscoveryProperties,
  16.             ApplicationContext context) {
  17.         return new NacosRegistration(nacosDiscoveryProperties, context);
  18.     }
  19.     //可以看出是将上面两个Bean当做参数传入了这个Bean
  20.     @Bean
  21.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  22.     public NacosAutoServiceRegistration nacosAutoServiceRegistration(
  23.             NacosServiceRegistry registry,
  24.             AutoServiceRegistrationProperties autoServiceRegistrationProperties,
  25.             NacosRegistration registration) {
  26.         return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
  27.     }
  28. }
复制代码
 
 【3】分析NacosAutoServiceRegistration类有什么重要性

  利用监听机制,达到注册服务的目的。监听WebServer初始化事件
  1. //class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>
  2. //abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent>
  3. //因为继承了ApplicationListener,必然会有监听方法
  4. public void onApplicationEvent(WebServerInitializedEvent event) {
  5.     <strong>bind</strong>(event);
  6. }
  7. @Deprecated
  8. public void bind(WebServerInitializedEvent event) {
  9.     ApplicationContext context = event.getApplicationContext();
  10.     if (context instanceof ConfigurableWebServerApplicationContext) {
  11.         if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
  12.             return;
  13.         }
  14.     }
  15.     this.port.compareAndSet(0, event.getWebServer().getPort());
  16.     this.<strong>start</strong>();
  17. }
  18. public void start() {
  19.     if (!isEnabled()) {return;
  20.     }
  21.     // only initialize if nonSecurePort is greater than 0 and it isn't already running
  22.     // because of containerPortInitializer below
  23.     if (!this.running.get()) {
  24.         this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
  25.         <strong>register</strong>();
  26.         if (shouldRegisterManagement()) {
  27.             registerManagement();
  28.         }
  29.         this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
  30.         this.running.compareAndSet(false, true);
  31.     }
  32. }
  33. protected void register() {
  34.     this.serviceRegistry.<strong>register</strong>(getRegistration());
  35. }
  36. @Override
  37. public void register(Registration registration) {
  38.     if (StringUtils.isEmpty(registration.getServiceId())) {
  39.         return;
  40.     }
  41.     NamingService namingService = namingService();
  42.     String serviceId = registration.getServiceId();
  43.     String group = nacosDiscoveryProperties.getGroup();
  44.     Instance instance = getNacosInstanceFromRegistration(registration);
  45.     try {
  46.         <strong>namingService.registerInstance</strong>(serviceId, group, instance);
  47.     }
  48.     catch (Exception e) {
  49.         // rethrow a RuntimeException if the registration is failed.
  50.         // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
  51.         rethrowRuntimeException(e);
  52.     }
  53. }
复制代码
 
【4】分析如何注册的【服务注册】
  1. //NacosNamingService类的registerInstance方法
  2. @Override
  3. public void registerInstance(String serviceName, Instance instance) throws NacosException {
  4.     <strong>registerInstance</strong>(serviceName, Constants.DEFAULT_GROUP, instance);
  5. }
  6. //NacosNamingService类#registerInstance方法
  7. @Override
  8. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  9.     NamingUtils.checkInstanceIsLegal(instance);
  10.     String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  11.     if (instance.isEphemeral()) {
  12.         BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);<br>     <strong>//添加一个延时执行的心跳任务
  13.         beatReactor.addBeatInfo</strong>(groupedServiceName, beatInfo);
  14.     }<br>   <strong>//进行服务注册</strong>
  15.     <strong>serverProxy.registerService</strong>(groupedServiceName, groupName, instance);
  16. }
  17. //NamingProxy类#registerService方法
  18. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  19.     //构建注册参数
  20.     final Map<String, String> params = new HashMap<String, String>(16);
  21.     params.put(CommonParams.NAMESPACE_ID, namespaceId);
  22.     params.put(CommonParams.SERVICE_NAME, serviceName);
  23.     params.put(CommonParams.GROUP_NAME, groupName);
  24.     params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
  25.     params.put("ip", instance.getIp());
  26.     params.put("port", String.valueOf(instance.getPort()));
  27.     params.put("weight", String.valueOf(instance.getWeight()));
  28.     params.put("enable", String.valueOf(instance.isEnabled()));
  29.     params.put("healthy", String.valueOf(instance.isHealthy()));
  30.     params.put("ephemeral", String.valueOf(instance.isEphemeral()));
  31.     params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
  32.     //向服务端发送请求
  33.     //UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance  也就是官网所示的注册接口地址
  34. <strong>    reqApi</strong>(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
  35.    
  36. }
  37. public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
  38.     return reqApi(api, params, Collections.EMPTY_MAP, method);
  39. }
  40. public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
  41.     return reqApi(api, params, body, getServerList(), method);
  42. }
  43. public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
  44.    
  45.     params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
  46.    
  47.     if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
  48.         throw new NacosException(...);
  49.     }
  50.    
  51.     NacosException exception = new NacosException();
  52.    
  53.     if (StringUtils.isNotBlank(nacosDomain)) {
  54.         for (int i = 0; i < maxRetry; i++) {
  55.             try {
  56.                 return<strong> callServer</strong>(api, params, body, nacosDomain, method);
  57.             } catch (NacosException e) {
  58.                 exception = e;
  59.             }
  60.         }
  61.     } else {
  62.         Random random = new Random(System.currentTimeMillis());
  63.         int index = random.nextInt(servers.size());
  64.         
  65.         for (int i = 0; i < servers.size(); i++) {
  66.             String server = servers.get(index);
  67.             try {
  68.                 return<strong> callServer</strong>(api, params, body, server, method);
  69.             } catch (NacosException e) {
  70.                 exception = e;
  71.             }
  72.             index = (index + 1) % servers.size();
  73.         }
  74.     }
  75.     throw new NacosException(...);
  76. }
  77. public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
  78.     long start = System.currentTimeMillis();
  79.     long end = 0;
  80.     injectSecurityInfo(params);
  81.     Header header = builderHeader();
  82.    
  83.     String url;
  84.     if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
  85.         url = curServer + api;
  86.     } else {
  87.         if (!IPUtil.containsPort(curServer)) {
  88.             curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
  89.         }
  90.         url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
  91.     }
  92.    
  93.     try {
  94.         //真正远程调用
  95.         HttpRestResult<String> restResult = nacosRestTemplate
  96.                 .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
  97.         end = System.currentTimeMillis();
  98.         
  99.         MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);
  100.         
  101.         if (restResult.ok()) {
  102.             return restResult.getData();
  103.         }
  104.         if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
  105.             return StringUtils.EMPTY;
  106.         }
  107.         throw new NacosException(restResult.getCode(), restResult.getMessage());
  108.     } catch (Exception e) {
  109.         throw new NacosException(NacosException.SERVER_ERROR, e);
  110.     }
  111. }
  112. public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {
  113.     RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
  114.     return execute(url, httpMethod, requestHttpEntity, responseType);
  115. }
  116. private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {
  117.     URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
  118.    
  119.     ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
  120.     HttpClientResponse response = null;
  121.     try {
  122.         //使用JdkHttpClientRequest去发起请求
  123.         response = this.requestClient().execute(uri, httpMethod, requestEntity);
  124.         return responseHandler.handle(response);
  125.     } finally {
  126.         if (response != null) {
  127.             response.close();
  128.         }
  129.     }
  130. }
  131. //JdkHttpClientRequest类#execute方法
  132. @Override
  133. public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {
  134.     final Object body = requestHttpEntity.getBody();
  135.     final Header headers = requestHttpEntity.getHeaders();
  136.     replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());
  137.    
  138.     HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
  139.     Map<String, String> headerMap = headers.getHeader();
  140.     if (headerMap != null && headerMap.size() > 0) {
  141.         for (Map.Entry<String, String> entry : headerMap.entrySet()) {
  142.             conn.setRequestProperty(entry.getKey(), entry.getValue());
  143.         }
  144.     }
  145.    
  146.     conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis());
  147.     conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis());
  148.     conn.setRequestMethod(httpMethod);
  149.     if (body != null && !"".equals(body)) {
  150.         String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE);
  151.         String bodyStr = JacksonUtils.toJson(body);
  152.         if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
  153.             Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class);
  154.             bodyStr = HttpUtils.encodingParams(map, headers.getCharset());
  155.         }
  156.         if (bodyStr != null) {
  157.             conn.setDoOutput(true);
  158.             byte[] b = bodyStr.getBytes();
  159.             conn.setRequestProperty("Content-Length", String.valueOf(b.length));
  160.             OutputStream outputStream = conn.getOutputStream();
  161.             outputStream.write(b, 0, b.length);
  162.             outputStream.flush();
  163.             IoUtils.closeQuietly(outputStream);
  164.         }
  165.     }
  166.     <strong>conn.connect()</strong>;
  167.     return new JdkHttpClientResponse(conn);
  168. }
复制代码
 
【5】beatReactor.addBeatInfo  心跳任务的流程【服务心跳
  1. //BeatReactor类#构造方法
  2. public BeatReactor(NamingProxy serverProxy, int threadCount) {
  3.     this.serverProxy = serverProxy;
  4.     //定义延迟的线程池
  5.     this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
  6.         @Override
  7.         public Thread newThread(Runnable r) {
  8.             Thread thread = new Thread(r);
  9.             thread.setDaemon(true);
  10.             thread.setName("com.alibaba.nacos.naming.beat.sender");
  11.             return thread;
  12.         }
  13.     });
  14. }
  15. //添加任务方法
  16. public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
  17.     NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
  18.     String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
  19.     BeatInfo existBeat = null;
  20.     //fix #1733
  21.     if ((existBeat = dom2Beat.remove(key)) != null) {
  22.         existBeat.setStopped(true);
  23.     }
  24.     dom2Beat.put(key, beatInfo);
  25.     //实际上就是往延迟的线程池添加任务
  26.     executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
  27.     MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
  28. }
  29. //分析心跳任务类,主要都是run方法
  30. //这种调用方式eureka中也是
  31. class BeatTask implements Runnable {
  32.    
  33.     BeatInfo beatInfo;
  34.    
  35.     public BeatTask(BeatInfo beatInfo) {
  36.         this.beatInfo = beatInfo;
  37.     }
  38.    
  39.     @Override
  40.     public void run() {
  41.         if (beatInfo.isStopped()) {
  42.             return;
  43.         }
  44.         long nextTime = beatInfo.getPeriod();
  45.         try {
  46.             //调用server代理实例发送心跳接口
  47.             JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
  48.             long interval = result.get("clientBeatInterval").asLong();
  49.             boolean lightBeatEnabled = false;
  50.             if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
  51.                 lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
  52.             }
  53.             BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
  54.             if (interval > 0) {
  55.                 nextTime = interval;
  56.             }
  57.             int code = NamingResponseCode.OK;
  58.             if (result.has(CommonParams.CODE)) {
  59.                 code = result.get(CommonParams.CODE).asInt();
  60.             }
  61.             //服务返回没有,则再次注册
  62.             if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
  63.                 Instance instance = new Instance();
  64.                 instance.setPort(beatInfo.getPort());
  65.                 instance.setIp(beatInfo.getIp());
  66.                 instance.setWeight(beatInfo.getWeight());
  67.                 instance.setMetadata(beatInfo.getMetadata());
  68.                 instance.setClusterName(beatInfo.getCluster());
  69.                 instance.setServiceName(beatInfo.getServiceName());
  70.                 instance.setInstanceId(instance.getInstanceId());
  71.                 instance.setEphemeral(true);
  72.                 try {
  73.                     //又是一个注册方法的调用
  74.                     serverProxy.registerService(beatInfo.getServiceName(),
  75.                             NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
  76.                 } catch (Exception ignore) {
  77.                 }
  78.             }
  79.         } catch (NacosException ex) {...}
  80.         //方法内再次将任务塞入,形成循环调用
  81.         executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
  82.     }
  83. }
  84. public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
  85.     Map<String, String> params = new HashMap<String, String>(8);
  86.     Map<String, String> bodyMap = new HashMap<String, String>(2);
  87.     if (!lightBeatEnabled) {
  88.         bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
  89.     }
  90.     params.put(CommonParams.NAMESPACE_ID, namespaceId);
  91.     params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
  92.     params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
  93.     params.put("ip", beatInfo.getIp());
  94.     params.put("port", String.valueOf(beatInfo.getPort()));
  95.     //地址为/nacos/v1/ns/instance/beat
  96.     String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
  97.     return JacksonUtils.toObj(result);
  98. }
复制代码
 
【6】分析如何引入服务的【服务发现】
  1. //NacosNamingService类#getAllInstances方法
  2. @Override
  3. public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
  4.    
  5.     ServiceInfo serviceInfo;
  6.     // 是否是订阅模式,默认是true
  7.     if (subscribe) {
  8.         // 先从客户端缓存获取服务信息
  9.         serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
  10.     } else {
  11.         // 如果本地缓存不存在服务信息,则进行订阅
  12.         serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
  13.     }
  14.     List<Instance> list;
  15.     // 从服务信息中获取实例列表
  16.     if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
  17.         return new ArrayList<Instance>();
  18.     }
  19.     return list;
  20. }
复制代码
 

【6.1】分析先从缓存中拿的hostReactor.getServiceInfo方法
  1. //获取服务信息
  2. public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
  3.    
  4.     NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
  5.     String key = ServiceInfo.getKey(serviceName, clusters);
  6.     if (failoverReactor.isFailoverSwitch()) {
  7.         return failoverReactor.getService(key);
  8.     }
  9.     //获取服务的信息
  10.     ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
  11.     //客户端第一次获取这个注册表信息为空
  12.     if (null == serviceObj) {
  13.         serviceObj = new ServiceInfo(serviceName, clusters);
  14.         serviceInfoMap.put(serviceObj.getKey(), serviceObj);
  15.         updatingMap.put(serviceName, new Object());
  16.         //会去拉取这个注册中心里面的注册表信息
  17.         updateServiceNow(serviceName, clusters);
  18.         updatingMap.remove(serviceName);
  19.         
  20.     }
  21.     //如果本地缓存里面已有这个注册表信息
  22.     else if (updatingMap.containsKey(serviceName)) {
  23.         
  24.         if (UPDATE_HOLD_INTERVAL > 0) {
  25.             // hold a moment waiting for update finish
  26.             synchronized (serviceObj) {
  27.                 try {
  28.                     serviceObj.wait(UPDATE_HOLD_INTERVAL);
  29.                 } catch (InterruptedException e) {...}
  30.             }
  31.         }
  32.     }
  33.     //客户端会开启一个定时任务,每隔几秒会去拉取注册中心里面的全部实例的信息
  34.     scheduleUpdateIfAbsent(serviceName, clusters);
  35.    
  36.     return serviceInfoMap.get(serviceObj.getKey());
  37. }
  38. //HostReactor类# Map<String, ServiceInfo> serviceInfoMap属性【这个便是客户端保存实例数据的缓存所在】
  39. //实际上是先从serviceInfoMap属性里面拿的
  40. private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
  41.     String key = ServiceInfo.getKey(serviceName, clusters);
  42.     return serviceInfoMap.get(key);
  43. }
复制代码
 

【6.1.1】分析远程拉取流程updateServiceNow方法
  1. private void updateServiceNow(String serviceName, String clusters) {
  2.     try {
  3.         updateService(serviceName, clusters);
  4.     } catch (NacosException e) {...}
  5. }
  6. public void updateService(String serviceName, String clusters) throws NacosException {
  7.     ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
  8.     try {
  9.         //远程调用
  10.         String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
  11.         
  12.         if (StringUtils.isNotEmpty(result)) {
  13.             //处理并塞入serviceInfoMap,还会发送一个InstancesChangeEvent事件
  14.             processServiceJson(result);
  15.         }
  16.     } finally {
  17.         if (oldService != null) {
  18.             synchronized (oldService) {
  19.                 oldService.notifyAll();
  20.             }
  21.         }
  22.     }
  23. }
  24. public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
  25.    
  26.     final Map<String, String> params = new HashMap<String, String>(8);
  27.     params.put(CommonParams.NAMESPACE_ID, namespaceId);
  28.     params.put(CommonParams.SERVICE_NAME, serviceName);
  29.     params.put("clusters", clusters);
  30.     params.put("udpPort", String.valueOf(udpPort));
  31.     params.put("clientIP", NetUtils.localIP());
  32.     params.put("healthyOnly", String.valueOf(healthyOnly));
  33.     //调用服务的API,获取服务注册中心里面的全部实例
  34.     return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
  35. }
复制代码
 
【6.1.1.1】分析定时任务scheduleUpdateIfAbsent方法做了什么

[code]public void scheduleUpdateIfAbsent(String serviceName, String clusters) {    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {        return;    }        synchronized (futureMap) {        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {            return;        }                ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);    }}//DEFAULT_DELAY = 1000L,也就是说是1spublic synchronized ScheduledFuture addTask(UpdateTask task) {    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}//分析UpdateTask类的run方法@Overridepublic void run() {    long delayTime = DEFAULT_DELAY;        try {        // 根据serviceName获取到当前服务的信息,包括服务器地址列表        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));        // 如果为空,则重新拉取最新的服务列表        if (serviceObj == null) {            updateService(serviceName, clusters);            return;        }        // 如果时间戳
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

不到断气不罢休

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表