ToB企服应用市场:ToB评测及商务社交产业平台

标题: 万字剖析OpenFeign整合Ribbon实现负载均衡的原理 [打印本页]

作者: 石小疯    时间: 2022-6-24 13:35
标题: 万字剖析OpenFeign整合Ribbon实现负载均衡的原理
大家好,前面我已经剖析了OpenFeign的动态代理生成原理和Ribbon的运行原理,这篇文章来继续剖析SpringCloud组件原理,来看一看OpenFeign是如何基于Ribbon来实现负载均衡的,两组件是如何协同工作的。
一、Feign动态代理调用实现rpc流程分析 

通过Feign客户端接口的动态代理生成原理讲解,我们可以清楚的知道,Feign客户端接口的动态代理生成是基于JDK的动态代理来实现的,那么在所有的方法调用的时候最终都会走InvocationHandler接口的实现,默认就是ReflectiveFeign.FeignInvocationHandler,那我们接下来就来看看,FeignInvocationHandler是如何实现rpc调用的。
FeignInvocationHandler对于invoke方法的实现。
  1. private final Map<Method, MethodHandler> dispatch;
  2.     @Override
  3.     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  4.       if ("equals".equals(method.getName())) {
  5.         try {
  6.           Object otherHandler =
  7.               args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
  8.           return equals(otherHandler);
  9.         } catch (IllegalArgumentException e) {
  10.           return false;
  11.         }
  12.       } else if ("hashCode".equals(method.getName())) {
  13.         return hashCode();
  14.       } else if ("toString".equals(method.getName())) {
  15.         return toString();
  16.       }
  17.       return dispatch.get(method).invoke(args);
  18.     }
复制代码
前几个if判断很简单,就是判断是不是调用的方法是不是equals,hashCode,toString,因为这些方法的调是不需要走rpc调用的。
接下就是从dispatch获取要调用的方法对应的MethodHandler,然后调用MethodHandler的invoke方法。那MethodHandler是什么时候生成的呢?MethodHandler是在构建动态代理的时候生成的,不清楚的同学可以翻一下OpenFeign那篇文章最后关于生成动态代理的那部分源码。那MethodHandler作用是什么呢?你可以理解为最终rpc的调用都是基于这个MethodHandler来实现的,每个方法都有对应MethodHandler来实现rpc调用,接下来我们就来看一下MethodHandler的invoke方法的实现。
MethodHandler是个接口,有两个实现类,一个是DefaultMethodHandler,这个是处理接口中的默认方法的,另一个是SynchronousMethodHandler,这个是实现rpc调用的方法。接下来我们就看看SynchronousMethodHandler关于invoke方法的实现。
  1. @Override
  2.   public Object invoke(Object[] argv) throws Throwable {
  3.     RequestTemplate template = buildTemplateFromArgs.create(argv);
  4.     Options options = findOptions(argv);
  5.     Retryer retryer = this.retryer.clone();
  6.     while (true) {
  7.       try {
  8.         return executeAndDecode(template, options);
  9.       } catch (RetryableException e) {
  10.         try {
  11.           retryer.continueOrPropagate(e);
  12.         } catch (RetryableException th) {
  13.           Throwable cause = th.getCause();
  14.           if (propagationPolicy == UNWRAP && cause != null) {
  15.             throw cause;
  16.           } else {
  17.             throw th;
  18.           }
  19.         }
  20.         if (logLevel != Logger.Level.NONE) {
  21.           logger.logRetry(metadata.configKey(), logLevel);
  22.         }
  23.         continue;
  24.       }
  25.     }
  26.   }
复制代码
第一行通过方法的参数构建了一个RequestTemplate,RequestTemplate可以看成是组装http请求所需各种参数的封装,比如什么情头,body之类的都放在这里面。
第二行 Options options = findOptions(argv); 这个很有意思,Options主要是封装了发送请求是连接超时时间和读超时时间的配置,findOptions(argv)也就是先从参数里面找有没有Options,没有就返回构造SynchronousMethodHandler的入参时的Options,也就是说,连接超时时间和读超时时间可以从方法入参来传入,不过一般没有人这么玩。 
第三行就是搞一个重试的组件,是可以实现重试的,一般不设置。
 
然后执行到executeAndDecode(template, options),进入这个方法
  1. Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
  2.     Request request = targetRequest(template);
  3.     if (logLevel != Logger.Level.NONE) {
  4.       logger.logRequest(metadata.configKey(), logLevel, request);
  5.     }
  6.     Response response;
  7.     long start = System.nanoTime();
  8.     try {
  9.       response = client.execute(request, options);
  10.     } catch (IOException e) {
  11.       if (logLevel != Logger.Level.NONE) {
  12.         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
  13.       }
  14.       throw errorExecuting(request, e);
  15.     }
  16.     long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  17.     boolean shouldClose = true;
  18.     try {
  19.       if (logLevel != Logger.Level.NONE) {
  20.         response =
  21.             logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
  22.       }
  23.       if (Response.class == metadata.returnType()) {
  24.         if (response.body() == null) {
  25.           return response;
  26.         }
  27.         if (response.body().length() == null ||
  28.             response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
  29.           shouldClose = false;
  30.           return response;
  31.         }
  32.         // Ensure the response body is disconnected
  33.         byte[] bodyData = Util.toByteArray(response.body().asInputStream());
  34.         return response.toBuilder().body(bodyData).build();
  35.       }
  36.       if (response.status() >= 200 && response.status() < 300) {
  37.         if (void.class == metadata.returnType()) {
  38.           return null;
  39.         } else {
  40.           Object result = decode(response);
  41.           shouldClose = closeAfterDecode;
  42.           return result;
  43.         }
  44.       } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
  45.         Object result = decode(response);
  46.         shouldClose = closeAfterDecode;
  47.         return result;
  48.       } else {
  49.         throw errorDecoder.decode(metadata.configKey(), response);
  50.       }
  51.     } catch (IOException e) {
  52.       if (logLevel != Logger.Level.NONE) {
  53.         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
  54.       }
  55.       throw errorReading(request, response, e);
  56.     } finally {
  57.       if (shouldClose) {
  58.         ensureClosed(response.body());
  59.       }
  60.     }
  61.   }
复制代码
首先调用了targetRequest方法,贴出源码
  1. Request targetRequest(RequestTemplate template) {
  2.     for (RequestInterceptor interceptor : requestInterceptors) {
  3.       interceptor.apply(template);
  4.     }
  5.     return target.apply(template);
  6.   } 
复制代码
这个方法会遍历所有的拦截器RequestInterceptor,这是feign的一个扩展点,也就说再发送请求前,你仍然还有机会对请求的内容进行调整,比如说加个请求头,这也是很常见的一种方式,在微服务之间鉴权的时候使用。RequestInterceptor是在构建Feign.Builder的时候传进来的,Feign.Builder的组件都是通过ioc容器获取的,组件又是通过配置类来的,所以你需要的话就可以在配置类中声明RequestInterceptor对象。配置类有不同的优先级,按照自己的需求,可以在其中一个优先级使用,不过一般这种通用的东西,不是某个微服务特有的功能,一般选择在springboot启动中的容器中配置。
执行完targetRequest,回到executeAndDecode之后,会构建出一个Request,Request很好理解,就是一个请求,里面封装了http请求的东西。接下来就会调用Client的execute方法来执行请求,拿到响应,接下来就是基于处理这个响应,将响应数据封装成需要返回的参数,之后返回给调用方。 
到这里,我们已经分析出接口的动态代理是如何运行的。其实就是通过每个方法对应的MethodHandler来实现的,MethodHandler主要就是拼接各种参数,组装成一个请求,随后交由Client接口的实现去发送请求。
二、LoadBalancerFeignClient

通过上面分析整个动态代理调用过程可以看出,Client是发送http请求的关键类。那么Client是什么玩意?还记得我在关于OpenFeign动态代理生成的那篇文章中留下的一个疑问么,当Feign客户端在构建动态代理的时候,填充很多组件到Feign.Builder中,其中有个组件就是Client的实现,我们并没有在FeignClientsConfiguration配置类中找到关于Client的对象的声明。不过当时我就提到了,这个组件的实现是要依赖负载均衡的,也就是这个组件是Feign用来整合Ribbon的入口。 
接下来,我们就着重看一下Client的实现,看看Feign是如何通过ribbon实现负载均衡的。 
我们先来看一下Feign跟ribbon整合的配置类。
  1. @Import({ HttpClientFeignLoadBalancedConfiguration.class,
  2.     OkHttpFeignLoadBalancedConfiguration.class,
  3.     DefaultFeignLoadBalancedConfiguration.class })
  4. public class FeignRibbonClientAutoConfiguration {
  5.   @Bean
  6.   @Primary
  7.   @ConditionalOnMissingBean
  8.   @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
  9.   public CachingSpringLoadBalancerFactory cachingLBClientFactory(
  10.       SpringClientFactory factory) {
  11.     return new CachingSpringLoadBalancerFactory(factory);
  12.   }
  13.   @Bean
  14.   @Primary
  15.   @ConditionalOnMissingBean
  16.   @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
  17.   public CachingSpringLoadBalancerFactory retryabeCachingLBClientFactory(
  18.       SpringClientFactory factory, LoadBalancedRetryFactory retryFactory) {
  19.     return new CachingSpringLoadBalancerFactory(factory, retryFactory);
  20.   }
  21.   @Bean
  22.   @ConditionalOnMissingBean
  23.   public Request.Options feignRequestOptions() {
  24.     return LoadBalancerFeignClient.DEFAULT_OPTIONS;
  25.   }
  26. }
复制代码
我们来分析一下,首先通过@Impot注解导入了三个配置类。
这里我们看一下DefaultFeignLoadBalancedConfiguration配置类,因为默认就是这,HttpClientFeignLoadBalancedConfiguration和OkHttpFeignLoadBalancedConfiguration都需要有引入HttpClient和OkHttp依赖才会有用
  1. @Configuration(proxyBeanMethods = false)
  2. class DefaultFeignLoadBalancedConfiguration {
  3.   @Bean
  4.   @ConditionalOnMissingBean
  5.   public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
  6.       SpringClientFactory clientFactory) {
  7.     return new LoadBalancerFeignClient(new Client.Default(null, null), cachingFactory,
  8.         clientFactory);
  9.   }
  10. }
复制代码
这个配置类很简单,声明了LoadBalancerFeignClient到spring容器,传入了三个参数,一个Client的实现,一个CachingSpringLoadBalancerFactory和一个SpringClientFactory。LoadBalancerFeignClient这个类实现了Client接口,也就数说我们在构建Feign.Builder填充的就是这个对象,也就是上面说feign的执行流程最后用来执行请求的Client的实现。 
接下来我说一下入参的三个参数是什么意思。
其实大家可以自行去看OkHttpFeignLoadBalancedConfiguration和HttpClientFeignLoadBalancedConfiguration,其实他们配置跟DefaultFeignLoadBalancedConfiguration是一样的,声明的对象都是LoadBalancerFeignClient,只不过将Client.Default换成了基于HttpClient和OkHttp的实现,也就是发送http请求使用的工具不一样。 
FeignRibbonClientAutoConfiguration除了导入配置类还声明了CachingSpringLoadBalancerFactory,只不过一种是带基于spring实现的重试功能的,一种是不带的,主要看有没有引入spring重试功能的包,所以上面构建LoadBalancerFeignClient注入的CachingSpringLoadBalancerFactory就是在这声明的。
这里就说完了Feign整合ribbon的配置类FeignRibbonClientAutoConfiguration,我们也找到了构造Feign.Builder的实现LoadBalancerFeignClient,接下来就来剖析LoadBalancerFeignClient的实现。
  1. public class LoadBalancerFeignClient implements Client {
  2.   static final Request.Options DEFAULT_OPTIONS = new Request.Options();
  3.   private final Client delegate;
  4.   private CachingSpringLoadBalancerFactory lbClientFactory;
  5.   private SpringClientFactory clientFactory;
  6.   public LoadBalancerFeignClient(Client delegate,
  7.       CachingSpringLoadBalancerFactory lbClientFactory,
  8.       SpringClientFactory clientFactory) {
  9.     this.delegate = delegate;
  10.     this.lbClientFactory = lbClientFactory;
  11.     this.clientFactory = clientFactory;
  12.   }
  13.   static URI cleanUrl(String originalUrl, String host) {
  14.     String newUrl = originalUrl;
  15.     if (originalUrl.startsWith("https://")) {
  16.       newUrl = originalUrl.substring(0, 8)
  17.           + originalUrl.substring(8 + host.length());
  18.     }
  19.     else if (originalUrl.startsWith("http")) {
  20.       newUrl = originalUrl.substring(0, 7)
  21.           + originalUrl.substring(7 + host.length());
  22.     }
  23.     StringBuffer buffer = new StringBuffer(newUrl);
  24.     if ((newUrl.startsWith("https://") && newUrl.length() == 8)
  25.         || (newUrl.startsWith("http://") && newUrl.length() == 7)) {
  26.       buffer.append("/");
  27.     }
  28.     return URI.create(buffer.toString());
  29.   }
  30.   @Override
  31.   public Response execute(Request request, Request.Options options) throws IOException {
  32.     try {
  33.       URI asUri = URI.create(request.url());
  34.       String clientName = asUri.getHost();
  35.       URI uriWithoutHost = cleanUrl(request.url(), clientName);
  36.       FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
  37.           this.delegate, request, uriWithoutHost);
  38.       IClientConfig requestConfig = getClientConfig(options, clientName);
  39.       return lbClient(clientName)
  40.           .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
  41.     }
  42.     catch (ClientException e) {
  43.       IOException io = findIOException(e);
  44.       if (io != null) {
  45.         throw io;
  46.       }
  47.       throw new RuntimeException(e);
  48.     }
  49.   }
  50.   IClientConfig getClientConfig(Request.Options options, String clientName) {
  51.     IClientConfig requestConfig;
  52.     if (options == DEFAULT_OPTIONS) {
  53.       requestConfig = this.clientFactory.getClientConfig(clientName);
  54.     }
  55.     else {
  56.       requestConfig = new FeignOptionsClientConfig(options);
  57.     }
  58.     return requestConfig;
  59.   }
  60.   protected IOException findIOException(Throwable t) {
  61.     if (t == null) {
  62.       return null;
  63.     }
  64.     if (t instanceof IOException) {
  65.       return (IOException) t;
  66.     }
  67.     return findIOException(t.getCause());
  68.   }
  69.   public Client getDelegate() {
  70.     return this.delegate;
  71.   }
  72.   private FeignLoadBalancer lbClient(String clientName) {
  73.     return this.lbClientFactory.create(clientName);
  74.   }
  75.   static class FeignOptionsClientConfig extends DefaultClientConfigImpl {
  76.     FeignOptionsClientConfig(Request.Options options) {
  77.       setProperty(CommonClientConfigKey.ConnectTimeout,
  78.           options.connectTimeoutMillis());
  79.       setProperty(CommonClientConfigKey.ReadTimeout, options.readTimeoutMillis());
  80.     }
  81.     @Override
  82.     public void loadProperties(String clientName) {
  83.     }
  84.     @Override
  85.     public void loadDefaultValues() {
  86.     }
  87.   }
  88. }
复制代码
在动态代理调用的那里我们得出一个结论,那就是最后会调用Client接口的execute方法的实现,所以我们就看一下execute方法的实现,这里就是一堆操作,从请求的URL中拿到了clientName,也就是服务名。 
为什么可以拿到服务名?
其实很简单,OpenFeign构建动态代理的时候,传入了一个HardCodedTarget,当时说在构建HardCodedTarget的时候传入了一个url,那个url当时说了其实就是http://服务名,所以到这里,虽然有具体的请求接口的路径,但是还是类似 http://服务名/api/sayHello这种,所以可以通过路径拿到你锁请求的服务名。
拿到服务名之后,再拿到了一个配置类IClientConfig,最后调用lbClient,我们看一下lbClient的方法实现。
  1. private FeignLoadBalancer lbClient(String clientName) {
  2.     return this.lbClientFactory.create(clientName);
  3. }
复制代码
就是调用CachingSpringLoadBalancerFactory的create方法
  1. public FeignLoadBalancer create(String clientName) {
  2.     FeignLoadBalancer client = this.cache.get(clientName);
  3.     if (client != null) {
  4.       return client;
  5.     }
  6.     IClientConfig config = this.factory.getClientConfig(clientName);
  7.     ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
  8.     ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
  9.         ServerIntrospector.class);
  10.     client = this.loadBalancedRetryFactory != null
  11.         ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
  12.             this.loadBalancedRetryFactory)
  13.         : new FeignLoadBalancer(lb, config, serverIntrospector);
  14.     this.cache.put(clientName, client);
  15.     return client;
  16.   }
复制代码
创建的过程就是从每个服务对应的容器中获取到IClientConfig和ILoadBalancer。Ribbon那篇文章都讲过这些核心类,这里不再赘述。
默认就是创建不带spring重试功能的FeignLoadBalancer,放入缓存,最后返回这个FeignLoadBalancer。所以第一次来肯定没有,需要构建,也就是最终一定会返回FeignLoadBalancer,所以我们通过lbClient方法拿到的是FeignLoadBalancer。从这里可以看出CachingSpringLoadBalancerFactory是构建FeignLoadBalancer的工厂类,只不过先从缓存中查找,找不到再创建FeignLoadBalancer。 
拿到FeignLoadBalancer之后就会调用executeWithLoadBalancer,接收到Response之后直接返回。
三、FeignLoadBalancer

那么这个FeignLoadBalancer又是啥呢?这里放上FeignLoadBalancer核心源码。
  1. public class FeignLoadBalancer extends
  2.     AbstractLoadBalancerAwareClient<FeignLoadBalancer.RibbonRequest, FeignLoadBalancer.RibbonResponse> {
  3.   private final RibbonProperties ribbon;
  4.   protected int connectTimeout;
  5.   protected int readTimeout;
  6.   protected IClientConfig clientConfig;
  7.   protected ServerIntrospector serverIntrospector;
  8.   public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig,
  9.       ServerIntrospector serverIntrospector) {
  10.     super(lb, clientConfig);
  11.     this.setRetryHandler(RetryHandler.DEFAULT);
  12.     this.clientConfig = clientConfig;
  13.     this.ribbon = RibbonProperties.from(clientConfig);
  14.     RibbonProperties ribbon = this.ribbon;
  15.     this.connectTimeout = ribbon.getConnectTimeout();
  16.     this.readTimeout = ribbon.getReadTimeout();
  17.     this.serverIntrospector = serverIntrospector;
  18.   }
  19.   @Override
  20.   public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
  21.       throws IOException {
  22.     Request.Options options;
  23.     if (configOverride != null) {
  24.       RibbonProperties override = RibbonProperties.from(configOverride);
  25.       options = new Request.Options(override.connectTimeout(this.connectTimeout),
  26.           override.readTimeout(this.readTimeout));
  27.     }
  28.     else {
  29.       options = new Request.Options(this.connectTimeout, this.readTimeout);
  30.     }
  31.     Response response = request.client().execute(request.toRequest(), options);
  32.     return new RibbonResponse(request.getUri(), response);
  33.   }
  34. }
复制代码
FeignLoadBalancer继承自AbstractLoadBalancerAwareClient,AbstractLoadBalancerAwareClient又是啥玩意?看过我写的关于Ribbon核心组件已经运行原理的那篇文章小伙伴肯定知道,AbstractLoadBalancerAwareClient类主要作用是通过ILoadBalancer组件获取一个Server,然后基于这个Server重构了URI,也就是将你的请求路径http://服务名/api/sayHello转换成类似http://192.168.1.101:8088/api/sayHello这种路径,也就是将原服务名替换成服务所在的某一台机器ip和端口,替换之后就交由子类实现的exceut方法来发送http请求。
所以我们知道调用executeWithLoadBalancer之后,就会重构请求路径,将服务名替换成某个具体的服务器所在的ip和端口,之后交给子类execute来处理,对于这里来说,也就是FeignLoadBalancer的execute方法,因为FeignLoadBalancer继承AbstractLoadBalancerAwareClient。
直接定位到execute方法最核心的一行代码
  1. Response response = request.client().execute(request.toRequest(), options);
复制代码
request.client()就会拿到构建LoadBalancerFeignClient传入的那个Client的实现,我提到过,这个Client的实现是具体发送请求的实现,默认的就是Client.Default类(不是默认就有可能是基于HttpClient或者是OkHttp的实现)。所以这行代码就是基于这个Client就成功的发送了Http请求,拿到响应,然后将这个Response 封装成一个RibbonResponse返回,最后就返回给MethodHandler,然后解析响应,封装成方法的返回值返回给调用者。
 
好了,其实到这里就完全知道Feign是如何整合Ribbon的,LoadBalancerFeignClient其实是OpenFeign适配Ribbon的入口,FeignLoadBalancer才是真正实现选择负载均衡,发送http请求的组件,因为他继承了AbstractLoadBalancerAwareClient。
 
为了大家能够清楚的知道整个动态代理的调用过程,我在Ribbon的那张图的基础上,加上Feign的调用链路。

 
 
 
通过这张图,我们可以清楚地看出OpenFeign、Ribbon以及注册中心之间的协同关系。
四、总结 

到这里,我通过三篇文章,算上Nacos那两篇,总共五篇文章完整的讲述了在微服务架构中,OpenFeign、Ribbon、Nacos(当然其它注册中心也可以)这三个组件协同工作的核心源码和流程。这里我再用简洁的话来总结一下他们的协同工作原理,OpenFeign在进行rpc调用的时候,由于不知道服务具体在哪台机器上,所以需要Ribbon这个负载均衡组件从服务所在的机器列表中选择一个,Ribbon中服务所在的机器列表是从注册中心拉取的,Ribbon提供了一个ServerList接口,注册中心实现之后,Ribbon就可以获取到服务所在的机器列表,这就是这三个组件最基本的原理。希望通过这五篇文章,小伙伴们可以对微服务架构的最基本的原理有一定的了解,同时也对OpenFeign、Ribbon、Nacos源码有一定的认识。
 
往期热门文章推荐
扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习。 



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4