ToB企服应用市场:ToB评测及商务社交产业平台

标题: 通过自界说feignclient 的LoadBalancerFeignClient实现灵活的负载均衡计谋 [打印本页]

作者: 羊蹓狼    时间: 2024-12-1 15:11
标题: 通过自界说feignclient 的LoadBalancerFeignClient实现灵活的负载均衡计谋
通过自界说feignclient 的LoadBalancerFeignClient 或IRule 能实现完全自界说的负载均衡计谋,本文重要是通过实现自界说的LoadBalancerFeignClient而达到自界说的负载均衡计谋
示例代码实现如下:
  1. package cn.zuowenjun.demo;
  2. import com.netflix.loadbalancer.Server;
  3. import feign.Client;
  4. import feign.Request;
  5. import feign.Response;
  6. import org.apache.commons.collections4.MapUtils;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.cloud.netflix.feign.FeignClient;
  10. import org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory;
  11. import org.springframework.cloud.netflix.feign.ribbon.LoadBalancerFeignClient;
  12. import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.util.CollectionUtils;
  15. import org.springframework.web.bind.annotation.RequestBody;
  16. import org.springframework.web.bind.annotation.RequestMapping;
  17. import org.springframework.web.bind.annotation.RequestMethod;
  18. import java.io.IOException;
  19. import java.net.URL;
  20. import java.util.*;
  21. import java.util.stream.Collectors;
  22. /**
  23. * FeignClient 服务BEAN(含自定义负载均衡请求机制),注意这里指定了configuration的类型
  24. */
  25. @FeignClient(value ="zuowenjun-demo" , configuration = {DemoProviderDispatchService.Config.class})
  26. public interface DemoProviderDispatchService {
  27.     Logger LOGGER = LoggerFactory.getLogger(FileFmsDispatchService.class);
  28.     //要RPC请求的接口,需要自定义负载均衡
  29.     @RequestMapping(value = "/fileContent/compare", method = RequestMethod.POST)
  30.     ResponseData<Integer> compare(@RequestBody FileBillCompareBO billCompareBO);
  31.     /**
  32.      * DemoProviderDispatchService 专用的配置类,在这个配置类里面添加的BEAN均可替换全局默认的BEAN
  33.      * 注意:此处不能加上@Configuration,否则将变成全局配置了
  34.      */
  35.     static class Config {
  36.         /**
  37.          * 为FileFmsDispatchClient 重新定义专用的Client,在里面实现自定义的URL请求
  38.          *
  39.          * @param cachingFactory
  40.          * @param clientFactory
  41.          * @return
  42.          */
  43.         @Bean
  44.         public Client requestClient(CachingSpringLoadBalancerFactory cachingFactory, SpringClientFactory clientFactory) {
  45.             //获取当前节点的ID与端口
  46.             String currentIpAndPort = CommonUtils.getCurrentIpAndPort();
  47.             //构建返回一个完全自定义的LoadBalancerFeignClient
  48.             return new LoadBalancerFeignClient(new Client.Default(null, null) {
  49.                 @Override
  50.                 public Response execute(Request request, Request.Options options) throws IOException {
  51.                     //通过重新创建Request,将Request 中的url指向自定义负载均衡的选中的URL
  52.                     Request newRequest = reCreateRequestForLoadBalance(request);
  53.                     if (newRequest == null) {
  54.                         return Response.builder().reason("服务器繁忙,当前无可用服务节点").status(204).build();
  55.                     }
  56.                     return super.execute(newRequest, options);
  57.                 }
  58.                 private Request reCreateRequestForLoadBalance(Request request) throws IOException {
  59.                     URL url = new URL(request.url());
  60.                     //获取可用的服务实例节点列表
  61.                     List<Server> upServers = clientFactory.getLoadBalancer(“zuowenjun-demo”).getReachableServers();
  62.                     Server bestServer = choose(url, upServers, url.getFile());
  63.                     if (bestServer == null) {
  64.                         //找不到最佳可用服务器节点,说明所有服务节点都压力山大
  65.                         return null;
  66.                     }
  67.                     url = new URL(url.getProtocol(), bestServer.getHost(), bestServer.getPort(), url.getFile());
  68.                     return Request.create(request.method(), url.toString(), request.headers(), request.body(), request.charset());
  69.                 }
  70.                 /**
  71.                  * 选择最优的服务节点
  72.                  * @param url
  73.                  * @param upServers
  74.                  * @param apiPath
  75.                  * @return
  76.                  */
  77.                 private Server choose(URL url, List<Server> upServers, String apiPath) {
  78.                     if (CollectionUtils.isEmpty(upServers)) {
  79.                         throw new ApplicationException(500, "从注册中心获取不到可用的服务节点信息");
  80.                     }
  81.                     apiPath=apiPath.startsWith("/")?apiPath.substring(1):apiPath;
  82.                     String hashKey = Constants.LOADBALANCE_API_PREFIX + apiPath.replace("/", "_");
  83.                     Boolean existRequest = RedisUtils.existHashKey(hashKey, String.format("%s:%s", url.getHost(), url.getPort()));
  84.                     Server bestServer = null;
  85.                     if (!Boolean.TRUE.equals(existRequest)) {
  86.                         //如果当前即将请求的URL的节点之前没有缓存标记请求处理中时,则可直接复用返回
  87.                         bestServer = upServers.stream().filter(s -> s.getHost().equals(url.getHost()) && s.getPort() == url.getPort()).findFirst().orElse(null);
  88.                         if (bestServer != null) {
  89.                             return bestServer;
  90.                         }
  91.                     }
  92.                     //先从缓存中找出当前API 的请求中的节点列表
  93.                     Map<Object, Object> existRequestMap = RedisUtils.getHashEntries(hashKey);
  94.                     Set<Object> existRequestIpAndPorts = new HashSet<>();
  95.                     if (MapUtils.isNotEmpty(existRequestMap)) {
  96.                         existRequestIpAndPorts.addAll(existRequestMap.keySet());
  97.                     }
  98.                     //排除API请求中的节点,保留空闲节点列表
  99.                     upServers = upServers.stream().filter(s -> !existRequestIpAndPorts.contains(s.getHostPort()) && !s.getHostPort().equals(currentIpAndPort)).collect(Collectors.toList());
  100.                     if (CollectionUtils.isEmpty(upServers)) {
  101.                         //说明当前所有节点全部都有处理请求中,无空闲节点
  102.                         return null;
  103.                     }
  104.                     //从空闲节点列表中随机返回一个节点
  105.                     int rndNo = new Random().nextInt(upServers.size());
  106.                     bestServer = upServers.get(rndNo);
  107.                     LOGGER.debug("DemoProviderDispatchService.Config.requestClient.Client#choose {}", bestServer.getHostPort());
  108.                     return bestServer;
  109.                 }
  110.             }, cachingFactory, clientFactory);
  111.         }
  112.     }
  113. }
复制代码
调用时就正常注入DemoProviderDispatchService BEAN,并使用:demoProviderDispatchService.compare(...) 即可实现在自界说负载均衡的计谋下请求远程服务API,这种自界说的负载均衡计谋可以满足特定的性能要求

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4