本文将从一个服务注册示例入手,通过阅读客户端、服务端源码,分析服务注册、服务发现原理。
使用的2.0.2的版本。
客户端
创建NacosNamingService对象
- NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
复制代码 NacosNamingService提供两个构造方法:- public NacosNamingService(String serverList) throws NacosException {
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
- init(properties);
- }
- public NacosNamingService(Properties properties) throws NacosException {
- init(properties);
- }
复制代码 第二个方法的properties的key在PropertyKeyConst常量类可以找到,如:
- namespace
- username
- password
- serverAddr
- clusterName
- 其他
构造方法中会初始化一些参数和组件:
- 初始化namespace参数
- 创建InstancesChangeNotifier对象,它实现了Subscriber接口,监听InstancesChangeEvent事件
- public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
- // key使用serviceName + groupName + clusters组合而成
- // value是监听器集合
- private final Map<String, ConcurrentHashSet<EventListener>> listenerMap;
- // 锁
- private final Object lock = new Object();
复制代码 - 向NotifyCenter注册InstancesChangeEvent事件,注册之前创建的InstancesChangeNotifier对象监听服务实例变化
- NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
- NotifyCenter.registerSubscriber(changeNotifier);
- // NotifyCenter维护着EventPublisher集,Subscriber会被注册到EventPublisher上
- // EventPublisher提供publish方法向Event队列推送事件
- // EventPublisher是一个Thread类,run方法从Event队列取事件通知Subscriber来处理
复制代码 - 创建NamingClientProxyDelegate对象,用于与服务端通信,它是一个代理,内部使用其他的NamingClientProxy实现:
- NamingHttpClientProxy
- NamingGrpcClientProxy - 默认使用该实现类,其中有healthCheck检测服务端是否健康,服务端直接响应成功无操作
服务注册
- NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
- nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);
复制代码 提供多个重载的registerInstance方法,最终使用这个方法:- public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
- throws NacosException {
- Instance instance = new Instance();
- instance.setIp(ip);
- instance.setPort(port);
- instance.setWeight(1.0);
- instance.setClusterName(clusterName);
- registerInstance(serviceName, groupName, instance);
- }
- public void registerInstance(String serviceName, String groupName, Instance instance)
- throws NacosException {
- // 此处clientProxy是NamingClientProxyDelegate对象
- clientProxy.registerService(serviceName, groupName, instance);
- }
复制代码 NamingClientProxyDelegate的registerService方法会选择一个具体的NamingClientProxy对象与服务端通信,默认使用NamingGrpcClientProxy对象。
NamingGrpcClientProxy的registerService方法构建InstanceRequest请求对象,之后使用RpcClient对象发送请求并接收响应。
RpcClient内部通过GrpcConnection对象使用GRPC来访问服务端。
内部的GRPC代码是使用protoc和protobuf-maven-plugin生成的,通信细节此处不做介绍。
服务下线
- nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);
复制代码 deregisterInstance服务下线:- public void deregisterInstance(String serviceName,
- String groupName,
- String ip,
- int port,
- String clusterName) throws NacosException {
- Instance instance = new Instance();
- instance.setIp(ip);
- instance.setPort(port);
- instance.setClusterName(clusterName);
- deregisterInstance(serviceName, groupName, instance);
- }
- public void deregisterInstance(String serviceName,
- String groupName,
- Instance instance) throws NacosException {
- clientProxy.deregisterService(serviceName, groupName, instance);
- }
复制代码 查询实例
示例代码:- NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
- List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);
- System.out.printf(">> instance count=%d\n", instances.size());
- for (Instance instance : instances) {
- System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
- instance.getServiceName(), instance.getInstanceId(),
- instance.getClusterName(), instance.getIp(), instance.getPort());
- }
复制代码 提供了几个重载的getAllInstances方法,最重要的参数就是subscribe,当为true时,会向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取服务实例信息,而不再向服务端发送查询请求。- public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
- boolean subscribe) throws NacosException {
- ServiceInfo serviceInfo;
- String clusterString = StringUtils.join(clusters, ",");
- if (subscribe) {
- serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
- if (null == serviceInfo) {
- // 订阅请求
- serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
- }
- } else {
- // 查询请求
- serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
- }
- List<Instance> list;
- if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
- return new ArrayList<Instance>();
- }
- return list;
- }
复制代码 服务订阅
示例代码:- NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
- namingService.subscribe(ORDER_SERVICE, new EventListener() {
- @Override
- public void onEvent(Event event) {
- NamingEvent e = (NamingEvent) event;
- System.out.println("serviceName=" + e.getServiceName());
- List<Instance> instances = e.getInstances();
- System.out.printf(">> instance count=%d\n", instances.size());
- for (Instance instance : instances) {
- System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
- instance.getServiceName(), instance.getInstanceId(),
- instance.getClusterName(), instance.getIp(), instance.getPort());
- }
- }
- });
- TimeUnit.SECONDS.sleep(1200);
复制代码 subscribe方法:- public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
- throws NacosException {
- String clusterString = StringUtils.join(clusters, ",");
- // 将listener保存到listenerMap中
- changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
- // 发送订阅请求
- clientProxy.subscribe(serviceName, groupName, clusterString);
- }
复制代码 实例变化的方法调用栈:

当收到服务端的实例变化事件时,会触发grpc层的观察者监听:- public void onMessage(RespT message) {
- if (firstResponseReceived && !streamingResponse) {
- throw Status.INTERNAL
- .withDescription("More than one responses received for unary or client-streaming call")
- .asRuntimeException();
- }
- firstResponseReceived = true;
- // 调用观察者
- observer.onNext(message);
- if (streamingResponse && adapter.autoFlowControlEnabled) {
- // Request delivery of the next inbound message.
- adapter.request(1);
- }
- }
复制代码 此处的observer是在创建rpc连接的时候注册的:- private StreamObserver<Payload> bindRequestStream(
- final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
- final GrpcConnection grpcConn) {
- return streamStub.requestBiStream(new StreamObserver<Payload>() {
- @Override
- public void onNext(Payload payload) {
- try {
- Object parseBody = GrpcUtils.parse(payload);
- final Request request = (Request) parseBody;
- if (request != null) {
- try {
- // 调用ServerRequestHandler处理请求
- Response response = handleServerRequest(request);
- if (response != null) {
- response.setRequestId(request.getRequestId());
- sendResponse(response);
- }
- // ...
复制代码 NamingPushRequestHandler的处理逻辑:- public Response requestReply(Request request) {
- if (request instanceof NotifySubscriberRequest) {
- NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
- serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
- return new NotifySubscriberResponse();
- }
- return null;
- }
复制代码 serviceInfoHolder.processServiceInfo方法:- public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
- String serviceKey = serviceInfo.getKey();
- if (serviceKey == null) {
- return null;
- }
- ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
- if (isEmptyOrErrorPush(serviceInfo)) {
- //empty or error push, just ignore
- return oldService;
- }
- serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
- boolean changed = isChangedServiceInfo(oldService, serviceInfo);
- if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
- serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
- }
- MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
- if (changed) {
- // 推送一个InstancesChangeEvent事件
- NotifyCenter.publishEvent(new InstancesChangeEvent(
- serviceInfo.getName(), serviceInfo.getGroupName(),
- serviceInfo.getClusters(), serviceInfo.getHosts()));
- DiskCache.write(serviceInfo, cacheDir);
- }
- return serviceInfo;
- }
复制代码 推送一个InstancesChangeEvent事件:
- NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher
- 通过publish方法将事件保存到一个Event队列
- public boolean publish(Event event) {
- checkIsStart();
- boolean success = this.queue.offer(event);
- if (!success) {
- // 当队列操作失败时,直接使用当前线程处理事件
- receiveEvent(event);
- return true;
- }
- return true;
- }
复制代码 - EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理
- receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法
服务端
服务注册
InstanceRequestHandler处理器
注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:- public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
- private final EphemeralClientOperationServiceImpl clientOperationService;
- public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
- this.clientOperationService = clientOperationService;
- }
- @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
- public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
- Service service = Service
- .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
- switch (request.getType()) {
- // 服务注册
- case NamingRemoteConstants.REGISTER_INSTANCE:
- return registerInstance(service, request, meta);
- // 服务下线
- case NamingRemoteConstants.DE_REGISTER_INSTANCE:
- return deregisterInstance(service, request, meta);
- default:
- throw new NacosException(NacosException.INVALID_PARAM,
- String.format("Unsupported request type %s", request.getType()));
- }
- }
- // 服务注册
- private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
- clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
- return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
- }
- // 服务下线
- private InstanceResponse deregisterInstance(
- Service service, InstanceRequest request, RequestMeta meta) {
- clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
- return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
- }
- }
复制代码 服务注册核心流程
- public void registerInstance(Service service, Instance instance, String clientId) {
- Service singleton = ServiceManager.getInstance().getSingleton(service);
- Client client = clientManager.getClient(clientId);
- InstancePublishInfo instanceInfo = getPublishInfo(instance);
- // Add a new instance for service for current client
- // 1. 给当前客户端绑定service -> instance关系
- // 2. 推送一个ClientChangedEvent事件
- client.addServiceInstance(singleton, instanceInfo);
- client.setLastUpdatedTime();
- // 推送ClientRegisterServiceEvent和InstanceMetadataEvent事件
- NotifyCenter.publishEvent(
- new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
- NotifyCenter.publishEvent(
- new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
- }
复制代码
- 给当前客户端绑定service -> instance关系
- 推送一个ClientChangedEvent事件
- 推送ClientRegisterServiceEvent事件
- 推送InstanceMetadataEvent事件
事件处理流程
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点。
ClientRegisterServiceEvent事件:Client register service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断。
ServiceChangedEvent事件:Service data changed event. 有两个处理器:
- NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
- DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务下线
服务下线核心流程
- public void deregisterInstance(Service service, Instance instance, String clientId) {
- Service singleton = ServiceManager.getInstance().getSingleton(service);
- Client client = clientManager.getClient(clientId);
- // Remove service instance from client
- // 1. 解除当前客户端的service -> instance关系
- // 2. 推送一个ClientChangedEvent事件
- InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
- client.setLastUpdatedTime();
- // 推送ClientDeregisterServiceEvent和InstanceMetadataEvent事件
- if (null != removedInstance) {
- NotifyCenter.publishEvent(
- new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
- NotifyCenter.publishEvent(
- new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
- }
- }
复制代码
- 解除当前客户端的service -> instance关系
- 推送一个ClientChangedEvent事件
- 推送ClientDeregisterServiceEvent事件
- 推送InstanceMetadataEvent事件
事件处理流程
基本与服务注册流程相同。
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点。
ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断。
ServiceChangedEvent事件:Service data changed event. 有两个处理器:
- NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
- DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务实例心跳
- 客户端会周期性的发送healthCheck请求
- 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
- 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接
客户端healthCheck请求
客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:- clientEventExecutor.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- if (isShutdown()) {
- break;
- }
- ReconnectContext reconnectContext = reconnectionSignal
- .poll(keepAliveTime, TimeUnit.MILLISECONDS);
- if (reconnectContext == null) {
- //check alive time.
- if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
- boolean isHealthy = healthCheck();
- if (!isHealthy) {
- if (currentConnection == null) {
- continue;
- }
-
- RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
- if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
- break;
- }
- // ...
复制代码 healthCheck健康检查:- private boolean healthCheck() {
- HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
- try {
- Response response = this.currentConnection.request(healthCheckRequest, 3000L);
- return response != null && response.isSuccess();
- } catch (NacosException e) {
- //ignore
- }
- return false;
- }
复制代码 如果检查失败,将重新建立连接。
服务端记录connection活跃时间戳
服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。
服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。
在其request方法中,会刷新对应connection的活跃时间戳:- Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
- RequestMeta requestMeta = new RequestMeta();
- requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
- requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
- requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
- requestMeta.setLabels(connection.getMetaInfo().getLabels());
- // 刷新connection的活跃时间戳
- connectionManager.refreshActiveTime(requestMeta.getConnectionId());
- Response response = requestHandler.handleRequest(request, requestMeta);
- Payload payloadResponse = GrpcUtils.convert(response);
- traceIfNecessary(payloadResponse, false);
- responseObserver.onNext(payloadResponse);
- responseObserver.onCompleted();
复制代码 服务端connection活跃检查
服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。
服务端使用ConnectionManager管理连接:- Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
复制代码 在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:- // 检查长时间不活跃的连接和超过最大连接数的连接
- for (Map.Entry<String, Connection> entry : entries) {
- Connection client = entry.getValue();
- String clientIp = client.getMetaInfo().getClientIp();
- AtomicInteger integer = expelForIp.get(clientIp);
- if (integer != null && integer.intValue() > 0) {
- integer.decrementAndGet();
- expelClient.add(client.getMetaInfo().getConnectionId());
- expelCount--;
- } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
- outDatedConnections.add(client.getMetaInfo().getConnectionId());
- }
- }
- // ...
- // 重置超过最大连接数的连接
- for (String expelledClientId : expelClient) {
- try {
- Connection connection = getConnection(expelledClientId);
- if (connection != null) {
- ConnectResetRequest connectResetRequest = new ConnectResetRequest();
- connectResetRequest.setServerIp(serverIp);
- connectResetRequest.setServerPort(serverPort);
- connection.asyncRequest(connectResetRequest, null);
- }
- } catch (ConnectionAlreadyClosedException e) {
- unregister(expelledClientId);
- } catch (Exception e) {
- }
- }
- // ...
- if (CollectionUtils.isNotEmpty(outDatedConnections)) {
- Set<String> successConnections = new HashSet<>();
- final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
- for (String outDateConnectionId : outDatedConnections) {
- try {
- Connection connection = getConnection(outDateConnectionId);
- if (connection != null) {
- // 给客户端发检测请求
- ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
- connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
- @Override
- public Executor getExecutor() {
- return null;
- }
- @Override
- public long getTimeout() {
- return 1000L;
- }
- @Override
- public void onResponse(Response response) {
- latch.countDown();
- if (response != null && response.isSuccess()) {
- connection.freshActiveTime();
- successConnections.add(outDateConnectionId);
- }
- }
- @Override
- public void onException(Throwable e) {
- latch.countDown();
- }
- });
- } else {
- latch.countDown();
- }
- } catch (ConnectionAlreadyClosedException e) {
- latch.countDown();
- } catch (Exception e) {
- latch.countDown();
- }
- }
- latch.await(3000L, TimeUnit.MILLISECONDS);
- // 移除失败的已断开连接
- for (String outDateConnectionId : outDatedConnections) {
- if (!successConnections.contains(outDateConnectionId)) {
- unregister(outDateConnectionId);
- }
- }
- }
复制代码 客户端断开连接
业务处理流程
GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:- public void transportTerminated(Attributes transportAttrs) {
- String connectionId = null;
- try {
- connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
- } catch (Exception e) {
- // Ignore
- }
- if (StringUtils.isNotBlank(connectionId)) {
- // 使用ConnectionManager移除连接
- connectionManager.unregister(connectionId);
- }
- }
复制代码 ConnectionManager移除连接:- public synchronized void unregister(String connectionId) {
- // 从Connection集移除连接
- Connection remove = this.connections.remove(connectionId);
- if (remove != null) {
- String clientIp = remove.getMetaInfo().clientIp;
- AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
- // IP连接数--
- if (atomicInteger != null) {
- int count = atomicInteger.decrementAndGet();
- if (count <= 0) {
- connectionForClientIp.remove(clientIp);
- }
- }
- remove.close();
- // 通知ClientManager层断开连接
- clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
- }
- }
复制代码 事件处理流程
ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。- public boolean clientDisconnected(String clientId) {
- ConnectionBasedClient client = clients.remove(clientId);
- if (null == client) {
- return true;
- }
- client.release();
- // 推送一个ClientDisconnectEvent事件
- NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
- return true;
- }
复制代码 ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl进行处理。- public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
-
- private final ServiceStorage serviceStorage;
-
- private final NamingMetadataManager metadataManager;
-
- public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
- NamingMetadataManager metadataManager) {
- this.serviceStorage = serviceStorage;
- this.metadataManager = metadataManager;
- }
-
- @Override
- @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
- public QueryServiceResponse handle(
- ServiceQueryRequest request, RequestMeta meta) throws NacosException {
- String namespaceId = request.getNamespace();
- String groupName = request.getGroupName();
- String serviceName = request.getServiceName();
- Service service = Service.newService(namespaceId, groupName, serviceName);
- String cluster = null == request.getCluster() ? "" : request.getCluster();
- boolean healthyOnly = request.isHealthyOnly();
- // ServiceInfo封装服务基本信息和其实例集合
- ServiceInfo result = serviceStorage.getData(service);
- ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
- result = ServiceUtil
- .selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
- return QueryServiceResponse.buildSuccessResponse(result);
- }
- }
复制代码 取消服务订阅
- public ServiceInfo getData(Service service) {
- // 如果缓存里面有服务信息则直接从缓存查找
- return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
- }
- public ServiceInfo getPushData(Service service) {
- ServiceInfo result = emptyServiceInfo(service);
- if (!ServiceManager.getInstance().containSingleton(service)) {
- return result;
- }
- // 从ClientServiceIndexesManager查找
- result.setHosts(getAllInstancesFromIndex(service));
- serviceDataIndexes.put(service, result);
- return result;
- }
- private List<Instance> getAllInstancesFromIndex(Service service) {
- Set<Instance> result = new HashSet<>();
- Set<String> clusters = new HashSet<>();
- // 从ClientServiceIndexesManager查找service绑定的client集
- for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
- // 查找该client注册的实例信息
- Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
- if (instancePublishInfo.isPresent()) {
- Instance instance = parseInstance(service, instancePublishInfo.get());
- result.add(instance);
- clusters.add(instance.getClusterName());
- }
- }
- // cache clusters of this service
- serviceClusterIndex.put(service, clusters);
- return new LinkedList<>(result);
- }
- private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
- // 获取到client对象
- Client client = clientManager.getClient(clientId);
- if (null == client) {
- return Optional.empty();
- }
- // 查找该client指定service注册的实例信息
- // AbstractClient使用Map<Service, InstancePublishInfo>结构保存
- // 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息
- return Optional.ofNullable(client.getInstancePublishInfo(service));
- }
复制代码 推送一个ClientUnsubscribeServiceEvent事件,还是使用ClientServiceIndexesManager来处理,移除订阅关系。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |