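I. Spring Cloud Overview
Spring Cloud is an ordered collection of frameworks covering service discovery and registration, configuration center, message bus, load balancing, circuit breakers, data monitoring, and more.
Spring Cloud combines multiple service frameworks and re-packages them on top of Spring Boot, hiding complex configuration and implementation details. The result is a simple, easy-to-understand, easy-to-deploy, and easy-to-maintain toolkit for building distributed systems.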

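Spring Cloud is a microservice development solution built on Spring Boot. Spring Boot is Spring's framework for fast configuration, and it lets you quickly develop an individual microservice.
II. Nacos Overview
Nacos is a dynamic service discovery, configuration management, and service management platform that makes it easier to build cloud-native applications.
Nacos is dedicated to helping you discover, configure, and manage microservices. It provides a set of simple, easy-to-use features that help you quickly implement dynamic service discovery, service configuration, service metadata, and traffic management.
Nacos helps you build, deliver, and manage microservice platforms more nimbly and easily. It is the service infrastructure for building modern "service"-centric application architectures (such as the microservice paradigm and the cloud-native paradigm).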
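1. Concepts in Nacos
Region
A physical data center. Once a resource has been created, its region cannot be changed.
Availability Zone
A physical area within a region that has independent power and network. Instances in the same availability zone have low network latency between them.
Endpoint
The entry domain name of a given service in a region.
Namespace
Used to isolate configuration at tenant granularity. Configurations with the same Group or Data ID can exist in different namespaces. A common use of namespaces is to separate configuration for different environments, for example isolating the resources (such as configurations and services) of the development/test environment from those of production.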
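Configuration
During development, developers usually separate parameters and variables that need to change from the code and manage them independently as standalone configuration files. The goal is to let static system artifacts or deliverables (such as WAR and JAR packages) adapt better to the actual physical runtime environment. Configuration management is usually part of system deployment and is carried out by system administrators or operations staff. Changing configuration is an effective way to adjust a system's behavior at runtime.
Configuration Management
All configuration-related activities: editing, storing, distributing, change management, version history management, change auditing, and so on.
Configuration Item
A specific configurable parameter and its value range, usually in the form param-key=param-value. For example, the log output level we often configure (logLevel=INFO|WARN|ERROR) is a configuration item.
Configuration Set
A collection of related or unrelated configuration items is called a configuration set. In a system, one configuration file is usually one configuration set, containing configuration for various aspects of the system. For example, a configuration set might contain data source, thread pool, and log level configuration items.
Configuration Set ID
The ID of a configuration set in Nacos, also called the Data ID. The configuration set ID is one of the dimensions for organizing configuration: Data IDs are typically used to organize a system's configuration sets. A system or application can contain multiple configuration sets, each identified by a meaningful name. Data IDs usually follow a Java-package-like naming convention (such as com.taobao.tc.refund.log.level) to ensure global uniqueness; this convention is not mandatory.
Configuration Group
A group of configuration sets in Nacos, and another dimension for organizing configuration. Configuration sets are grouped by a meaningful string (such as Buy or Trade) to distinguish configuration sets that share the same Data ID. When you create a configuration in Nacos without specifying a group name, the group defaults to DEFAULT_GROUP. A common use case for configuration groups is when different applications or components use the same configuration type, such as a database_url configuration or an MQ_topic configuration.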
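Configuration Snapshot
The Nacos client SDK generates local snapshots of configurations. When the client cannot connect to the Nacos Server, it can fall back on the configuration snapshot, which improves the system's overall disaster tolerance. A configuration snapshot is similar to a local commit in Git, and also to a cache: it is updated at appropriate times, but there is no notion of cache expiration.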
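The three configuration dimensions described above (namespace, group, Data ID) can be pictured as a composite lookup key. Below is a minimal in-memory sketch under that assumption; ConfigStore and its methods are hypothetical names for illustration, not the Nacos client API. It shows that the same group and Data ID can coexist in different namespaces, and that an omitted group falls back to DEFAULT_GROUP:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model: a configuration is located by (namespace, group, dataId).
class ConfigStore {
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    // List.of(...) has value-based equals/hashCode, so it works as a composite map key.
    private final Map<List<String>, String> contents = new ConcurrentHashMap<>();

    private static List<String> key(String namespace, String group, String dataId) {
        // An omitted group falls back to DEFAULT_GROUP, as described in the glossary.
        String g = (group == null || group.isBlank()) ? DEFAULT_GROUP : group;
        return List.of(namespace, g, dataId); // NullPointerException if namespace or dataId is null
    }

    void publish(String namespace, String group, String dataId, String content) {
        contents.put(key(namespace, group, dataId), content);
    }

    String get(String namespace, String group, String dataId) {
        return contents.get(key(namespace, group, dataId));
    }

    public static void main(String[] args) {
        ConfigStore store = new ConfigStore();
        // Same group and Data ID in two namespaces: isolated, no conflict.
        store.publish("dev", "Trade", "com.example.db", "url=jdbc:mysql://dev-db/trade");
        store.publish("prod", "Trade", "com.example.db", "url=jdbc:mysql://prod-db/trade");
        // No group given: stored under DEFAULT_GROUP.
        store.publish("dev", null, "com.example.log.level", "logLevel=INFO");

        System.out.println(store.get("prod", "Trade", "com.example.db"));
        System.out.println(store.get("dev", "DEFAULT_GROUP", "com.example.log.level"));
    }
}
```

A record or a dedicated key class would work just as well as List.of; the list is used here only to keep the sketch short.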
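Service
A software function provided to clients over the network through a predefined interface.
Service Name
The identifier of a service, which uniquely determines the service it refers to.
Service Registry
A database that stores service instances and service load-balancing policies.
Service Discovery
Probing, over a computer network, the addresses and metadata of the instances under a service (usually by service name), and exposing them to clients for query through a predefined interface.
Metadata
Descriptive information about Nacos data (such as configurations and services), e.g. service version, weight, disaster-recovery policy, load-balancing policy, authentication configuration, and custom labels. By scope, it is divided into service-level, cluster-level, and instance-level metadata.
Application
An attribute of a service that identifies the service provider.
Service Group
Different services can be grouped into the same group.
Virtual Cluster
All instances under one service form a default cluster. The cluster can be further divided as needed, and the unit of division can be a virtual cluster.
Instance
A process with an accessible network address (IP:Port) that provides one or more services.
Weight
An instance-level setting. The weight is a floating-point number: the larger the weight, the more traffic is assigned to the instance.
Health Check
Checks, in a specified way, the health of the instances mounted under a service to determine whether each instance can serve requests. Based on the result, an instance is judged healthy or unhealthy. When a resolution request is made for a service, unhealthy instances are not returned to the client.
Health Protection Threshold
Too many unhealthy instances can cause all traffic to flow to the remaining healthy instances, which may then be overwhelmed and trigger an avalanche effect. To prevent this, the health protection threshold is defined as a floating-point number between 0 and 1. When the ratio of healthy instances to all instances of the service falls below this value, all instances are returned to the client, whether healthy or not. This sacrifices some traffic, but it ensures that the remaining healthy instances in the cluster keep working normally.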
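Weight, health check, and the health protection threshold all affect which instance a caller ends up using. The sketch below combines them under a simple assumed rule: return only healthy instances unless the healthy ratio drops below the threshold, then choose among the returned instances by weighted random. InstanceSelection and its methods are hypothetical; this illustrates the concepts and is not Nacos's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative instance selection: health filter + protection threshold + weighted random.
class InstanceSelection {
    static final class Instance {
        final String address;
        final double weight;
        final boolean healthy;

        Instance(String address, double weight, boolean healthy) {
            this.address = address;
            this.weight = weight;
            this.healthy = healthy;
        }
    }

    // Which instances are returned to the client for a query.
    static List<Instance> visibleInstances(List<Instance> all, double protectThreshold) {
        if (protectThreshold < 0 || protectThreshold > 1) {
            throw new IllegalArgumentException("threshold must be in [0, 1]");
        }
        List<Instance> healthy = new ArrayList<>();
        for (Instance i : all) {
            if (i.healthy) healthy.add(i);
        }
        if (!all.isEmpty() && healthy.size() / (double) all.size() < protectThreshold) {
            // Protection triggered: return everything so traffic is not piled onto the few healthy ones.
            return new ArrayList<>(all);
        }
        return healthy;
    }

    // Weighted random choice: probability proportional to weight; weight <= 0 is never chosen.
    static Instance pickByWeight(List<Instance> candidates, Random random) {
        double total = 0;
        Instance lastPositive = null;
        for (Instance i : candidates) {
            if (i.weight > 0) {
                total += i.weight;
                lastPositive = i;
            }
        }
        if (lastPositive == null) throw new IllegalArgumentException("no instance with positive weight");
        double r = random.nextDouble() * total; // r in [0, total)
        for (Instance i : candidates) {
            if (i.weight <= 0) continue;
            r -= i.weight;
            if (r < 0) return i;
        }
        return lastPositive; // guards against floating-point rounding
    }

    public static void main(String[] args) {
        List<Instance> all = List.of(
                new Instance("10.0.0.1:8080", 1.0, true),
                new Instance("10.0.0.2:8080", 3.0, true),
                new Instance("10.0.0.3:8080", 1.0, false),
                new Instance("10.0.0.4:8080", 1.0, false));
        // 2 of 4 healthy = 0.5, which is not below 0.4: only the healthy two are visible.
        List<Instance> visible = visibleInstances(all, 0.4);
        System.out.println(visible.size());
        // 0.5 is below 0.8: protection kicks in and all four are returned.
        System.out.println(visibleInstances(all, 0.8).size());
        System.out.println(pickByWeight(visible, new Random(7)).address);
    }
}
```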
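2. Nacos Architecture
Basic architecture:
[Figure: Nacos basic architecture]
Logical architecture and components:
[Figure: Nacos logical architecture and components]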
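- Service management: service CRUD, domain name CRUD, service health checks, service weight management, and more
- Configuration management: configuration CRUD, version management, gray release management, listener management, push tracing, aggregated data, and more
- Metadata management: metadata CRUD and tagging
- Plugin mechanism: lets the three modules be deployed separately or together, and implements the SPI extension-point mechanism
- Event mechanism: asynchronous event notification, asynchronous notification of SDK data changes, and similar logic
- Logging module: manages log categories, log levels, log portability (especially avoiding conflicts), log formats, and error codes plus help documentation
- Callback mechanism: when the SDK delivers data, user handlers are called back in a unified way; the interfaces and data structures must be extensible
- Addressing modes: supports IP, domain name, nameserver, broadcast, and other addressing modes, which must be extensible
- Push channel: solves push performance between server and storage, between servers, and between server and SDK
- Capacity management: manages capacity per tenant and per group, preventing storage from filling up and hurting service availability
- Traffic management: controls request frequency, number of long-lived connections, message size, and request flow along dimensions such as tenant and group
- Caching mechanism: disaster-recovery directory, local cache, and server-side cache; using the disaster-recovery directory requires tooling
- Startup modes: starts different programs plus UI in standalone mode, config mode, service mode, DNS mode, or all mode
- Consistency protocol: provides different consistency mechanisms for different data with different consistency requirements
- Storage module: handles persistent and non-persistent data storage, and data sharding
- Nameserver: handles routing from namespace to cluster ID, and the mapping between the user environment and the Nacos physical environment
- CMDB: handles metadata storage, integration with third-party CMDB systems, and relationships among applications, people, and resources
- Metrics: exposes standard metrics data for easy integration with third-party monitoring systems
- Trace: exposes standard traces for integration with SLA systems, log visualization in the console, push tracing, and similar capabilities, and can also integrate with metering and billing systems
- Access management: similar to activating a service on Alibaba Cloud, i.e. the process of assigning identity, capacity, and permissions
- User management: handles user management, login, SSO, and similar concerns
- Permission management: handles identity verification, access control, role management, and similar concerns
- Audit system: extension interfaces for integrating with different companies' audit systems
- Notification system: notifies the relevant people of core data changes or operations, for example by integrating with an SMS system
- OpenAPI: exposes standard REST-style HTTP APIs that are simple, easy to use, and easy to integrate from many languages
- Console: an easy-to-use console for service management, configuration management, and other operations
- SDK: SDKs for multiple languages
- Agent: a DNS-F-like mode, or integration with solutions such as service mesh
- CLI: lightweight command-line management of the product, as convenient as git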
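Deployment architecture:
[Figure: Nacos deployment architecture]
Nacos official website, documentation, and deployment manual: https://nacos.io/zh-cn/index.html
Nacos GitHub: https://github.com/alibaba/nacos
III. Nacos Source Code Analysis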
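1. Nacos Registration Source Code Analysis: Client Side
When the consumer starts, it reads the instance list for the specified service name from the Nacos server and caches it in local memory.
It then starts a scheduled task that pulls the service list from the Nacos server every 10 s.
Nacos push mechanism:
When heartbeat detection finds that a service provider's heartbeat has timed out, the server pushes a message to the consumer, which updates its local cache.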
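The consumer-side behavior above (load once at startup, refresh every 10 s, apply server pushes) can be modeled with a ConcurrentHashMap and a ScheduledExecutorService. This is a simplified sketch, not the Nacos client: ServiceCache and the fetcher function, which stands in for the request to the Nacos server, are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Consumer-side model: initial load, periodic pull, and push-driven updates.
class ServiceCache implements AutoCloseable {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> fetcher; // hypothetical stand-in for the server call
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "service-cache-refresh");
        t.setDaemon(true); // do not keep the JVM alive just for refreshing
        return t;
    });

    ServiceCache(Function<String, List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    // At startup: fetch once, cache in memory, then pull again every periodSeconds.
    void subscribe(String serviceName, long periodSeconds) {
        cache.put(serviceName, List.copyOf(fetcher.apply(serviceName)));
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                cache.put(serviceName, List.copyOf(fetcher.apply(serviceName)));
            } catch (RuntimeException e) {
                // Keep the last known list; an uncaught exception would cancel future runs.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    // Push path: the server reports a change, e.g. after a provider's heartbeat timed out.
    void onPush(String serviceName, List<String> instances) {
        cache.put(serviceName, List.copyOf(instances));
    }

    List<String> getInstances(String serviceName) {
        return cache.getOrDefault(serviceName, List.of());
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        try (ServiceCache cache = new ServiceCache(name -> List.of("10.0.0.1:8080", "10.0.0.2:8080"))) {
            cache.subscribe("demo-service", 10); // 10 s, as described above
            System.out.println(cache.getInstances("demo-service"));
            // Simulate a push after 10.0.0.2's heartbeat timed out.
            cache.onPush("demo-service", List.of("10.0.0.1:8080"));
            System.out.println(cache.getInstances("demo-service"));
        }
    }
}
```

Reads never block on the network: callers always see the last list written by either the pull task or a push.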
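Client
Once our project is configured to use Nacos as its registry, it must set at least the following property:
- spring.cloud.nacos.discovery.server-addr=<ip>:8848
- # Marks the instance as ephemeral (true, the default) or persistent (false). In the 2.x client this also decides the transport: ephemeral instances are registered over gRPC and persistent ones over HTTP 1.x, and gRPC performs considerably better than HTTP 1.x.
- spring.cloud.nacos.discovery.ephemeral=false
The server-addr property tells the application which Nacos server to register with; if it is missing, the application keeps reporting errors.
I won't cover Spring Boot's @EnableAutoConfiguration here; anyone reading the Nacos source is assumed to be familiar with Spring Boot.
Next, open the NacosServiceRegistryAutoConfiguration class.
- @Configuration(proxyBeanMethods = false)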
- @EnableConfigurationProperties
- @ConditionalOnNacosDiscoveryEnabled
- @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
- matchIfMissing = true)
- @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
- AutoServiceRegistrationAutoConfiguration.class,
- NacosDiscoveryAutoConfiguration.class })
- public class NacosServiceRegistryAutoConfiguration {
- @Bean
- public NacosServiceRegistry nacosServiceRegistry(
- NacosServiceManager nacosServiceManager,
- NacosDiscoveryProperties nacosDiscoveryProperties) {
- return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
- }
- @Bean
- @ConditionalOnBean(AutoServiceRegistrationProperties.class)
- public NacosRegistration nacosRegistration(
- ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
- NacosDiscoveryProperties nacosDiscoveryProperties,
- ApplicationContext context) {
- return new NacosRegistration(registrationCustomizers.getIfAvailable(),
- nacosDiscoveryProperties, context);
- }
- @Bean
- @ConditionalOnBean(AutoServiceRegistrationProperties.class)
- public NacosAutoServiceRegistration nacosAutoServiceRegistration(
- NacosServiceRegistry registry,
- AutoServiceRegistrationProperties autoServiceRegistrationProperties,
- NacosRegistration registration) {
- return new NacosAutoServiceRegistration(registry,
- autoServiceRegistrationProperties, registration);
- }
- }
The third bean, NacosAutoServiceRegistration, extends the abstract class AbstractAutoServiceRegistration.
- public abstract class AbstractAutoServiceRegistration<R extends Registration>
- implements AutoServiceRegistration, ApplicationContextAware,
- ApplicationListener<WebServerInitializedEvent> {
-
- @Override
- @SuppressWarnings("deprecation")
- public void onApplicationEvent(WebServerInitializedEvent event) {
- bind(event);
- }
- @Deprecated
- public void bind(WebServerInitializedEvent event) {
- ApplicationContext context = event.getApplicationContext();
- if (context instanceof ConfigurableWebServerApplicationContext) {
- if ("management".equals(((ConfigurableWebServerApplicationContext) context)
- .getServerNamespace())) {
- return;
- }
- }
- this.port.compareAndSet(0, event.getWebServer().getPort());
- this.start();
- }
- public void start() {
- if (!isEnabled()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Discovery Lifecycle disabled. Not starting");
- }
- return;
- }
- // only initialize if nonSecurePort is greater than 0 and it isn't already running
- // because of containerPortInitializer below
- if (!this.running.get()) {
- this.context.publishEvent(
- new InstancePreRegisteredEvent(this, getRegistration()));
- register();
- if (shouldRegisterManagement()) {
- registerManagement();
- }
- this.context.publishEvent(
- new InstanceRegisteredEvent<>(this, getConfiguration()));
- this.running.compareAndSet(false, true);
- }
- }
- }
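AbstractAutoServiceRegistration implements ApplicationListener<WebServerInitializedEvent>. ApplicationListener is Spring's event-listener interface (an application of the observer pattern), and WebServerInitializedEvent is published once the web server has been initialized. onApplicationEvent() calls bind(), and bind() in turn calls start().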
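To make the listener chain concrete, here is a Spring-free sketch of the same observer-pattern flow. WebServerInitialized, EventPublisher, and AutoRegistration are hypothetical stand-ins for WebServerInitializedEvent, Spring's event publishing, and AbstractAutoServiceRegistration; only the control flow (first port wins, register once, then mark as running) follows the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java model of the listener flow (no Spring): event -> bind -> start -> register, once.
class AutoRegistrationDemo {
    // Stand-in for WebServerInitializedEvent; it only carries the port.
    static final class WebServerInitialized {
        final int port;
        WebServerInitialized(int port) { this.port = port; }
    }

    // Minimal observer-pattern publisher.
    static final class EventPublisher {
        private final List<Consumer<WebServerInitialized>> listeners = new CopyOnWriteArrayList<>();
        void addListener(Consumer<WebServerInitialized> listener) { listeners.add(listener); }
        void publish(WebServerInitialized event) { listeners.forEach(l -> l.accept(event)); }
    }

    // Mirrors the shape of AbstractAutoServiceRegistration, greatly simplified.
    static final class AutoRegistration implements Consumer<WebServerInitialized> {
        final AtomicInteger port = new AtomicInteger(0);
        final AtomicBoolean running = new AtomicBoolean(false);
        final List<String> registrations = new ArrayList<>();

        @Override
        public void accept(WebServerInitialized event) { // plays the role of onApplicationEvent
            bind(event);
        }

        void bind(WebServerInitialized event) {
            port.compareAndSet(0, event.port); // only the first non-zero port is kept
            start();
        }

        void start() {
            if (!running.get()) { // register only if not already running
                register();
                running.compareAndSet(false, true);
            }
        }

        void register() { // in the real flow this goes to the ServiceRegistry
            registrations.add("demo-service:" + port.get());
        }
    }

    public static void main(String[] args) {
        EventPublisher publisher = new EventPublisher();
        AutoRegistration registration = new AutoRegistration();
        publisher.addListener(registration);
        publisher.publish(new WebServerInitialized(8080));
        publisher.publish(new WebServerInitialized(9090)); // ignored: already running
        System.out.println(registration.registrations);
    }
}
```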
start() calls register(), which ends up in NacosServiceRegistry.register():
- public class NacosServiceRegistry implements ServiceRegistry<Registration> {
- @Override
- public void register(Registration registration) {
- if (StringUtils.isEmpty(registration.getServiceId())) {
- log.warn("No service to register for nacos client...");
- return;
- }
- NamingService namingService = namingService();
- String serviceId = registration.getServiceId();
- String group = nacosDiscoveryProperties.getGroup();
- Instance instance = getNacosInstanceFromRegistration(registration);
- try {
- namingService.registerInstance(serviceId, group, instance);
- log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
- instance.getIp(), instance.getPort());
- }
- catch (Exception e) {
- if (nacosDiscoveryProperties.isFailFast()) {
- log.error("nacos registry, {} register failed...{},", serviceId,
- registration.toString(), e);
- rethrowRuntimeException(e);
- }
- else {
- log.warn("Failfast is false. {} register failed...{},", serviceId,
- registration.toString(), e);
- }
- }
- }
- }
getNacosInstanceFromRegistration builds the Instance to be registered from the Registration and the discovery properties:
- private Instance getNacosInstanceFromRegistration(Registration registration) {
- Instance instance = new Instance();
- instance.setIp(registration.getHost());
- instance.setPort(registration.getPort());
- instance.setWeight(nacosDiscoveryProperties.getWeight());
- instance.setClusterName(nacosDiscoveryProperties.getClusterName());
- instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
- instance.setMetadata(registration.getMetadata());
- instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
- return instance;
- }
- namingService.registerInstance(serviceId, group, instance);
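This call is implemented in NacosNamingService. Its clientProxy field has three implementations: NamingClientProxyDelegate, NamingGrpcClientProxy, and NamingHttpClientProxy.
The NacosNamingService constructor calls init(properties), which assigns a NamingClientProxyDelegate to clientProxy. As the name suggests, a class with "Delegate" in its name usually implements the delegation pattern.
- public NacosNamingService(String serverList) throws NacosException {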
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
- init(properties);
- }
- public NacosNamingService(Properties properties) throws NacosException {
- init(properties);
- }
- private void init(Properties properties) throws NacosException {
- ValidatorUtils.checkInitParam(properties);
- this.namespace = InitUtils.initNamespaceForNaming(properties);
- InitUtils.initSerialization();
- InitUtils.initWebRootContext(properties);
- initLogName(properties);
- this.changeNotifier = new InstancesChangeNotifier();
- NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
- NotifyCenter.registerSubscriber(changeNotifier);
- this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
- this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
- }
- @Override
- public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
- NamingUtils.checkInstanceIsLegal(instance);
- clientProxy.registerService(serviceName, groupName, instance);
- }
Registration over HTTP 1.x
- NamingClientProxyDelegate.registerService
The delegate decides which proxy should execute the call:
- @Override
- public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
- getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
- }
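NamingClientProxyDelegate.getExecuteClientProxy
If the instance is not ephemeral (ephemeral=false), the HTTP proxy is used; otherwise the gRPC proxy is used. Note that if nacos-server is still on 1.x.x, gRPC registration will fail: gRPC support was added in 2.x.x and requires an additional port, so by default the server exposes both 8848 and 9848.
- private NamingClientProxy getExecuteClientProxy(Instance instance) {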
- return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
- }
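The delegate-plus-routing structure can be shown in a few lines of plain Java. The classes below are hypothetical stand-ins that return a label instead of doing network I/O; only the routing rule (ephemeral goes to gRPC, persistent to HTTP) is taken from getExecuteClientProxy.

```java
// Sketch of the delegation idea behind NamingClientProxyDelegate; the proxies here are stand-ins.
class DelegateDemo {
    interface ClientProxy {
        String registerService(String serviceName, String groupName, boolean ephemeral);
    }

    // Stand-in for the gRPC transport (used for ephemeral instances).
    static final class GrpcProxy implements ClientProxy {
        @Override
        public String registerService(String serviceName, String groupName, boolean ephemeral) {
            return "grpc:" + groupName + "/" + serviceName;
        }
    }

    // Stand-in for the HTTP 1.x transport (used for persistent instances).
    static final class HttpProxy implements ClientProxy {
        @Override
        public String registerService(String serviceName, String groupName, boolean ephemeral) {
            return "http:" + groupName + "/" + serviceName;
        }
    }

    // The delegate implements the same interface and forwards each call to one real proxy.
    static final class DelegateProxy implements ClientProxy {
        private final ClientProxy grpcProxy = new GrpcProxy();
        private final ClientProxy httpProxy = new HttpProxy();

        private ClientProxy executeProxy(boolean ephemeral) {
            return ephemeral ? grpcProxy : httpProxy;
        }

        @Override
        public String registerService(String serviceName, String groupName, boolean ephemeral) {
            return executeProxy(ephemeral).registerService(serviceName, groupName, ephemeral);
        }
    }

    public static void main(String[] args) {
        ClientProxy proxy = new DelegateProxy(); // callers only see the interface
        System.out.println(proxy.registerService("demo-service", "DEFAULT_GROUP", true));
        System.out.println(proxy.registerService("demo-service", "DEFAULT_GROUP", false));
    }
}
```

Because the delegate implements the same interface, NacosNamingService can hold a single clientProxy reference and never needs to know which transport is used.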
- NamingHttpClientProxy.registerService
Here the executing proxy is NamingHttpClientProxy:
- @Override
- public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
- NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
- instance);
- String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
- if (instance.isEphemeral()) {
- BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
- beatReactor.addBeatInfo(groupedServiceName, beatInfo);
- }
- final Map<String, String> params = new HashMap<String, String>(32);
- params.put(CommonParams.NAMESPACE_ID, namespaceId);
- params.put(CommonParams.SERVICE_NAME, groupedServiceName);
- params.put(CommonParams.GROUP_NAME, groupName);
- params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
- params.put(IP_PARAM, instance.getIp());
- params.put(PORT_PARAM, String.valueOf(instance.getPort()));
- params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
- params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
- params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
- params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
- params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
- reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
- }
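In callServer() these params are passed as a Query, which execute() appends to the URI via HttpUtils.buildUri, so the registration fields travel URL-encoded in the request URL. A JDK-only sketch of that encoding; the path /nacos/v1/ns/instance, the parameter names, and the group@@service format are assumptions about the default constants (UtilAndComs.nacosUrlInstance, CommonParams, NamingUtils), not values shown in this article.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Builds a request URL with URL-encoded query parameters. Path and parameter names are assumptions.
class RegisterUrlSketch {
    static String buildUrl(String baseUrl, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return params.isEmpty() ? baseUrl : baseUrl + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("namespaceId", "public");
        params.put("serviceName", "DEFAULT_GROUP@@demo-service"); // assumed grouped-name format
        params.put("ip", "10.0.0.1");
        params.put("port", "8080");
        params.put("ephemeral", "false");
        System.out.println(buildUrl("http://127.0.0.1:8848/nacos/v1/ns/instance", params));
    }
}
```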
NamingHttpClientProxy.reqApi
- public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
- return reqApi(api, params, Collections.EMPTY_MAP, method);
- }
- public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
- throws NacosException {
- return reqApi(api, params, body, serverListManager.getServerList(), method);
- }
- public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
- String method) throws NacosException {
- params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
- if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
- throw new NacosException(NacosException.INVALID_PARAM, "no server available");
- }
- NacosException exception = new NacosException();
- if (serverListManager.isDomain()) {
- String nacosDomain = serverListManager.getNacosDomain();
- for (int i = 0; i < maxRetry; i++) {
- try {
- return callServer(api, params, body, nacosDomain, method);
- } catch (NacosException e) {
- exception = e;
- if (NAMING_LOGGER.isDebugEnabled()) {
- NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
- }
- }
- }
- } else {
- Random random = new Random(System.currentTimeMillis());
- int index = random.nextInt(servers.size());
- for (int i = 0; i < servers.size(); i++) {
- String server = servers.get(index);
- try {
- return callServer(api, params, body, server, method);
- } catch (NacosException e) {
- exception = e;
- if (NAMING_LOGGER.isDebugEnabled()) {
- NAMING_LOGGER.debug("request {} failed.", server, e);
- }
- }
- index = (index + 1) % servers.size();
- }
- }
- NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
- exception.getErrMsg());
- throw new NacosException(exception.getErrCode(),
- "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
- }
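serverListManager.isDomain() depends on how many Nacos server addresses are configured: with a single address, the if branch runs and retries that address up to maxRetry times; with more than one, the else branch runs.
In the else branch, servers is the list of Nacos servers. A Random picks a starting index and callServer() is invoked on that server; if the call fails, index is advanced by one (wrapping around the list) and callServer() is tried on the next node. If every server fails, an exception is thrown.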
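The else-branch strategy (random starting point, then round-robin through the remaining servers, each tried at most once) is a generic failover pattern. A standalone sketch, where callWithFailover and the call function are hypothetical and plain exceptions stand in for NacosException:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Start at a random server, try each server at most once (wrapping around),
// and fail only if all of them fail.
class FailoverSketch {
    static <T> T callWithFailover(List<String> servers, Random random, Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure, then move on to the next node
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers " + servers + " tried", last);
    }

    public static void main(String[] args) {
        List<String> servers = List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        List<String> attempted = new ArrayList<>();
        String result = callWithFailover(servers, new Random(), server -> {
            attempted.add(server);
            if (!server.equals("10.0.0.3:8848")) {
                throw new RuntimeException("connection refused: " + server); // simulated outage
            }
            return "ok from " + server;
        });
        System.out.println(result + " after trying " + attempted);
    }
}
```

Starting at a random index spreads the first request of many clients across the cluster instead of always hitting the first configured server.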
NamingHttpClientProxy.callServer
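Skipping the preliminary branches, the method assembles the URL and enters the try block, which uses a nacosRestTemplate. When the request completes, it receives a restResult and records the result code through a class named MetricsMonitor. As the name suggests, this is monitoring code, and looking inside confirms it is Prometheus-based. We won't expand on it here; back to the main flow.
If the request succeeds (200), restResult.getData() is returned; a 304 (Not Modified) returns an empty string, and any other status raises a NacosException.
- public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,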
- String method) throws NacosException {
- long start = System.currentTimeMillis();
- long end = 0;
- String namespace = params.get(CommonParams.NAMESPACE_ID);
- String group = params.get(CommonParams.GROUP_NAME);
- String serviceName = params.get(CommonParams.SERVICE_NAME);
- params.putAll(getSecurityHeaders(namespace, group, serviceName));
- Header header = NamingHttpUtil.builderHeader();
- String url;
- if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
- url = curServer + api;
- } else {
- if (!InternetAddressUtil.containsPort(curServer)) {
- curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
- }
- url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
- }
- try {
- HttpRestResult<String> restResult = nacosRestTemplate
- .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
- end = System.currentTimeMillis();
- MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
- .observe(end - start);
- if (restResult.ok()) {
- return restResult.getData();
- }
- if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
- return StringUtils.EMPTY;
- }
- throw new NacosException(restResult.getCode(), restResult.getMessage());
- } catch (Exception e) {
- NAMING_LOGGER.error("[NA] failed to request", e);
- throw new NacosException(NacosException.SERVER_ERROR, e);
- }
- }
- NacosRestTemplate.exchangeForm
The key call is this.requestClient().execute():
- public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
- String httpMethod, Type responseType) throws Exception {
- RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
- header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
- return execute(url, httpMethod, requestHttpEntity, responseType);
- }
- private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
- Type responseType) throws Exception {
- URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
- if (logger.isDebugEnabled()) {
- logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
- }
- ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
- HttpClientResponse response = null;
- try {
- response = this.requestClient().execute(uri, httpMethod, requestEntity);
- return responseHandler.handle(response);
- } finally {
- if (response != null) {
- response.close();
- }
- }
- }
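requestClient() returns the HttpClientRequest passed to the constructor, wrapped in an InterceptingHttpClientRequest when interceptors are registered:
- private final HttpClientRequest requestClient;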
-
- private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();
- public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
- super(logger);
- this.requestClient = requestClient;
- }
- private HttpClientRequest requestClient() {
- if (CollectionUtils.isNotEmpty(interceptors)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Execute via interceptors :{}", interceptors);
- }
- return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
- }
- return requestClient;
- }
HttpClientBeanHolder.getNacosRestTemplate
A classic example of double-checked locking:
- public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
- if (httpClientFactory == null) {
- throw new NullPointerException("httpClientFactory is null");
- }
- String factoryName = httpClientFactory.getClass().getName();
- NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
- if (nacosRestTemplate == null) {
- synchronized (SINGLETON_REST) {
- nacosRestTemplate = SINGLETON_REST.get(factoryName);
- if (nacosRestTemplate != null) {
- return nacosRestTemplate;
- }
- nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
- SINGLETON_REST.put(factoryName, nacosRestTemplate);
- }
- }
- return nacosRestTemplate;
- }
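The same double-checked locking idea, reduced to a generic per-key singleton cache. SingletonRegistry is a hypothetical name, and the key plays the role of the factory class name in getNacosRestTemplate. One assumption worth stating: the unlocked first read is only safe if the map tolerates concurrent reads, so this sketch uses a ConcurrentHashMap (whose computeIfAbsent would also do the job in a single call).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Double-checked locking over a map of per-key singletons (hypothetical names).
class SingletonRegistry<T> {
    // ConcurrentHashMap makes the unlocked first read safe.
    private final Map<String, T> instances = new ConcurrentHashMap<>();

    T get(String key, Supplier<T> factory) {
        T value = instances.get(key);           // 1st check: fast path without locking
        if (value == null) {
            synchronized (instances) {
                value = instances.get(key);     // 2nd check: another thread may have won the race
                if (value == null) {
                    value = factory.get();      // created at most once per key
                    instances.put(key, value);
                }
            }
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        SingletonRegistry<Object> registry = new SingletonRegistry<>();
        AtomicInteger created = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int k = 0; k < 1_000; k++) {
                    registry.get("NamingHttpClientFactory", () -> {
                        created.incrementAndGet();
                        return new Object();
                    });
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + created.get());
    }
}
```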
NamingHttpClientFactory extends AbstractHttpClientFactory. Since NamingHttpClientFactory does not override createNacosRestTemplate(), the method actually called is AbstractHttpClientFactory.createNacosRestTemplate().
- private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
- public NacosRestTemplate getNacosRestTemplate() {
- return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
- }
- private static class NamingHttpClientFactory extends AbstractHttpClientFactory {
-
- @Override
- protected HttpClientConfig buildHttpClientConfig() {
- return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
- .setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
- }
- @Override
- protected Logger assignLogger() {
- return NAMING_LOGGER;
- }
- }
AbstractHttpClientFactory.createNacosRestTemplate
- @Override
- public NacosRestTemplate createNacosRestTemplate() {
- HttpClientConfig httpClientConfig = buildHttpClientConfig();
- final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
- // enable ssl
- initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
- @Override
- public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
- clientRequest.setSSLContext(loadSSLContext());
- clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
- }
- }, new TlsFileWatcher.FileChangeListener() {
- @Override
- public void onChanged(String filePath) {
- clientRequest.setSSLContext(loadSSLContext());
- }
- });
- return new NacosRestTemplate(assignLogger(), clientRequest);
- }
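The line final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig); shows that a JdkHttpClientRequest is created here.
Following it further leads to java.net.HttpURLConnection, which sends the request to the nacos-server address. Below that is the HTTP transport layer, which we won't analyze.
Finally a result is returned: if it is 200, the registration succeeded; otherwise an exception is thrown.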
Registration over gRPC (HTTP/2)
Again, we start from the delegate that chooses between gRPC and HTTP.
NamingClientProxyDelegate.registerService
We analyzed this code above, so we go straight to the gRPC implementation.
- @Override
- public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
- getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
- }
- NamingGrpcClientProxy.registerService
Judging by its name, redoService.cacheInstanceForRedo is part of a redo (retry) mechanism:
- @Override
- public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
- NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
- instance);
- redoService.cacheInstanceForRedo(serviceName, groupName, instance);
- doRegisterService(serviceName, groupName, instance);
- }
- NamingGrpcRedoService.cacheInstanceForRedo
This appears to simply put a redoData entry into a ConcurrentMap, with no other logic; it is presumably used later. Back to the main flow.
- private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
- public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
- String key = NamingUtils.getGroupedName(serviceName, groupName);
- InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
- synchronized (registeredInstances) {
- registeredInstances.put(key, redoData);
- }
- }
- NamingGrpcClientProxy.doRegisterService
The request is an InstanceRequest built through its constructor, and requestToServer() sends it to register the instance.
- public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
- InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
- NamingRemoteConstants.REGISTER_INSTANCE, instance);
- requestToServer(request, Response.class);
- redoService.instanceRegistered(serviceName, groupName);
- }
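Why cache before sending? A plausible reading, given the method names, is that the cached entries let the client replay registrations after a failure or reconnect. The sketch below models that idea under those assumptions; RedoCacheSketch, onDisconnect, and redo are hypothetical and do not reproduce NamingGrpcRedoService, whose replay logic is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical model of a "cache first, send, then mark registered" redo flow.
class RedoCacheSketch {
    static final class RedoData {
        final String key;
        volatile boolean registered;
        RedoData(String key) { this.key = key; }
    }

    private final Map<String, RedoData> registeredInstances = new ConcurrentHashMap<>();

    // Same order as registerService/doRegisterService: cache, send, mark.
    void register(String serviceName, String groupName, Consumer<String> sender) {
        String key = groupName + "@@" + serviceName;
        RedoData data = new RedoData(key);
        registeredInstances.put(key, data); // cached before sending, so a failed send can be replayed
        sender.accept(key);                 // may throw, e.g. if the connection is down
        data.registered = true;
    }

    // Assumed reaction to a lost connection: everything must be registered again.
    void onDisconnect() {
        registeredInstances.values().forEach(d -> d.registered = false);
    }

    // Assumed redo step: re-send every cached entry that is not currently registered.
    List<String> redo(Consumer<String> sender) {
        List<String> replayed = new ArrayList<>();
        for (RedoData d : registeredInstances.values()) {
            if (!d.registered) {
                sender.accept(d.key);
                d.registered = true;
                replayed.add(d.key);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        RedoCacheSketch redo = new RedoCacheSketch();
        try {
            redo.register("demo-service", "DEFAULT_GROUP", key -> {
                throw new IllegalStateException("not connected");
            });
        } catch (IllegalStateException e) {
            System.out.println("first attempt failed: " + e.getMessage());
        }
        // Once the connection is back, the cached entry is replayed.
        System.out.println("replayed: " + redo.redo(key -> System.out.println("sending " + key)));
    }
}
```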
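NamingGrpcClientProxy.requestToServer
request.putAllHeader() presumably adds authentication-related headers; my setup has no authentication configured, so they are all empty.
It then calls rpcClient.request(). Which overload is used depends on requestTimeout (a negative value means no explicit timeout), but both branches end up in the same method, with a default timeout of 3 s.
If the result code is not SUCCESS, or the response is not of the expected type, a NacosException is thrown; otherwise the response is returned.
- private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)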
- throws NacosException {
- try {
- request.putAllHeader(
- getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
- Response response =
- requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
- if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
- throw new NacosException(response.getErrorCode(), response.getMessage());
- }
- if (responseClass.isAssignableFrom(response.getClass())) {
- return (T) response;
- }
- NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
- response.getClass().getName(), responseClass.getName());
- } catch (Exception e) {
- throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
- }
- throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
- }
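Setting these checks aside, the main line continues into RpcClient.request(), whose key statement is response = this.currentConnection.request(request, timeoutMills);. The method retries up to RETRY_TIMES times within the timeout window, waits briefly for a reconnect when the connection is down, and switches servers asynchronously if the server reports the connection as unregistered: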
- public Response request(Request request, long timeoutMills) throws NacosException {
- int retryTimes = 0;
- Response response;
- Exception exceptionThrow = null;
- long start = System.currentTimeMillis();
- while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
- boolean waitReconnect = false;
- try {
- if (this.currentConnection == null || !isRunning()) {
- waitReconnect = true;
- throw new NacosException(NacosException.CLIENT_DISCONNECT,
- "Client not connected, current status:" + rpcClientStatus.get());
- }
- response = this.currentConnection.request(request, timeoutMills);
- if (response == null) {
- throw new NacosException(SERVER_ERROR, "Unknown Exception.");
- }
- if (response instanceof ErrorResponse) {
- if (response.getErrorCode() == NacosException.UN_REGISTER) {
- synchronized (this) {
- waitReconnect = true;
- if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
- LoggerUtils.printIfErrorEnabled(LOGGER,
- "Connection is unregistered, switch server, connectionId = {}, request = {}",
- currentConnection.getConnectionId(), request.getClass().getSimpleName());
- switchServerAsync();
- }
- }
- }
- throw new NacosException(response.getErrorCode(), response.getMessage());
- }
- // return response.
- lastActiveTimeStamp = System.currentTimeMillis();
- return response;
- } catch (Exception e) {
- if (waitReconnect) {
- try {
- // wait client to reconnect.
- Thread.sleep(Math.min(100, timeoutMills / 3));
- } catch (Exception exception) {
- // Do nothing.
- }
- }
- LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
- request, retryTimes, e.getMessage());
- exceptionThrow = e;
- }
- retryTimes++;
- }
- if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
- switchServerAsyncOnRequestFail();
- }
- if (exceptionThrow != null) {
- throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
- : new NacosException(SERVER_ERROR, exceptionThrow);
- } else {
- throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
- }
- }
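The loop above is bounded by both an attempt count and a deadline. A simplified standalone sketch of that shape; RetrySketch and requestWithRetry are hypothetical, and unlike the original it pauses after every failure instead of only while waiting for a reconnect, and it leaves out server switching.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Retry loop bounded by attempts and by a deadline, in the shape of RpcClient.request(Request, long).
class RetrySketch {
    static <T> T requestWithRetry(Callable<T> call, int maxRetries, long timeoutMillis) throws Exception {
        long start = System.currentTimeMillis();
        Exception last = null;
        int retryTimes = 0;
        while (retryTimes < maxRetries && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                T response = call.call();
                if (response == null) {
                    throw new IllegalStateException("Unknown Exception.");
                }
                return response;
            } catch (Exception e) {
                last = e;
                // Pause briefly (at most 100 ms) before the next attempt, like the reconnect wait.
                Thread.sleep(Math.min(100, timeoutMillis / 3));
            }
            retryTimes++;
        }
        throw last != null ? last : new IllegalStateException("Request fail, unknown Error");
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        String response = requestWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("Client not connected"); // first two attempts fail
            }
            return "SUCCESS";
        }, 3, 3_000);
        System.out.println(response + " after " + attempts.get() + " attempts");
    }
}
```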
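The connection-level request() wraps the RPC call and contains the logic for interacting with the server. Here the request is converted into a Payload, sent through the gRPC future stub, and the response is awaited for at most timeouts milliseconds: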
- @Override
- public Response request(Request request, long timeouts) throws NacosException {
- Payload grpcRequest = GrpcUtils.convert(request);
- ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
- Payload grpcResponse;
- try {
- grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- throw new NacosException(NacosException.SERVER_ERROR, e);
- }
- return (Response) GrpcUtils.parse(grpcResponse);
- }
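2. Nacos Registration Source Code Analysis: Server Side
- Receiving the registration
For the client and the server to interact, a network connection must first be established. The gRPC source itself is fairly complex, so we only briefly analyze the Nacos-related parts.
The project name is nacos-console.
BaseGrpcServer binds a number of handlers when it starts.
Instance requests sent over gRPC are handled on the server side by InstanceRequestHandler.
InstanceRequestHandler.handle
Based on the request type, handle() dispatches to registerInstance(), which ultimately calls EphemeralClientOperationServiceImpl.registerInstance().
- public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {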
-
- private final EphemeralClientOperationServiceImpl clientOperationService;
-
- public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
- this.clientOperationService = clientOperationService;
- }
-
- @Override
- @Secured(action = ActionTypes.WRITE)
- public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
- Service service = Service
- .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
- switch (request.getType()) {
- case NamingRemoteConstants.REGISTER_INSTANCE:
- // register
- return registerInstance(service, request, meta);
- case NamingRemoteConstants.DE_REGISTER_INSTANCE:
- // deregister
- return deregisterInstance(service, request, meta);
- default:
- throw new NacosException(NacosException.INVALID_PARAM,
- String.format("Unsupported request type %s", request.getType()));
- }
- }
-
- private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
- throws NacosException {
- // register the instance
- clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
- NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
- meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
- request.getInstance().getIp(), request.getInstance().getPort()));
- return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
- }
-
- private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
- clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
- NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
- meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
- service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
- return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
- }
- }
EphemeralClientOperationServiceImpl.registerInstance
For clientManager.getClient(clientId), see the section on establishing the long-lived connection below.
- @Override
- public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
- NamingUtils.checkInstanceIsLegal(instance);
- // get the singleton Service object for the service this instance is registered under
- Service singleton = ServiceManager.getInstance().getSingleton(service);
- if (!singleton.isEphemeral()) {
- throw new NacosRuntimeException(NacosException.INVALID_PARAM,
- String.format("Current service %s is persistent service, can't register ephemeral instance.",
- singleton.getGroupedServiceName()));
- }
- // Client represents the client's long-lived connection; the lookup goes through ClientManagerDelegate and ends up in ConnectionBasedClientManager
- Client client = clientManager.getClient(clientId);
- if (!clientIsLegal(client, clientId)) {
- return;
- }
- InstancePublishInfo instanceInfo = getPublishInfo(instance);
- // register this instance with the client
- client.addServiceInstance(singleton, instanceInfo);
- client.setLastUpdatedTime();
- client.recalculateRevision();
- NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
- NotifyCenter
- .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
- }
AbstractClient.addServiceInstance
- // this ConcurrentHashMap maps each Service to the instance info this client publishes for it
- protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
- @Override
- public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
- if (null == publishers.put(service, instancePublishInfo)) {
- if (instancePublishInfo instanceof BatchInstancePublishInfo) {
- MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
- } else {
- MetricsMonitor.incrementInstanceCount();
- }
- }
- // publish a ClientChangedEvent
- NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
- Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
- return true;
- }
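ClientServiceIndexesManager handles the ClientRegisterServiceEvent published above:
- // publisherIndexes: Service -> clientIds publishing it; a service can have many instances on separate long-lived connections, so the clientIds are kept in a Set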
- private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
- // subscriberIndexes: Service -> clientIds subscribed to it
- private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
- @Override
- public void onEvent(Event event) {
- if (event instanceof ClientEvent.ClientDisconnectEvent) {
- handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
- } else if (event instanceof ClientOperationEvent) {
- handleClientOperation((ClientOperationEvent) event);
- }
- }
- private void handleClientOperation(ClientOperationEvent event) {
- Service service = event.getService();
- String clientId = event.getClientId();
- if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
- // register
- addPublisherIndexes(service, clientId);
- } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
- // deregister
- removePublisherIndexes(service, clientId);
- } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
- // subscribe
- addSubscriberIndexes(service, clientId);
- } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
- // unsubscribe
- removeSubscriberIndexes(service, clientId);
- }
- }
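The two indexes can be sketched in the same way. In the hypothetical SimpleIndexesManager below, an enum replaces the ClientOperationEvent subclasses and services are plain Strings. Each index maps a service to a concurrent Set of clientIds. Two details are choices made for this sketch, not something the excerpt above shows: updates run inside compute/computeIfPresent so that adding and removing cannot race, and an entry is dropped once its last clientId is removed.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical event types standing in for the ClientOperationEvent subclasses.
enum OpType { REGISTER, DEREGISTER, SUBSCRIBE, UNSUBSCRIBE }

final class OperationEvent {
    final OpType type;
    final String service;
    final String clientId;

    OperationEvent(OpType type, String service, String clientId) {
        this.type = type;
        this.service = service;
        this.clientId = clientId;
    }
}

class SimpleIndexesManager {
    // service -> clientIds that registered (published) it
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // service -> clientIds that subscribed to it
    private final ConcurrentMap<String, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    void onEvent(OperationEvent e) {
        switch (e.type) {
            case REGISTER:    add(publisherIndexes, e);     break;
            case DEREGISTER:  remove(publisherIndexes, e);  break;
            case SUBSCRIBE:   add(subscriberIndexes, e);    break;
            case UNSUBSCRIBE: remove(subscriberIndexes, e); break;
        }
    }

    private static void add(ConcurrentMap<String, Set<String>> index, OperationEvent e) {
        // compute() runs atomically per key, so creating the set and adding to it happen together
        index.compute(e.service, (service, ids) -> {
            Set<String> set = ids;
            if (set == null) {
                set = ConcurrentHashMap.newKeySet();
            }
            set.add(e.clientId);
            return set;
        });
    }

    private static void remove(ConcurrentMap<String, Set<String>> index, OperationEvent e) {
        // Returning null from computeIfPresent removes the entry once it is empty
        index.computeIfPresent(e.service, (service, ids) -> {
            ids.remove(e.clientId);
            return ids.isEmpty() ? null : ids;
        });
    }

    Set<String> publishers(String service) {
        return Collections.unmodifiableSet(publisherIndexes.getOrDefault(service, Collections.emptySet()));
    }

    Set<String> subscribers(String service) {
        return Collections.unmodifiableSet(subscriberIndexes.getOrDefault(service, Collections.emptySet()));
    }
}
```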
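Establishing the long-lived connection (this part is harder, and I am still working through it)
The GrpcBiStreamRequestAcceptor class is responsible for establishing connections.
When a client opens a bidirectional gRPC stream, the server calls GrpcBiStreamRequestAcceptor.requestBiStream, and every message the client then sends on that stream is handled by the onNext method of the returned StreamObserver.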
The ID of the session's long-lived connection is the connectionId here, read from CONTEXT_KEY_CONN_ID.

```java
@Service
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {

    @Autowired
    ConnectionManager connectionManager;

    private void traceDetailIfNecessary(Payload grpcRequest) {
        String clientIp = grpcRequest.getMetadata().getClientIp();
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        try {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.info("[{}]Bi stream request receive, meta={},body={}", connectionId,
                        grpcRequest.getMetadata().toByteString().toStringUtf8(),
                        grpcRequest.getBody().toByteString().toStringUtf8());
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE_DIGEST.error("[{}]Bi stream request error,payload={},error={}", connectionId,
                    grpcRequest.toByteString().toStringUtf8(), throwable);
        }
    }

    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {

        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {

            // Read once, when the observer is created for this stream
            final String connectionId = CONTEXT_KEY_CONN_ID.get();

            final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();

            final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();

            String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();

            String clientIp = "";

            @Override
            public void onNext(Payload payload) {

                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);

                Object parseObj;
                try {
                    parseObj = GrpcUtils.parse(payload);
                } catch (Throwable throwable) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
                    return;
                }

                if (parseObj == null) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                                    payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
                    return;
                }
                if (parseObj instanceof ConnectionSetupRequest) {
                    // Client's connection setup request: build the Connection and register it
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    Map<String, String> labels = setUpRequest.getLabels();
                    String appName = "-";
                    if (labels != null && labels.containsKey(Constants.APPNAME)) {
                        appName = labels.get(Constants.APPNAME);
                    }

                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                    // The connection is registered here via connectionManager.register
                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                        //Not register to the connection manager if current server is over limit or server is starting.
                        try {
                            Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                                    rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                            connection.request(new ConnectResetRequest(), 3000L);
                            connection.close();
                        } catch (Exception e) {
                            //Do nothing.
                            if (connectionManager.traced(clientIp)) {
                                Loggers.REMOTE_DIGEST
                                        .warn("[{}]Send connect reset request error,error={}", connectionId, e);
                            }
                        }
                    }

                } else if (parseObj instanceof Response) {
                    // Client's response to a server-pushed request: complete the pending ack
                    // and refresh the connection's active time
                    Response response = (Response) parseObj;
                    if (connectionManager.traced(clientIp)) {
                        Loggers.REMOTE_DIGEST
                                .warn("[{}]Receive response of server request ,response={}", connectionId, response);
                    }
                    RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
                    connectionManager.refreshActiveTime(connectionId);
                } else {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                                    parseObj);
                }

            }

            @Override
            public void onError(Throwable t) {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
                }

                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                    if (serverCallStreamObserver.isCancelled()) {
                        //client close the stream.
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            //ignore
                        }
                    }
                }

            }

            @Override
            public void onCompleted() {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
                }
                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                    if (serverCallStreamObserver.isCancelled()) {
                        //client close the stream.
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            //ignore
                        }

                    }
                }
            }
        };

        return streamObserver;
    }
}
```
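The branching in onNext comes down to four cases. An unparseable payload is dropped. A ConnectionSetupRequest registers the connection, or gets it rejected while the server is still starting or over its limit. A Response is treated as the ack for an earlier server push. Anything else is logged as unknown. The following sketch models only that dispatch. StreamMessage, SetupRequest, AckResponse, Outcome and BiStreamDispatcher are hypothetical names, gRPC is left out entirely, and a fixed maximum connection count stands in for the server's real limit check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical message types; they loosely correspond to ConnectionSetupRequest and Response.
interface StreamMessage {}

final class SetupRequest implements StreamMessage {
    final String clientIp;
    final boolean fromSdk;

    SetupRequest(String clientIp, boolean fromSdk) {
        this.clientIp = clientIp;
        this.fromSdk = fromSdk;
    }
}

final class AckResponse implements StreamMessage {
    final String requestId;

    AckResponse(String requestId) {
        this.requestId = requestId;
    }
}

enum Outcome { IGNORED, REGISTERED, REJECTED, ACKED, UNKNOWN }

class BiStreamDispatcher {
    private final Map<String, String> connections = new ConcurrentHashMap<>(); // connectionId -> clientIp
    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();     // connectionId -> millis
    private final int maxConnections;
    private volatile boolean serverStarted;

    BiStreamDispatcher(int maxConnections, boolean serverStarted) {
        this.maxConnections = maxConnections;
        this.serverStarted = serverStarted;
    }

    void markStarted() {
        serverStarted = true;
    }

    // One stream has one connectionId; each message on it is dispatched by its parsed type.
    Outcome onNext(String connectionId, StreamMessage parsed) {
        if (parsed == null) {
            return Outcome.IGNORED; // unparseable payload: log and drop
        }
        if (parsed instanceof SetupRequest) {
            SetupRequest setup = (SetupRequest) parsed;
            boolean rejectSdkOnStarting = setup.fromSdk && !serverStarted;
            if (rejectSdkOnStarting || !register(connectionId, setup.clientIp)) {
                // In the Nacos code above, a ConnectResetRequest is sent and the connection closed here
                return Outcome.REJECTED;
            }
            return Outcome.REGISTERED;
        }
        if (parsed instanceof AckResponse) {
            // In the Nacos code above: RpcAckCallbackSynchronizer.ackNotify + refreshActiveTime
            lastActive.put(connectionId, System.currentTimeMillis());
            return Outcome.ACKED;
        }
        return Outcome.UNKNOWN;
    }

    private synchronized boolean register(String connectionId, String clientIp) {
        if (connections.containsKey(connectionId)) {
            return true; // already registered
        }
        if (connections.size() >= maxConnections) {
            return false; // over the (hypothetical) limit
        }
        connections.put(connectionId, clientIp);
        return true;
    }

    int connectionCount() {
        return connections.size();
    }

    Long lastActiveAt(String connectionId) {
        return lastActive.get(connectionId);
    }
}
```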
ConnectionManager.register
The connections map here manages all of the long-lived connections.

```java
Map<String, Connection> connections = new ConcurrentHashMap<>();

public synchronized boolean register(String connectionId, Connection connection) {
    if (connection.isConnected()) {
        String clientIp = connection.getMetaInfo().clientIp;
        // Already registered: treat as success
        if (connections.containsKey(connectionId)) {
            return true;
        }
        // Over the connection limit: refuse
        if (checkLimit(connection)) {
            return false;
        }
        if (traced(clientIp)) {
            connection.setTraced(true);
        }
        connections.put(connectionId, connection);
        // Count connections per client IP
        if (!connectionForClientIp.containsKey(clientIp)) {
            connectionForClientIp.put(clientIp, new AtomicInteger(0));
        }
        connectionForClientIp.get(clientIp).getAndIncrement();
        // Notify listeners that a new client has connected
        clientConnectionEventListenerRegistry.notifyClientConnected(connection);
        LOGGER.info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
                connection);
        return true;
    }
    return false;
}
```
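The following minimal sketch reproduces the same registration bookkeeping. It assumes a simplified Conn type (a client IP plus a connected flag) and a hypothetical per-IP limit in place of checkLimit, whose logic this article does not show. It uses computeIfAbsent for the per-IP counter instead of the containsKey/put/get sequence. It also adds an unregister method, which is not part of the excerpt, to show the counter going back down.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified connection: just the client IP and whether the stream is still open.
final class Conn {
    final String clientIp;
    final boolean connected;

    Conn(String clientIp, boolean connected) {
        this.clientIp = clientIp;
        this.connected = connected;
    }
}

class SimpleConnectionManager {
    private final Map<String, Conn> connections = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();
    private final List<Consumer<Conn>> connectedListeners = new CopyOnWriteArrayList<>();
    private final int maxPerIp; // hypothetical limit, not Nacos' checkLimit

    SimpleConnectionManager(int maxPerIp) {
        this.maxPerIp = maxPerIp;
    }

    void onConnected(Consumer<Conn> listener) {
        connectedListeners.add(listener);
    }

    synchronized boolean register(String connectionId, Conn conn) {
        if (!conn.connected) {
            return false; // the stream is already gone
        }
        if (connections.containsKey(connectionId)) {
            return true; // idempotent re-register
        }
        AtomicInteger perIp = connectionForClientIp.get(conn.clientIp);
        if (perIp != null && perIp.get() >= maxPerIp) {
            return false; // over the per-IP limit
        }
        connections.put(connectionId, conn);
        // One call replaces the containsKey/put/get sequence
        connectionForClientIp.computeIfAbsent(conn.clientIp, ip -> new AtomicInteger()).incrementAndGet();
        connectedListeners.forEach(listener -> listener.accept(conn));
        return true;
    }

    synchronized void unregister(String connectionId) {
        Conn removed = connections.remove(connectionId);
        if (removed == null) {
            return;
        }
        // Drop the per-IP counter entirely when it reaches zero
        connectionForClientIp.computeIfPresent(removed.clientIp,
                (ip, count) -> count.decrementAndGet() == 0 ? null : count);
    }

    int countForIp(String ip) {
        AtomicInteger count = connectionForClientIp.get(ip);
        return count == null ? 0 : count.get();
    }

    int size() {
        return connections.size();
    }
}
```

Because register and unregister are both synchronized, the per-IP check and the increment cannot interleave between two concurrent registrations.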