分析sentinel的限流和熔断

打印 上一主题 下一主题

主题 985|帖子 985|积分 2955

前言

平时发起一个请求,在体系内部微服务之间调用就会形成一条调用链路,那对于每个服务要起到掩护的作用,怎样掩护呢?本文就来做一个先容,笔者平时是使用sentinel居多,是通过其提供的限流和熔断的措施来掩护的,于是文原来分析sentinel的限流和熔断;
先先容雪崩问题:
比方a1服务——》b服务——》c服务;
a2服务——》b服务——》c服务;
a3服务——》b服务;
d服务——》a3服务;
在a1服务调用b时,往下调用c服务,因服务c故障,会占用着b服务连接至超时时间,此时又会有服务a2调b服务,再调c服务,会因服务c故障,继续堆积连接在b服务上至连接超时,堆积到服务b性能瓶颈也会导致服务b故障,此时a3服务调用b服务也会继续导致a3服务堆积连接,依次导致所有服务故障,这就是雪崩
市面上解决雪崩的方案:

流量控制是限流的,用于掩护下游服务,然后别的三种是下游发生了故障的前提下,掩护上游服务的方案,其中舱壁模式,也是线程隔离,下游发生故障时,是为调上游的多个服务,建立属于它们的线程池,仍然会调下游,如许就会占用着线程;而熔断降级则不会,下游有问题,比方上游根据调用下游的失败比例、失败数来决定触发熔断;
所以说流量控制避免雪崩,有了流量控制就不会发生雪崩,而别的三种则解决雪崩问题;
再先容限流与熔断的概念:
限流是掩护当前服务,提前对服务做了qps限流掩护;
熔断则是某个服务因没有限流发生了故障,然后上游调用该服务时,不会再调用了,而是在上游定义了关于该服务的降级逻辑,起到掩护上游服务的作用,避免因为该服务的故障,导致雪崩问题;
那限流、熔断、雪崩之间的关系?
更像是因为没有限流容易导致服务提供方问题,然后服务消耗方用熔断来掩护自己,从而避免雪崩;
源码分析

在 Sentinel 内里,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配主动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时间,同时也会创建一系列功能插槽(slot chain);
比方服务a——》服务b——》服务c,每个服务就是一个掩护资源,默认是一个controller一个掩护资源,在一个服务里其实可以有多个掩护资源,用SentinelResource注解标识controller今后调的一个个service层的方法即可,sentinel对于掩护资源的调用链路是如下图:

调用链路就是这8个slot,简朴来说就是统计访问的qps,判断是否达到触发限流(FlowSlot)或熔断(DegradeSlot)阈值,每个slot抛出blockExeception,就代表访问停止了,就不会触发今后的slot调用,它是使用责任链模式调用的。

SentinelResource的切面,主要是调SphU.entry方法
  1. //该方法是查找slot链,上面说的slot责任链就是指这个链
  2. com.alibaba.csp.sentinel.CtSph#entryWithPriority
  3. ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);
  4. //lookProcessChain方法:第一次访问资源时,使用copyOnWrite的方式添加到map中,后面就从map获取即可
  5. ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
  6.         if (chain == null) {
  7.             synchronized(LOCK) {
  8.                 chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
  9.                 if (chain == null) {
  10.                     if (chainMap.size() >= 6000) {
  11.                         return null;
  12.                     }
  13.                     chain = SlotChainProvider.newSlotChain();
  14.                     Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);
  15.                     newMap.putAll(chainMap);
  16.                     newMap.put(resourceWrapper, chain);
  17.                     chainMap = newMap;
  18.                 }
  19.             }
  20.         }
  21. //创建的slotChain是通过spi的方式从METAINFO目录获取所有的slot
  22.     public ProcessorSlotChain build() {
  23.         ProcessorSlotChain chain = new DefaultProcessorSlotChain();
  24.         List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
  25.         Iterator var3 = sortedSlotList.iterator();
  26.         while(var3.hasNext()) {
  27.             ProcessorSlot slot = (ProcessorSlot)var3.next();
  28.             if (!(slot instanceof AbstractLinkedProcessorSlot)) {
  29.                 RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
  30.             } else {
  31.                     //获取的每个slot形成一个AbstractLinkedProcessorSlot链表,因为每个slot继承了该类
  32.                 chain.addLast((AbstractLinkedProcessorSlot)slot);
  33.             }
  34.         }
  35.         return chain;
  36.     }
  37. //创建完slotChain后,就开始执行每个slot
  38. chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);
  39. public class DefaultProcessorSlotChain extends ProcessorSlotChain {
  40.     AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
  41.             //这里是第一个slot,从这里开始调用
  42.         public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
  43.             super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
  44.         }
  45.         public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
  46.             super.fireExit(context, resourceWrapper, count, args);
  47.         }
  48.     };
  49. }
  50. //调用下一个slot是通过当前slot的next属性(就是下一个slot元素)
  51.     public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
  52.         if (this.next != null) {
  53.             this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
  54.         }
  55.     }
复制代码
这里详细先容了slot调用链路的过程,本文主要是关注限流和熔断,所以只需看FlowSlot和DegradeSlot即可;
滑动窗口源码

是用StatisticSlot统计访问资源的线程数、qps。
滑动窗口的原理:时间窗口设定qps阈值,先看看固定时间窗的缺点,一个时间窗口内是不会超出阈值,但一个时间窗的后半个时间窗和下一个时间窗的前半个时间窗加起来会超过阈值。于是将一个时间窗分别为多个时间样本,比方1s的时间窗分别为5个样本时间窗(每个200ms),纵然统计到一个时间窗的后半个时间窗和后一个时间窗的前半个时间窗时,会统计每个样本时间窗来是否达到阈值,从而解决该问题;
   留意:样本时间窗分别得越小,统计得越正确;
  1. com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
  2.                         //访问链表所有的slot
  3.             this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
  4.             //统计线程数
  5.             node.increaseThreadNum();
  6.             //统计qps
  7.             node.addPassRequest(count);
复制代码
这里有以为希奇吗?先是访问所有的slot是否通事后(任何一个slot都不抛出BlockException),再统计资源的线程数和qps,其实是雷同于elastic search写文档时,先写内存,然后再写transaction log文件,如许是为了写内存乐成后再写入transaction log文件,不至于写入文件后,再回滚内存。同样是为了校验通事后,再统计,不至于统计完才校验失败。
基础知识:

这里是时间窗口类

时间窗口类包罗的属性,以及包罗的时间样本窗口类WindowWrap
  1. public void addPass(int count) {
  2.         //获取当前时间点所在的样本窗口
  3.     WindowWrap<MetricBucket> wrap = this.data.currentWindow();
  4.     //将当前请求的计数量添加到当前样本窗口的统计数据中
  5.     ((MetricBucket)wrap.value()).addPass(count);
  6. }
复制代码
就不往里跟了,主要是根据当前时间点统计是在当前时间窗的哪个样本时间窗里,然后往里添加
限流源码


对应的就是这些规则
是在FlowSlot中,根据staticFlow统计的qps、线程数,来判断是否达到阈值,从而触发限流
  1. com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#entry
  2. public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
  3.         //检测并应用流控规则
  4.     this.checkFlow(resourceWrapper, context, node, count, prioritized);
  5.     //触发下一个slot
  6.     this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
  7. }
  8.     public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
  9.         if (ruleProvider != null && resource != null) {
  10.         //获取到指定资源的所有流控规则
  11.             Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());
  12.             if (rules != null) {
  13.                 Iterator var8 = rules.iterator();
  14.                                 //这个应用流控规则。若是无法通过则抛出异常,后续规则不再应用
  15.                 while(var8.hasNext()) {
  16.                     FlowRule rule = (FlowRule)var8.next();
  17.                     if (!this.canPassCheck(rule, context, node, count, prioritized)) {
  18.                         throw new FlowException(rule.getLimitApp(), rule);
  19.                     }
  20.                 }
  21.             }
  22.         }
  23.     }
复制代码
熔断源码

  1. public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
  2.     public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
  3.         this.performChecking(context, resourceWrapper);
  4.         this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
  5.     }
  6.     void performChecking(Context context, ResourceWrapper r) throws BlockException {
  7.             //获取当前资源所有熔断器
  8.         List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
  9.         if (circuitBreakers != null && !circuitBreakers.isEmpty()) {
  10.             Iterator var4 = circuitBreakers.iterator();
  11.             CircuitBreaker cb;
  12.             //逐个尝试所有熔断器
  13.             do {
  14.                 if (!var4.hasNext()) {
  15.                     return;
  16.                 }
  17.                 cb = (CircuitBreaker)var4.next();
  18.                 //若没有通过当前熔断器,则直接抛出异常
  19.             } while(cb.tryPass(context));
  20.             throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
  21.         }
  22.     }
  23. }
复制代码

一样平常来说,熔断降级其实是对于服务的调用方来说的。在项目中会经常调用其它服务或者是第三方接口,而对于这些接口,一旦它们出现不稳定,就有可能导致自身服务长时间等候,从而出现响应延迟等等问题。此时服务调用方就可基于熔断降级方式解决。一旦第三方接口响应时间过长,那么就可以使用慢调用比例规则,当出现大量长时间响应的情况,那么就直接熔断,不去请求。固然说熔断降级是针对服务的调用方来说,但是Sentinel自己并没有限定熔断降级肯定是调用其它的服务。
完结撒花,sentinel源码还是挺简朴的,如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表