Nacos源码 (3) 注册中心

打印 上一主题 下一主题

主题 937|帖子 937|积分 2811

本文将从一个服务注册示例入手,通过阅读客户端、服务端源码,分析服务注册、服务发现原理。
使用的2.0.2的版本。
客户端

创建NacosNamingService对象
  1. NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
复制代码
NacosNamingService提供两个构造方法:
  1. public NacosNamingService(String serverList) throws NacosException {
  2.     Properties properties = new Properties();
  3.     properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
  4.     init(properties);
  5. }
  6. public NacosNamingService(Properties properties) throws NacosException {
  7.     init(properties);
  8. }
复制代码
第二个方法的properties的key在PropertyKeyConst常量类可以找到,如:

  • namespace
  • username
  • password
  • serverAddr
  • clusterName
  • 其他
构造方法中会初始化一些参数和组件:

  • 初始化namespace参数
  • 创建InstancesChangeNotifier对象,它实现了Subscriber接口,监听InstancesChangeEvent事件
    1. public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    2.     // key使用serviceName + groupName + clusters组合而成
    3.     // value是监听器集合
    4.     private final Map<String, ConcurrentHashSet<EventListener>> listenerMap;
    5.     // 锁
    6.     private final Object lock = new Object();
    复制代码
  • 向NotifyCenter注册InstancesChangeEvent事件,注册之前创建的InstancesChangeNotifier对象监听服务实例变化
    1. NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    2. NotifyCenter.registerSubscriber(changeNotifier);
    3. // NotifyCenter维护着EventPublisher集,Subscriber会被注册到EventPublisher上
    4. // EventPublisher提供publish方法向Event队列推送事件
    5. // EventPublisher是一个Thread类,run方法从Event队列取事件通知Subscriber来处理
    复制代码
  • 创建NamingClientProxyDelegate对象,用于与服务端通信,它是一个代理,内部使用其他的NamingClientProxy实现:

    • NamingHttpClientProxy
    • NamingGrpcClientProxy - 默认使用该实现类,其中有healthCheck检测服务端是否健康,服务端直接响应成功无操作

服务注册
  1. NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
  2. nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);
复制代码
提供多个重载的registerInstance方法,最终使用这个方法:
  1. public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
  2.             throws NacosException {
  3.     Instance instance = new Instance();
  4.     instance.setIp(ip);
  5.     instance.setPort(port);
  6.     instance.setWeight(1.0);
  7.     instance.setClusterName(clusterName);
  8.     registerInstance(serviceName, groupName, instance);
  9. }
  10. public void registerInstance(String serviceName, String groupName, Instance instance)
  11.             throws NacosException {
  12.     // 此处clientProxy是NamingClientProxyDelegate对象
  13.     clientProxy.registerService(serviceName, groupName, instance);
  14. }
复制代码
NamingClientProxyDelegate的registerService方法会选择一个具体的NamingClientProxy对象与服务端通信,默认使用NamingGrpcClientProxy对象。
NamingGrpcClientProxy的registerService方法构建InstanceRequest请求对象,之后使用RpcClient对象发送请求并接收响应。
RpcClient内部通过GrpcConnection对象使用GRPC来访问服务端。
内部的GRPC代码是使用protoc和protobuf-maven-plugin生成的,通信细节此处不做介绍。
服务下线
  1. nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);
复制代码
deregisterInstance服务下线:
  1. public void deregisterInstance(String serviceName,
  2.                                String groupName,
  3.                                String ip,
  4.                                int port,
  5.                                String clusterName) throws NacosException {
  6.     Instance instance = new Instance();
  7.     instance.setIp(ip);
  8.     instance.setPort(port);
  9.     instance.setClusterName(clusterName);
  10.     deregisterInstance(serviceName, groupName, instance);
  11. }
  12. public void deregisterInstance(String serviceName,
  13.                                String groupName,
  14.                                Instance instance) throws NacosException {
  15.     clientProxy.deregisterService(serviceName, groupName, instance);
  16. }
复制代码
查询实例

示例代码:
  1. NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
  2. List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);
  3. System.out.printf(">> instance count=%d\n", instances.size());
  4. for (Instance instance : instances) {
  5.     System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
  6.             instance.getServiceName(), instance.getInstanceId(),
  7.             instance.getClusterName(), instance.getIp(), instance.getPort());
  8. }
复制代码
提供了几个重载的getAllInstances方法,最重要的参数就是subscribe,当为true时,会向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取服务实例信息,而不再向服务端发送查询请求。
  1. public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
  2.         boolean subscribe) throws NacosException {
  3.     ServiceInfo serviceInfo;
  4.     String clusterString = StringUtils.join(clusters, ",");
  5.     if (subscribe) {
  6.         serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
  7.         if (null == serviceInfo) {
  8.             // 订阅请求
  9.             serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
  10.         }
  11.     } else {
  12.         // 查询请求
  13.         serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
  14.     }
  15.     List<Instance> list;
  16.     if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
  17.         return new ArrayList<Instance>();
  18.     }
  19.     return list;
  20. }
复制代码
服务订阅

示例代码:
  1. NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
  2. namingService.subscribe(ORDER_SERVICE, new EventListener() {
  3.     @Override
  4.     public void onEvent(Event event) {
  5.         NamingEvent e = (NamingEvent) event;
  6.         System.out.println("serviceName=" + e.getServiceName());
  7.         List<Instance> instances = e.getInstances();
  8.         System.out.printf(">> instance count=%d\n", instances.size());
  9.         for (Instance instance : instances) {
  10.             System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
  11.                     instance.getServiceName(), instance.getInstanceId(),
  12.                     instance.getClusterName(), instance.getIp(), instance.getPort());
  13.         }
  14.     }
  15. });
  16. TimeUnit.SECONDS.sleep(1200);
复制代码
subscribe方法:
  1. public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
  2.         throws NacosException {
  3.     String clusterString = StringUtils.join(clusters, ",");
  4.     // 将listener保存到listenerMap中
  5.     changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
  6.     // 发送订阅请求
  7.     clientProxy.subscribe(serviceName, groupName, clusterString);
  8. }
复制代码
实例变化的方法调用栈:

当收到服务端的实例变化事件时,会触发grpc层的观察者监听:
  1. public void onMessage(RespT message) {
  2.   if (firstResponseReceived && !streamingResponse) {
  3.     throw Status.INTERNAL
  4.         .withDescription("More than one responses received for unary or client-streaming call")
  5.         .asRuntimeException();
  6.   }
  7.   firstResponseReceived = true;
  8.   // 调用观察者
  9.   observer.onNext(message);
  10.   if (streamingResponse && adapter.autoFlowControlEnabled) {
  11.     // Request delivery of the next inbound message.
  12.     adapter.request(1);
  13.   }
  14. }
复制代码
此处的observer是在创建rpc连接的时候注册的:
  1. private StreamObserver<Payload> bindRequestStream(
  2.         final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
  3.         final GrpcConnection grpcConn) {
  4.     return streamStub.requestBiStream(new StreamObserver<Payload>() {
  5.         @Override
  6.         public void onNext(Payload payload) {
  7.             try {
  8.                 Object parseBody = GrpcUtils.parse(payload);
  9.                 final Request request = (Request) parseBody;
  10.                 if (request != null) {
  11.                     try {
  12.                         // 调用ServerRequestHandler处理请求
  13.                         Response response = handleServerRequest(request);
  14.                         if (response != null) {
  15.                             response.setRequestId(request.getRequestId());
  16.                             sendResponse(response);
  17.                         }
  18. // ...
复制代码
NamingPushRequestHandler的处理逻辑:
  1. public Response requestReply(Request request) {
  2.     if (request instanceof NotifySubscriberRequest) {
  3.         NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
  4.         serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
  5.         return new NotifySubscriberResponse();
  6.     }
  7.     return null;
  8. }
复制代码
serviceInfoHolder.processServiceInfo方法:
  1. public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
  2.     String serviceKey = serviceInfo.getKey();
  3.     if (serviceKey == null) {
  4.         return null;
  5.     }
  6.     ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
  7.     if (isEmptyOrErrorPush(serviceInfo)) {
  8.         //empty or error push, just ignore
  9.         return oldService;
  10.     }
  11.     serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
  12.     boolean changed = isChangedServiceInfo(oldService, serviceInfo);
  13.     if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
  14.         serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
  15.     }
  16.     MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
  17.     if (changed) {
  18.         // 推送一个InstancesChangeEvent事件
  19.         NotifyCenter.publishEvent(new InstancesChangeEvent(
  20.                 serviceInfo.getName(), serviceInfo.getGroupName(),
  21.                 serviceInfo.getClusters(), serviceInfo.getHosts()));
  22.         DiskCache.write(serviceInfo, cacheDir);
  23.     }
  24.     return serviceInfo;
  25. }
复制代码
推送一个InstancesChangeEvent事件:

  • NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher
  • 通过publish方法将事件保存到一个Event队列
    1. public boolean publish(Event event) {
    2.     checkIsStart();
    3.     boolean success = this.queue.offer(event);
    4.     if (!success) {
    5.         // 当队列操作失败时,直接使用当前线程处理事件
    6.         receiveEvent(event);
    7.         return true;
    8.     }
    9.     return true;
    10. }
    复制代码
  • EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理
  • receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法
服务端

服务注册

InstanceRequestHandler处理器

注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:
  1. public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
  2.     private final EphemeralClientOperationServiceImpl clientOperationService;
  3.     public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
  4.         this.clientOperationService = clientOperationService;
  5.     }
  6.     @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
  7.     public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
  8.         Service service = Service
  9.                 .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
  10.         switch (request.getType()) {
  11.             // 服务注册
  12.             case NamingRemoteConstants.REGISTER_INSTANCE:
  13.                 return registerInstance(service, request, meta);
  14.             // 服务下线
  15.             case NamingRemoteConstants.DE_REGISTER_INSTANCE:
  16.                 return deregisterInstance(service, request, meta);
  17.             default:
  18.                 throw new NacosException(NacosException.INVALID_PARAM,
  19.                         String.format("Unsupported request type %s", request.getType()));
  20.         }
  21.     }
  22.     // 服务注册
  23.     private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
  24.         clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
  25.         return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
  26.     }
  27.     // 服务下线
  28.     private InstanceResponse deregisterInstance(
  29.             Service service, InstanceRequest request, RequestMeta meta) {
  30.         clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
  31.         return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
  32.     }
  33. }
复制代码
服务注册核心流程
  1. public void registerInstance(Service service, Instance instance, String clientId) {
  2.     Service singleton = ServiceManager.getInstance().getSingleton(service);
  3.     Client client = clientManager.getClient(clientId);
  4.     InstancePublishInfo instanceInfo = getPublishInfo(instance);
  5.     // Add a new instance for service for current client
  6.     // 1. 给当前客户端绑定service -> instance关系
  7.     // 2. 推送一个ClientChangedEvent事件
  8.     client.addServiceInstance(singleton, instanceInfo);
  9.     client.setLastUpdatedTime();
  10.     // 推送ClientRegisterServiceEvent和InstanceMetadataEvent事件
  11.     NotifyCenter.publishEvent(
  12.         new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  13.     NotifyCenter.publishEvent(
  14.         new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
  15. }
复制代码

  • 给当前客户端绑定service -> instance关系
  • 推送一个ClientChangedEvent事件
  • 推送ClientRegisterServiceEvent事件
  • 推送InstanceMetadataEvent事件
事件处理流程

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点
ClientRegisterServiceEvent事件:Client register service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断
ServiceChangedEvent事件:Service data changed event. 有两个处理器:

  • NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
  • DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务下线

服务下线核心流程
  1. public void deregisterInstance(Service service, Instance instance, String clientId) {
  2.     Service singleton = ServiceManager.getInstance().getSingleton(service);
  3.     Client client = clientManager.getClient(clientId);
  4.     // Remove service instance from client
  5.     // 1. 解除当前客户端的service -> instance关系
  6.     // 2. 推送一个ClientChangedEvent事件
  7.     InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
  8.     client.setLastUpdatedTime();
  9.     // 推送ClientDeregisterServiceEvent和InstanceMetadataEvent事件
  10.     if (null != removedInstance) {
  11.         NotifyCenter.publishEvent(
  12.             new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
  13.         NotifyCenter.publishEvent(
  14.             new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
  15.     }
  16. }
复制代码

  • 解除当前客户端的service -> instance关系
  • 推送一个ClientChangedEvent事件
  • 推送ClientDeregisterServiceEvent事件
  • 推送InstanceMetadataEvent事件
事件处理流程

基本与服务注册流程相同。
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点
ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断
ServiceChangedEvent事件:Service data changed event. 有两个处理器:

  • NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
  • DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务实例心跳


  • 客户端会周期性的发送healthCheck请求
  • 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
  • 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接
客户端healthCheck请求

客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:
  1. clientEventExecutor.submit(new Runnable() {
  2.     @Override
  3.     public void run() {
  4.         while (true) {
  5.             try {
  6.                 if (isShutdown()) {
  7.                     break;
  8.                 }
  9.                 ReconnectContext reconnectContext = reconnectionSignal
  10.                         .poll(keepAliveTime, TimeUnit.MILLISECONDS);
  11.                 if (reconnectContext == null) {
  12.                     //check alive time.
  13.                     if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
  14.                         boolean isHealthy = healthCheck();
  15.                         if (!isHealthy) {
  16.                             if (currentConnection == null) {
  17.                                 continue;
  18.                             }
  19.                            
  20.                             RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
  21.                             if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
  22.                                 break;
  23.                             }
  24. // ...
复制代码
healthCheck健康检查:
  1. private boolean healthCheck() {
  2.     HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
  3.     try {
  4.         Response response = this.currentConnection.request(healthCheckRequest, 3000L);
  5.         return response != null && response.isSuccess();
  6.     } catch (NacosException e) {
  7.         //ignore
  8.     }
  9.     return false;
  10. }
复制代码
如果检查失败,将重新建立连接。
服务端记录connection活跃时间戳

服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。
服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。
在其request方法中,会刷新对应connection的活跃时间戳:
  1. Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
  2. RequestMeta requestMeta = new RequestMeta();
  3. requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
  4. requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
  5. requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
  6. requestMeta.setLabels(connection.getMetaInfo().getLabels());
  7. // 刷新connection的活跃时间戳
  8. connectionManager.refreshActiveTime(requestMeta.getConnectionId());
  9. Response response = requestHandler.handleRequest(request, requestMeta);
  10. Payload payloadResponse = GrpcUtils.convert(response);
  11. traceIfNecessary(payloadResponse, false);
  12. responseObserver.onNext(payloadResponse);
  13. responseObserver.onCompleted();
复制代码
服务端connection活跃检查

服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。
服务端使用ConnectionManager管理连接:
  1. Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
复制代码
在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:
  1. // 检查长时间不活跃的连接和超过最大连接数的连接
  2. for (Map.Entry<String, Connection> entry : entries) {
  3.     Connection client = entry.getValue();
  4.     String clientIp = client.getMetaInfo().getClientIp();
  5.     AtomicInteger integer = expelForIp.get(clientIp);
  6.     if (integer != null && integer.intValue() > 0) {
  7.         integer.decrementAndGet();
  8.         expelClient.add(client.getMetaInfo().getConnectionId());
  9.         expelCount--;
  10.     } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
  11.         outDatedConnections.add(client.getMetaInfo().getConnectionId());
  12.     }
  13. }
  14. // ...
  15. // 重置超过最大连接数的连接
  16. for (String expelledClientId : expelClient) {
  17.     try {
  18.         Connection connection = getConnection(expelledClientId);
  19.         if (connection != null) {
  20.             ConnectResetRequest connectResetRequest = new ConnectResetRequest();
  21.             connectResetRequest.setServerIp(serverIp);
  22.             connectResetRequest.setServerPort(serverPort);
  23.             connection.asyncRequest(connectResetRequest, null);
  24.         }
  25.     } catch (ConnectionAlreadyClosedException e) {
  26.         unregister(expelledClientId);
  27.     } catch (Exception e) {
  28.     }
  29. }
  30. // ...
  31. if (CollectionUtils.isNotEmpty(outDatedConnections)) {
  32.     Set<String> successConnections = new HashSet<>();
  33.     final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
  34.     for (String outDateConnectionId : outDatedConnections) {
  35.         try {
  36.             Connection connection = getConnection(outDateConnectionId);
  37.             if (connection != null) {
  38.                 // 给客户端发检测请求
  39.                 ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
  40.                 connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
  41.                     @Override
  42.                     public Executor getExecutor() {
  43.                         return null;
  44.                     }
  45.                     @Override
  46.                     public long getTimeout() {
  47.                         return 1000L;
  48.                     }
  49.                     @Override
  50.                     public void onResponse(Response response) {
  51.                         latch.countDown();
  52.                         if (response != null && response.isSuccess()) {
  53.                             connection.freshActiveTime();
  54.                             successConnections.add(outDateConnectionId);
  55.                         }
  56.                     }
  57.                     @Override
  58.                     public void onException(Throwable e) {
  59.                         latch.countDown();
  60.                     }
  61.                 });
  62.             } else {
  63.                 latch.countDown();
  64.             }
  65.         } catch (ConnectionAlreadyClosedException e) {
  66.             latch.countDown();
  67.         } catch (Exception e) {
  68.             latch.countDown();
  69.         }
  70.     }
  71.     latch.await(3000L, TimeUnit.MILLISECONDS);
  72.     // 移除失败的已断开连接
  73.     for (String outDateConnectionId : outDatedConnections) {
  74.         if (!successConnections.contains(outDateConnectionId)) {
  75.             unregister(outDateConnectionId);
  76.         }
  77.     }
  78. }
复制代码
客户端断开连接

业务处理流程

GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:
  1. public void transportTerminated(Attributes transportAttrs) {
  2.     String connectionId = null;
  3.     try {
  4.         connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
  5.     } catch (Exception e) {
  6.         // Ignore
  7.     }
  8.     if (StringUtils.isNotBlank(connectionId)) {
  9.         // 使用ConnectionManager移除连接
  10.         connectionManager.unregister(connectionId);
  11.     }
  12. }
复制代码
ConnectionManager移除连接:
  1. public synchronized void unregister(String connectionId) {
  2.     // 从Connection集移除连接
  3.     Connection remove = this.connections.remove(connectionId);
  4.     if (remove != null) {
  5.         String clientIp = remove.getMetaInfo().clientIp;
  6.         AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
  7.         // IP连接数--
  8.         if (atomicInteger != null) {
  9.             int count = atomicInteger.decrementAndGet();
  10.             if (count <= 0) {
  11.                 connectionForClientIp.remove(clientIp);
  12.             }
  13.         }
  14.         remove.close();
  15.         // 通知ClientManager层断开连接
  16.         clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
  17.     }
  18. }
复制代码
事件处理流程

ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系
  1. public boolean clientDisconnected(String clientId) {
  2.     ConnectionBasedClient client = clients.remove(clientId);
  3.     if (null == client) {
  4.         return true;
  5.     }
  6.     client.release();
  7.     // 推送一个ClientDisconnectEvent事件
  8.     NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
  9.     return true;
  10. }
复制代码
ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl进行处理。
  1. public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
  2.    
  3.     private final ServiceStorage serviceStorage;
  4.    
  5.     private final NamingMetadataManager metadataManager;
  6.    
  7.     public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
  8.                                       NamingMetadataManager metadataManager) {
  9.         this.serviceStorage = serviceStorage;
  10.         this.metadataManager = metadataManager;
  11.     }
  12.    
  13.     @Override
  14.     @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
  15.     public QueryServiceResponse handle(
  16.            ServiceQueryRequest request, RequestMeta meta) throws NacosException {
  17.         String namespaceId = request.getNamespace();
  18.         String groupName = request.getGroupName();
  19.         String serviceName = request.getServiceName();
  20.         Service service = Service.newService(namespaceId, groupName, serviceName);
  21.         String cluster = null == request.getCluster() ? "" : request.getCluster();
  22.         boolean healthyOnly = request.isHealthyOnly();
  23.         // ServiceInfo封装服务基本信息和其实例集合
  24.         ServiceInfo result = serviceStorage.getData(service);
  25.         ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
  26.         result = ServiceUtil
  27.             .selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
  28.         return QueryServiceResponse.buildSuccessResponse(result);
  29.     }
  30. }
复制代码
取消服务订阅
  1. public ServiceInfo getData(Service service) {
  2.     // 如果缓存里面有服务信息则直接从缓存查找
  3.     return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
  4. }
  5. public ServiceInfo getPushData(Service service) {
  6.     ServiceInfo result = emptyServiceInfo(service);
  7.     if (!ServiceManager.getInstance().containSingleton(service)) {
  8.         return result;
  9.     }
  10.     // 从ClientServiceIndexesManager查找
  11.     result.setHosts(getAllInstancesFromIndex(service));
  12.     serviceDataIndexes.put(service, result);
  13.     return result;
  14. }
  15. private List<Instance> getAllInstancesFromIndex(Service service) {
  16.     Set<Instance> result = new HashSet<>();
  17.     Set<String> clusters = new HashSet<>();
  18.     // 从ClientServiceIndexesManager查找service绑定的client集
  19.     for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
  20.         // 查找该client注册的实例信息
  21.         Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
  22.         if (instancePublishInfo.isPresent()) {
  23.             Instance instance = parseInstance(service, instancePublishInfo.get());
  24.             result.add(instance);
  25.             clusters.add(instance.getClusterName());
  26.         }
  27.     }
  28.     // cache clusters of this service
  29.     serviceClusterIndex.put(service, clusters);
  30.     return new LinkedList<>(result);
  31. }
  32. private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
  33.     // 获取到client对象
  34.     Client client = clientManager.getClient(clientId);
  35.     if (null == client) {
  36.         return Optional.empty();
  37.     }
  38.     // 查找该client指定service注册的实例信息
  39.     // AbstractClient使用Map<Service, InstancePublishInfo>结构保存
  40.     // 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息
  41.     return Optional.ofNullable(client.getInstancePublishInfo(service));
  42. }
复制代码
推送一个ClientUnsubscribeServiceEvent事件,还是使用ClientServiceIndexesManager来处理,移除订阅关系。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表