Eureka服务注册发现源码流程简析

打印 上一主题 下一主题

主题 974|帖子 974|积分 2922

一: 服务的注册

客户端通过执行InstanceInfoReplicator#run()调用DiscoveryClient#register()发送http请求进行注册
InstanceInfoReplicator 是用于将当前服务实例信息更新并同步到服务端的任务实现
//A task for updating and replicating the local instanceinfo to the remote server.
  1. //Service registration: sends this.instanceInfo to the server through the registration client and reports whether the response status equals Status.NO_CONTENT.
  2. boolean register() throws Throwable {
  3.         logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
  4.         EurekaHttpResponse httpResponse;
  5.         try {
  6.             httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
  7.         } catch (Exception var3) {
  8.             Exception e = var3;
  9.             logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, e.getMessage(), e});
  10.             throw e; //the failure is logged above and then propagated to the caller
  11.         }
  12.         if (logger.isInfoEnabled()) {
  13.             logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
  14.         }
  15.         return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); //any other status code yields false
  16.     }
  17. //Service lease renewal (heartbeat)
  18. /**
  19. Eureka sets up a scheduled task that calls renew() at the configured heartbeat interval so that a crashed service can be detected. Returns true when the heartbeat response is Status.OK, or when a Status.NOT_FOUND response is followed by a successful register(); returns false otherwise, including when any Throwable is raised.
  20. */
  21.     boolean renew() {
  22.         try {
  23.             EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceInfo.InstanceStatus)null);
  24.             logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
  25.             if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { //NOT_FOUND response: fall back to a fresh registration
  26.                 this.REREGISTER_COUNTER.increment();
  27.                 logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
  28.                 long timestamp = this.instanceInfo.setIsDirtyWithTime();
  29.                 boolean success = this.register();
  30.                 if (success) {
  31.                     this.instanceInfo.unsetIsDirty(timestamp); //clear the dirty flag only after the re-registration succeeded
  32.                 }
  33.                 return success;
  34.             } else {
  35.                 return httpResponse.getStatusCode() == Status.OK.getStatusCode();
  36.             }
  37.         } catch (Throwable var5) { //also covers anything thrown by register() above
  38.             Throwable e = var5;
  39.             logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, e);
  40.             return false;
  41.         }
  42.     }
复制代码
服务端服务注册接受和存储
  1. //Eureka clients are registered through this method: the instance is wrapped in a Lease and stored in the server's in-memory registry map (application name -> instance id -> lease).
  2. public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
  3.         this.read.lock(); //held for the whole registration; released in the finally block
  4.         try {
  5.             Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
  6.             EurekaMonitors.REGISTER.increment(isReplication);
  7.             if (gMap == null) {
  8.                 ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
  9.                 gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
  10.                 if (gMap == null) { //putIfAbsent returned null, so gNewMap is the map to use for this application
  11.                     gMap = gNewMap;
  12.                 }
  13.             }
  14.             Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
  15.             if (existingLease != null && existingLease.getHolder() != null) {
  16.                 Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
  17.                 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
  18.                 logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
  19.                 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { //the stored instance is newer than the incoming one: keep the stored one
  20.                     logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
  21.                     logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
  22.                     registrant = (InstanceInfo)existingLease.getHolder();
  23.                 }
  24.             } else {
  25.                 synchronized(this.lock) { //no usable previous lease: bump the expected renew count (only when already > 0) and recompute the threshold
  26.                     if (this.expectedNumberOfClientsSendingRenews > 0) {
  27.                         ++this.expectedNumberOfClientsSendingRenews;
  28.                         this.updateRenewsPerMinThreshold();
  29.                     }
  30.                 }
  31.                 logger.debug("No previous lease information found; it is new registration");
  32.             }
  33.             Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
  34.             if (existingLease != null) {
  35.                 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); //carry the service-up timestamp over from the lease being replaced
  36.             }
  37.             ((Map)gMap).put(registrant.getId(), lease); //store (or replace) the lease under the instance id
  38.             this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")"));
  39.             if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
  40.                 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId());
  41.                 if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) {
  42.                     logger.info("Not found overridden id {} and hence adding it", registrant.getId());
  43.                     this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
  44.                 }
  45.             }
  46.             InstanceInfo.InstanceStatus overriddenStatusFromMap = (InstanceInfo.InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId());
  47.             if (overriddenStatusFromMap != null) {
  48.                 logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
  49.                 registrant.setOverriddenStatus(overriddenStatusFromMap);
  50.             }
  51.             InstanceInfo.InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication);
  52.             registrant.setStatusWithoutDirty(overriddenInstanceStatus);
  53.             if (InstanceStatus.UP.equals(registrant.getStatus())) {
  54.                 lease.serviceUp();
  55.             }
  56.             registrant.setActionType(ActionType.ADDED);
  57.             this.recentlyChangedQueue.add(new RecentlyChangedItem(lease));
  58.             registrant.setLastUpdatedTimestamp();
  59.             this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); //invalidate cached entries for this app name and its VIP addresses
  60.             logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication});
  61.         } finally {
  62.             this.read.unlock();
  63.         }
  64.     }
复制代码
如图,registry保存有将注册、已注册到server 的eureka客户端 instance信息;
将要注册到注册表registry中的instance,会创建一个map结构保存
二: 服务的发现

根据上图所示,服务注册是由客户端通过DiscoveryClient发起http请求完成的;同理,服务的发现也是使用此DiscoveryClient实例通过http请求完成,不再赘述;
重点将放在服务发现的缓存和调用上
1.
  1. //Other discovery paths such as DiscoveryClient#getAndUpdateDelta (including the update details) are not discussed further here
  2.     /**
  3.      * Gets the full registry information from the eureka server and stores it locally.
  4.      * When applying the full registry, the following flow is observed:
  5.      *
  6.      * if (update generation have not advanced (due to another thread))
  7.      *   atomically set the registry to the new registry
  8.      * fi
  9.      *
  10.      * Returns nothing: on success the fetched registry is stored in localRegionApps.
  11.      * @throws Throwable
  12.      *             on error.
  13.      */
  14.     private void getAndStoreFullRegistry() throws Throwable {
  15.         long currentUpdateGeneration = fetchRegistryGeneration.get();
  16.         logger.info("Getting all instance registry info from the eureka server");
  17.         Applications apps = null;
  18.         EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
  19.                 ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
  20.                 : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
  21.         if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
  22.             apps = httpResponse.getEntity();
  23.         }
  24.         logger.info("The response status is {}", httpResponse.getStatusCode());
  25.         if (apps == null) {
  26.             logger.error("The application is null for some reason. Not storing this information");
  27.         } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
  28.     //Store into the local service cache, i.e. the client keeps its own copy of the server's registry; the later cache-refresh mechanism is not covered here
  29.     //Per the author's note, this.filterAndShuffle(apps) keeps the instances in UP status and shuffles them for randomness
  30.     // Shuffling helps in randomizing the applications list there by avoiding the same instances receiving traffic during start ups.
  31.             localRegionApps.set(this.filterAndShuffle(apps));
  32.             logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
  33.         } else {
  34.             logger.warn("Not updating applications as another thread is updating it already");
  35.         }
  36.     }
复制代码
2. 调用
一: 使用DiscoveryClient获取实例的信息,再构建http请求
二: 使用组件Feign完成服务转发
  1. @FeignClient(value = "eurekaclient")
  2. public interface ApiService {
  3.    @RequestMapping(value = "/index",method = RequestMethod.GET)
  4.    String index();
  5. }
  6. /**
  7. Conceptually equivalent to new httpclient(eurekaClient) => sends a GET request to the /index endpoint of the "eurekaclient" service and receives the response
  8. */
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表