马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
目次
媒介
1.原理分析
1.1 ReactiveLoadBalancerClientFilter源码分析
1.2 LoadBalancerClientFactory源码分析
2.代码实现
2.1 扩展原生RoundRobinLoadBalancer轮询策略
2.1.1 自界说实现RoundRobinLoadBalancer
2.1.2 设置自界说的RoundRobinLoadBalancer
2.2 扩展原生RandomLoadBalancer随机策略
2.2.1 自界说实现RandomLoadBalancer
2.2.2 设置自界说的RandomLoadBalancer
2.3 动态绑定client和loadBalancer并注册到LoadBalancerClientFactory
媒介
工作和爱好的使然,由于必要对各种开源的项目做一些自界说的插件以及扩展,所以会经常研究一些开源组件的源码。恰好前段阵子公司内部计划举行产物依赖版本升级,springcloud升级到2021.0.6,spring boot升级到2.7.11了,天然spring cloud gateway就随之升级到了3.1.6,带来的问题就是gateway内部组件的大调解和更新,比如scg在Hoxton.M2 RELEASED版本之前,内部的负载均衡组件不停用的都是ribbon,现在新版本的cloud包括boot内部的负载均衡组件同一用了spring自己的loadbalancer,那么我们开发的针对于老版本的一些插件和扩展就不能用了,必要对新的组件举行适配了,本人恰好负责这次的版本升级工作,所以想空闲时间就把这些升级的东西和过程写下来。都是本人手搓的一手代码,创作不易,望诸君高台贵手,点赞支持。
1.原理分析
直接进入主题,本文重要核心有两点:
- 针对scg的loadbalancer组件做自界说扩展,官方内置的负载策略目前只有两种:RandomLoad和RoundLoad,那么我们在实际业务中这两种大概满足不了业务需求,例如:使用轮询策略RoundRobinLoadBalancer会每次轮询访问目标服务实例,但是有大概业务上会有一些自己业务的判定逻辑,比如我的下游目标服务起了5个实例,每次轮询都会在这五个实例内里公平的循环轮询,现在公司说service端必要升级,但是又不想强制要client端升级才气使用,这样客户才会比较好轻易接受,所以现在把其中3个实例升级到最新版本,剩下的两个实例还是老版本,那么在网关这里假如还是使用的老的轮询策略,就会出现一个问题:假如当前请求是来自老的客户端,那么假如轮询到了新版本实例上去了大概新客户端请求轮询到老的版本实例上去了,就有大概会出现一系列问题,这个时间就必要在轮询的时间加一些自己业务的判定上去,好让实例choose能满足业务需求。
- 实现一个动态设置服务负载均衡策略的逻辑:就比如在scg内里,服务a使用轮询,服务b使用随机,运行了一段时间后,想把服务a的负载均衡策略改成随机大概是其他自界说的策略,无需改代码以及启停服务,直接动态修改设置热更新生效
1.1 ReactiveLoadBalancerClientFilter源码分析
查看scg的源码我们不难发现,scg内部负载均衡的核心逻辑都是由一个GlobalFilter来实现的,这个全局filter叫:ReactiveLoadBalancerClientFilter,查看这个filter的代码我们可以看到它有个核心方法choose(),这个方法的作用就是根据当前所必要路由的serviceId来选择对应的loadBalancer,然后通过loadBalancer来选择出终极要路由到的serviceInstance,那么通过这个方法我们能抓到两个核心要素:
1.拿到路由目标service对应的loadBalancer;
2.通过loadBalancer来choose出终极要路由的实例serviceInstance。
颠末上面分析,我们就能明白了,实现自界说loadBalancer的核心逻辑就是:
1.自界说自己的ReactorLoadBalancer;
2.在自界说的ReactorLoadBalancer内里实现自己的choose逻辑;
3.动态将serviceId和自己的loadBalancer绑定并注册到LoadBalancerClientFactory(由代码this.clientFactory.getInstance()可知)中去。
1.2 LoadBalancerClientFactory源码分析
spring cloud loadBalancer是怎么实现每个client绑定自己的设置的呢,官方提供了两个注解:@LoadBalancerClient和@LoadBalancerClients,首先我们来看@LoadBalancerClient的源码,可以看到@LoadBalancerClient提供了三个参数:name、value、configuration[],看注解就知道name、value就是设置客户端的名称(客户端名称我们就可以理解为一个应用的applicationName,因为在实际负载中都是用服务名作为clientId、serviceId),configuration就是客户端对应的自界说的设置,包括负载策略的设置、健康检查策略的设置等等,这里我们只必要关注负载策略的自界说就好,其他的用默认就行,假如有需求也可以都自界说。
接下来我们看@LoadBalancerClients源码
可以发现有个@Import注解,我们打开Import内里的LoadBalancerClientConfigurationRegister类,发现它实现了ImportBeanDefinitionRegister接口并重写了registerBeanDefinitions方法,我们重点看下这个方法,发现它内里干了3件事:
1.获取所有的@LoadBalancerClients注解的元数据,拿到代码里LoadBalancerClients注解的设置,然后根据设置举行loadBalancerClientConfiguration的绑定;
2.假如第一步内里@LoadBalancerClients内里假如设置的是defaultConfiguration,那么就用默认的设置举行绑定;
3.获取所有的@LoadBalancerClient注解的元数据,然后同样的拿到代码LoadBalancerClient注解的设置,然后根据设置举行loadBalancerClientConfiguration的绑定
那我们看下具体是怎么绑定的,看源码:
本质就是构建一个LoadBalancerClientSpecification bean,这个spec就是每个客户端自界说负载策略的核心bean,我们看下它的源码,不难发现,它有两个属性:name、configuration[],是不是发现似曾相识,就是@LoadBalancerClient注解的两个属性,所以终极就是为了装配这个bean,构建完ben之后,那俩注解的作用就完成了,接下来我们在继续看下spring是怎样实现前面我们所讲的:在路由时,是怎样通过serviceId获取到这个客户端设置的。
接下来我们继续,所有的设置动作已经解析完成了,那么就是将设置交给spring容器了,spring是怎样加载这个LoadBalancerClientSpecification的呢,我们跟随设置:spring.cloud.loadbalancer.ebabled会发现有个自动装配类:LoadBalancerAutoConfiguration,查看这个设置类源码我们可以发现,有参构造的参数就是LoadBalancerClientSpecification,然后再举行初始化bean:LoadBalancerClientFactory这个loadBalancer的核心factory。
查看LoadBalancerClientFactory的源码我们可以看到它继续了:NamedContextFactory,那么这个作用是什么呢:子容器之间的数据隔离。NamedContextFactory的作用是创建一个子容器(子上下文context),然后每个子容器通过LoadBalancerClientSpecification来界说客户端容器name以及数据设置。我们回到开头所讲的ReactiveLoadBalancerClientFilter这个filter,在请求进来的时间通过LoadBalancerClientFactory拿serviceId去获取这个客户端对应的loadBalancer,跟进源码,我们发现它终极调用的是NamedContextFactory的getInstance()方法,然后调用getContext(),重要逻辑就是通过serviceId去获取子容器,假如没有那么就创建一个新的子容器(子上下文),查看源码我们不难发现,新的子容器name就是用的serviceId,然后再拿到对应的LoadBalancerClientSpecification来注册到子容器中去;所以到这里,再回到上面,我们看LoadBalancerAutoConfiguration中的LoadBalanceClientrFactory的初始化就干了一件事:将所有的客户端的设置LoadBalancerClientSpecification注册到NamedContextFactory中去;然后随着请求过来时,拿到已经注册好的LoadBalancerClientSpecification对当前请求的客户端举行子上下文的初始化。
createContext()源码截图:
到此,颠末上述简单的源码分析,那么原理和实现方案我们就已经大抵明白了,接下来就直接上代码来验证。
2.代码实现
想要实现自界说负载策略,首先必要实现官方接口:ReactorServiceInstanceLoadBalancer ,查看代码我们会发现这个接口是spring loadBalancer官方提供的接口,所有的策略都必要实现它,所以我们自界说的策略也不破例,具体代码如下:
2.1 扩展原生RoundRobinLoadBalancer轮询策略
2.1.1 自界说实现RoundRobinLoadBalancer
- package com.primeton.gateway.core.lb;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.ObjectProvider;
- import org.springframework.cloud.client.ServiceInstance;
- import org.springframework.cloud.client.loadbalancer.DefaultResponse;
- import org.springframework.cloud.client.loadbalancer.EmptyResponse;
- import org.springframework.cloud.client.loadbalancer.Request;
- import org.springframework.cloud.client.loadbalancer.Response;
- import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
- import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import reactor.core.publisher.Mono;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * @Description 轮询
- * @Author wx
- * @Date 2023/5/26
- */
- public class GatewayRoundLoadBalancer implements ReactorServiceInstanceLoadBalancer {
- private static final Log log = LogFactory.getLog(GatewayRoundLoadBalancer.class);
- final AtomicInteger position;
- private final String serviceId;
- private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
- public GatewayRoundLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
- String serviceId) {
- this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
- }
- public GatewayRoundLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
- String serviceId,
- int seedPosition) {
- this.serviceId = serviceId;
- this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
- this.position = new AtomicInteger(seedPosition);
- }
- @Override
- public Mono<Response<ServiceInstance>> choose(Request request) {
- ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
- .getIfAvailable(NoopServiceInstanceListSupplier::new);
- return supplier.get(request).next()
- .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
- }
- private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
- List<ServiceInstance> serviceInstances,
- Request request) {
- Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
- if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
- ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
- }
- return serviceInstanceResponse;
- }
- private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,
- Request request) {
- if (instances.isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("No servers available for service: " + serviceId);
- }
- return new EmptyResponse();
- }
- // Do not move position when there is only 1 instance, especially some suppliers
- // have already filtered instances
- if (instances.size() == 1) {
- return new DefaultResponse(instances.get(0));
- }
- List<ServiceInstance> useInstances = customChoose(instances, request);
- int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
- ServiceInstance instance = useInstances.get(pos % useInstances.size());
- return new DefaultResponse(instance);
- }
- /**
- * 自定义instances choose出满足业务请求的实例,然后按照轮询策略来从
- * 剩下的满足业务需求的实例列表选出最终的实例
- */
- private List<ServiceInstance> customChoose(List<ServiceInstance> instances, Request request) {
- //todo 比如根据request中的参数来筛选、 或者筛选出实例元数据中含有某些符合参数的实例 等等
- List<ServiceInstance> use = new ArrayList<>();
- for (ServiceInstance instance : instances) {
- Map<String, String> metadata = instance.getMetadata();
- if (metadata.containsKey("xxx")) use.add(instance);
- }
- return use;
- }
- }
复制代码 2.1.2 设置自界说的RoundRobinLoadBalancer
- package com.primeton.gateway.core.lb;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.core.env.Environment;
- /**
- * @Description TODO
- * @Author wx
- * @Date 2024/6/13
- */
- public class GatewayRoundLoadBalancerConfiguration {
- @Bean
- public GatewayRoundLoadBalancer gatewayRoundLoadBalancer(Environment environment,
- LoadBalancerClientFactory loadBalancerClientFactory) {
- String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
- return new GatewayRoundLoadBalancer(
- loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
- }
- }
复制代码 2.2 扩展原生RandomLoadBalancer随机策略
2.2.1 自界说实现RandomLoadBalancer
- package com.primeton.gateway.core.lb;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.ObjectProvider;
- import org.springframework.cloud.client.ServiceInstance;
- import org.springframework.cloud.client.loadbalancer.DefaultResponse;
- import org.springframework.cloud.client.loadbalancer.EmptyResponse;
- import org.springframework.cloud.client.loadbalancer.Request;
- import org.springframework.cloud.client.loadbalancer.Response;
- import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
- import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import reactor.core.publisher.Mono;
- import java.util.List;
- import java.util.concurrent.ThreadLocalRandom;
- /**
- * @Description 随机
- * @Author wx
- * @Date 2023/5/26
- */
- public class GatewayRandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {
- private static final Log log = LogFactory.getLog(GatewayRandomLoadBalancer.class);
- private final String serviceId;
- private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
- public GatewayRandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
- String serviceId) {
- this.serviceId = serviceId;
- this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
- }
- @Override
- public Mono<Response<ServiceInstance>> choose(Request request) {
- ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
- .getIfAvailable(NoopServiceInstanceListSupplier::new);
- return supplier.get(request).next()
- .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
- }
- private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
- List<ServiceInstance> serviceInstances,
- Request request) {
- Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
- if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
- ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
- }
- return serviceInstanceResponse;
- }
- private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,
- Request request) {
- if (instances.isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("No servers available for service: " + serviceId);
- }
- return new EmptyResponse();
- }
- List<ServiceInstance> useInstances = customChoose(instances, request);
-
- int index = ThreadLocalRandom.current().nextInt(useInstances.size());
- ServiceInstance instance = useInstances.get(index);
- return new DefaultResponse(instance);
- }
- //todo 节合实际业务来筛选
- private List<ServiceInstance> customChoose(List<ServiceInstance> instances, Request request) {
- return instances;
- }
- }
复制代码 2.2.2 设置自界说的RandomLoadBalancer
- package com.primeton.gateway.core.lb;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.core.env.Environment;
- /**
- * @Description TODO
- * @Author wx
- * @Date 2024/6/13
- */
- public class GatewayRandomLoadBalancerConfiguration {
- @Bean
- public GatewayRandomLoadBalancer gatewayRandomLoadBalancer(Environment environment,
- LoadBalancerClientFactory loadBalancerClientFactory) {
- String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
- return new GatewayRandomLoadBalancer(
- loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
- }
- }
复制代码 2.3 动态绑定client和loadBalancer并注册到LoadBalancerClientFactory
看完前面的原理分析,那我们就明白要实现client动态绑定loadBalancer并注册到LoadBalancerClientFactory去,要做的就是两件事:1.根据业务设置对每个客户端举行LoadBalancerClientSpecification组装;2.组装好LoadBalancerClientSpecification注册到LoadBalancerClientFactory也就是NamedContextFactory上下文内里去。具体实现如下:
- package com.primeton.gateway.core.lb;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientSpecification;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.core.env.ConfigurableEnvironment;
- import org.springframework.core.env.EnumerablePropertySource;
- import org.springframework.core.env.PropertySource;
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- /**
- * @Description TODO
- * @Author wx
- * @Date 2023/5/26
- */
- @Configuration
- public class GatewayLoadBalancerConfiguration {
- private static final String SUFFIX = ".loadbalancer.LoadBalancer-configuration-class-name";
- @Autowired
- private ConfigurableEnvironment env;
- @Autowired
- private LoadBalancerClientFactory loadBalancerClientFactory;
- @PostConstruct
- public void postConstruct() {
- //第一步:解析业务配置,从而解析出来每个客户端对应的负载策略配置
- HashMap<String, String> configs = new HashMap<>();
- for (PropertySource<?> propertySource : env.getPropertySources()) {
- if (propertySource instanceof EnumerablePropertySource) {
- for (String name : ((EnumerablePropertySource) propertySource).getPropertyNames()) {
- if (name != null && name.endsWith(SUFFIX)) {
- configs.put(name, env.getProperty(name));
- }
- }
- }
- }
- //第二步:组装Specification并绑定到spring上下文中去
- List<LoadBalancerClientSpecification> configurations = new ArrayList<>();
- for (String clientId : configs.keySet()) {
- String id = clientId.substring(0, clientId.length() - SUFFIX.length());
- try {
- Class<?>[] classes = {Class.forName(configs.get(clientId))};
- LoadBalancerClientSpecification specification = new LoadBalancerClientSpecification();
- specification.setName(id);
- specification.setConfiguration(classes);
- configurations.add(specification);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- loadBalancerClientFactory.setConfigurations(configurations);
- }
- }
复制代码 上面代码我做一下简单的思绪形貌:我们可以把上面代码分为两部门:
1.第一部门我是自界说了一个设置规则:serviceId..loadbalancer.LoadBalancer-configuration-class-name=xxxxx (全路径);比如:DEMO01..loadbalancer.LoadBalancer-configuration-class-name=com.primeton.gateway.core.lb.GatewayRoundLoadBalancerConfiguration(这个设置类看2.1.2章节),这就表现客户端应用DEMO01的负载均衡策略就是我在2.1.2自界说的轮询策略,所有必要路由到DEMO01的请求都要走我们2.1.1内里的逻辑举行choose()筛选出终极要路由的实例,这个设置可以放在设置文件,也可以放在其他地方;
2.第二部门就是针对所有的客户端负载策略设置举行组装spring loadBalancer必要的LoadBalancerClientSpecification,然后终极模拟源码内里的绑定动作将我们自己组装好的数据设置到NamedContextFactory上下文中去。
到此我们自界说客户端负载均衡策略方案就实现了,但是还差末了一步:怎么动态更新呢,比如现在DEMO01我们设置的是我们写的GatewayRoundLoadBalancerConfiguration,我们想要将它换成GatewayRandomLoadBalancerConfiguration 随机策略,由于时间问题,我这里就给大家出个方案,具体实现我就不写了,偶然间再给大家写:
方案1:结合设置中心:nacos、Apollo等做设置热更新,监听nacos、Apollo设置,当设置有变化时,nacos、Apollo服务端都会发送通知,你只必要在代码里创建一个listener,然后针对我们上面界说的key,然后把上面代码的步调再走一遍就行了。
方案2:将设置更新做成接口化,gateway写个controller专门用来管理设置,然后设置有变化通过调用gateway对应接口来通知gateway举行更新,末了再走一遍上面代码即可
方案3:结合redis,设置数据存在redis内里,利用redis的键空间监听通知监听这个设置,当设置有改动的时间redis会发出通知,我们在gateway内里监听好,然后举行上面的代码即可
固然还有许多其他方案,核心实现已经分享给大家了,剩下的就看诸位结合自身业务接纳什么样的设置方案了。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |