反转基因福娃 发表于 5 天前

Sentinel源码—6.熔断降级和数据统计的实现二

大纲
1.DegradeSlot实现熔断降级的原理与源码
2.Sentinel数据指标统计的滑动窗口算法

2.Sentinel数据指标统计的滑动窗口算法
(1)滑动窗口先容
(2)StatisticSlot利用滑动窗口算法进行数据统计

(1)滑动窗口先容
一.滑动窗口原理
滑动窗口不会指定固定的时间窗口出发点与尽头,而是将处置惩罚请求的时间点作为该请求对应时间窗口的尽头,出发点则是向前距离该尽头一个时间窗口长度的时间点。

二.滑动窗口的性能问题(样本窗口解决)
由于每到来一个请求,就会移动一下统计的时间窗口。如果先后到来的两个请求的相隔时间在一个时间窗口长度之内,那么分别在这个两个请求对应的时间窗口下统计请求数时就会出现重复计数的问题。如果在一个时间窗口长度内出现了大量的请求,则会进行大量重复计算从而浪费资源。

为了解决这个问题,就需要更细粒度的计算,比如引入样本窗口。样本窗口的长度会小于滑动窗口的长度,通常滑动窗口的长度是样本窗口的整数倍。每个样本窗口在到达尽头时间时,会统计其中的请求数并进行记录。这样统计请求对应的时间窗口的请求数时,就可复用样本窗口的数据了。

以是,通过多个样本窗口构成滑动窗口,可以解决滑动窗口的性能问题。

(2)StatisticSlot利用滑动窗口算法进行数据统计
一.StatisticNode为了实现统计数据而进行的计划
二.LeapArray实现滑动窗口算法的数据统计逻辑

一.StatisticNode为了实现统计数据而进行的计划
起首StatisticSlot的entry()方法会调用DefaultNode的addPassRequest()方法,接着DefaultNode的addPassRequest()方法又会调用StatisticNode的addPassRequest()方法,而StatisticNode的addPassRequest()方法便会通过利用滑动窗口算法来统计数据。

StatisticNode中会定义一个用来保存数据的ArrayMetric对象。创建该对象时默认就指定了样本窗口数量为2,时间窗口长度为1000ms。其中,ArrayMetric对象中的data属性会真正用来存储数据,而ArrayMetric对象中的data属性则是一个LeapArray对象。

在LeapArray对象中会详细记录:样本窗口长度、样本窗口数量、滑动窗口长度、样本窗口数组。LeapArray的array属性便是用来统计并保存数据的WindowWrap数组,WindowWrap数组也就是样本窗口数组。

WindowWrap有一个奇妙的计划:就是利用LongAdder数组而不是用LongAdder来存储统计数据。由于统计的数据是多维度的,且MetricEvent枚举类定义了这些维度类型,因此将MetricEvent维度类型枚举值对应的序号映射成数组索引,可以奇妙地将多维度的数据存储到LongAdder数组中。
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
      ...
      //执行下一个ProcessorSlot,先进行规则验证等
      fireEntry(context, resourceWrapper, node, count, prioritized, args);

      //如果通过了后面ProcessorSlot的验证
      //则将处理当前资源resourceWrapper的线程数+1 以及 将对当前资源resourceWrapper的成功请求数+1
      node.increaseThreadNum();
      node.addPassRequest(count);
      ...
    }
}

//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
    //Associated cluster node.
    private ClusterNode clusterNode;
    ...
   
    //DefaultNode会统计名字相同的Context下的某个资源的调用数据,按照单机里的资源维度进行调用数据统计
    //EntranceNode会统计名字相同的Context下的全部资源的调用数据,按接口维度来统计调用数据,即统计接口下所有资源的调用情况
    //ClusterNode会统计某个资源在全部Context下的调用数据,按照集群中的资源维度进行调用数据统计
    @Override
    public void addPassRequest(int count) {
      //增加当前资源对应的DefaultNode中的数据
      super.addPassRequest(count);
      //增加当前资源对应的ClusterNode中的全局统计数据
      this.clusterNode.addPassRequest(count);
    }
    ...
}

//The statistic node keep three kinds of real-time statistics metrics:
//1.metrics in second level rollingCounterInSecond
//2.metrics in minute level rollingCounterInMinute
//3.thread count

//Sentinel use sliding window to record and count the resource statistics in real-time.
//The sliding window infrastructure behind the ArrayMetric is LeapArray.

//case 1: When the first request comes in,
//Sentinel will create a new window bucket of a specified time-span to store running statics,
//such as total response time(rt), incoming request(QPS), block request(bq), etc.
//And the time-span is defined by sample count.
//   0      100ms
//+-------+--→ Sliding Windows
//         ^
//         |
//       request
//Sentinel use the statics of the valid buckets to decide whether this request can be passed.
//For example, if a rule defines that only 100 requests can be passed,
//it will sum all qps in valid buckets, and compare it to the threshold defined in rule.

//case 2: continuous requests
//0    100ms    200ms    300ms
//+-------+-------+-------+-----→ Sliding Windows
//                      ^
//                      |
//                   request

//case 3: requests keeps coming, and previous buckets become invalid
//0    100ms    200ms      800ms       900ms1000ms    1300ms
//+-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request

//The sliding window should become:
// 300ms   800ms900ms1000ms1300ms
//+ ...... +-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request
public class StatisticNode implements Node {
    //Holds statistics of the recent INTERVAL milliseconds.
    //The INTERVAL is divided into time spans by given sampleCount.
    //定义一个保存数据的ArrayMetric,指定了样本窗口数量默认为2(SAMPLE_COUNT),指定了时间窗口长度默认为1000ms(INTERVAL)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
   
    //Holds statistics of the recent 60 seconds.
    //The windowLengthInMs is deliberately set to 1000 milliseconds,
    //meaning each bucket per second, in this way we can get accurate statistics of each second.
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    ...
    @Override
    public void addPassRequest(int count) {
      //调用ArrayMetric.addPass()方法,根据当前请求增加计数
      rollingCounterInSecond.addPass(count);
      rollingCounterInMinute.addPass(count);
    }
    ...
}

//The basic metric class in Sentinel using a BucketLeapArray internal.
public class ArrayMetric implements Metric {
    //用于存储统计数据
    private final LeapArray<MetricBucket> data;
    ...
   
    @Override
    public void addPass(int count) {
      //1.通过LeapArray.currentWindow()方法获取当前时间所在的样本窗口
      WindowWrap<MetricBucket> wrap = data.currentWindow();
      //2.调用MetricBucket.addPass()方法将当前请求的计数量添加到样本窗口的统计数据中
      wrap.value().addPass(count);
    }
    ...
}

//Basic data structure for statistic metrics in Sentinel.
//Leap array use sliding window algorithm to count data.
//Each bucket cover windowLengthInMs time span, and the total time span is intervalInMs,
//so the total bucket amount is: sampleCount = intervalInMs / windowLengthInMs.
public abstract class LeapArray<T> {
    //样本窗口的长度
    protected int windowLengthInMs;
    //一个滑动窗口包含的样本窗口数量,公式 intervalInMs / windowLengthInMs,也就是滑动窗口长度 / 样本窗口长度
    protected int sampleCount;
    //滑动窗口长度
    protected int intervalInMs;
    //也是滑动窗口长度,只是单位为s
    private double intervalInSecond;
    //WindowWrap是样本窗口类,它是一个数组,泛型T实际类型为MetricBucket
    //LeapArray类似于一个样本窗口管理类,而真正的样本窗口类是WindowWrap<T>
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    //The total bucket count is: sampleCount = intervalInMs / windowLengthInMs.
    //@param sampleCountbucket count of the sliding window
    //@param intervalInMs the total time interval of this LeapArray in milliseconds
    public LeapArray(int sampleCount, int intervalInMs) {
      ...
      this.windowLengthInMs = intervalInMs / sampleCount;//默认为500ms
      this.intervalInMs = intervalInMs;//默认为1000ms
      this.intervalInSecond = intervalInMs / 1000.0;//默认为1
      this.sampleCount = sampleCount;//默认为2
      this.array = new AtomicReferenceArray<>(sampleCount);
    }

    //Get the bucket at current timestamp.
    //获取当前时间点所在的样本窗口
    public WindowWrap<T> currentWindow() {
      return currentWindow(TimeUtil.currentTimeMillis());
    }
    ...
}

//Wrapper entity class for a period of time window.
//样本窗口类,泛型T比如是MetricBucket
public class WindowWrap<T> {
    //Time length of a single window bucket in milliseconds.
    //单个样本窗口的长度
    private final long windowLengthInMs;
    //Start timestamp of the window in milliseconds.
    //样本窗口的起始时间戳
    private long windowStart;
    //Statistic data.
    //当前样本窗口的统计数据,类型为MetricBucket
    private T value;
    ...
    //返回比如MetricBucket对象
    public T value() {
      return value;
    }
}

//Represents metrics data in a period of time span.
//统计数据的封装类
public class MetricBucket {
    //统计的数据会存放在LongAdder数组里
    //使用数组而不直接使用"LongAdder+1"是因为:
    //由于统计的数据是多维度的,并且MetricEvent枚举类定义了这些维度类型
    //因此将MetricEvent维度类型枚举值对应的序号映射成数组索引,巧妙地将多维度的数据定义在LongAdder数组中
    private final LongAdder[] counters;
    private volatile long minRt;
   
    public MetricBucket() {
      MetricEvent[] events = MetricEvent.values();
      this.counters = new LongAdder;
      for (MetricEvent event : events) {
            counters = new LongAdder();
      }
      initMinRt();
    }
   
    private void initMinRt() {
      this.minRt = SentinelConfig.statisticMaxRt();
    }
   
    public void addPass(int n) {
      add(MetricEvent.PASS, n);
    }
   
    public MetricBucket add(MetricEvent event, long n) {
      //统计数据并存储到counters中
      counters.add(n);
      return this;
    }
    ...
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
} 二.LeapArray实现滑动窗口算法的数据统计逻辑
调用ArrayMetric的addPass()进行数据统计的逻辑如下:起首通过LeapArray的currentWindow()方法获取当前时间所在的样本窗口,然后调用MetricBucket的addPass()方法统计并存储数据到样本窗口中。

LeapArray的currentWindow()方法获取当前时间所在的样本窗口的逻辑为:

情况一:如果当前时间所在的样本窗口如果还没创建,则需要初始化。

情况二:如果当前样本窗口的起始时间与计算出的样本窗口起始时间相同,则说明这两个是同一个样本窗口,直接获取就行。

情况三:如果当前样本窗口的起始时间大于计算出的样本窗口起始时间,则说明计算出的样本窗口已过时,要将原来的样本窗口替换为新样本窗口。留意LeapArray.array数组是一个环形数组。

情况四:如果当前样本窗口的起始时间小于计算出的样本窗口起始时间,一样寻常不出现,因为时间不会倒流,除非人为修改系统时间导致时钟回拨。
https://i-blog.csdnimg.cn/direct/19dd4a586a604f1082e67d115c4bce65.png
public abstract class LeapArray<T> {
    //样本窗口的长度
    protected int windowLengthInMs;
    //一个滑动窗口包含的样本窗口数量,公式 intervalInMs / windowLengthInMs,也就是滑动窗口长度 / 样本窗口长度
    protected int sampleCount;
    //滑动窗口长度
    protected int intervalInMs;
    //也是滑动窗口长度,只是单位为s
    private double intervalInSecond;
    //WindowWrap是样本窗口类,它是一个数组,泛型T实际类型为MetricBucket
    //LeapArray类似于一个样本窗口管理类,而真正的样本窗口类是WindowWrap<T>
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    ...
    //假设timeMillis = 1600,windowLengthInMs = 500,array.length = 2,那么timeId = 3,返回1
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
      long timeId = timeMillis / windowLengthInMs;
      //Calculate current index so we can map the timestamp to the leap array.
      return (int)(timeId % array.length());
    }

    //假设timeMillis = 1600,windowLengthInMs = 500,那么返回1500
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
      return timeMillis - timeMillis % windowLengthInMs;
    }

    //Get bucket item at provided timestamp.
    public WindowWrap<T> currentWindow(long timeMillis) {
      if (timeMillis < 0) {
            return null;
      }

      //计算当前时间所在的样本窗口id,也就是样本窗口的下标,即计算在数组LeapArray中的下标
      int idx = calculateTimeIdx(timeMillis);

      //Calculate current bucket start time.
      //计算当前样本窗口的开始时间点
      long windowStart = calculateWindowStart(timeMillis);

      //Get bucket item at given time from the array.
      //(1) Bucket is absent, then just create a new bucket and CAS update to circular array.
      //(2) Bucket is up-to-date, then just return the bucket.
      //(3) Bucket is deprecated, then reset current bucket.
      while (true) {
            //获取当前时间所在的样本窗口
            WindowWrap<T> old = array.get(idx);

            //如果当前时间所在的样本窗口为null,则需要创建
            if (old == null) {
                //创建一个时间窗口
                //   B0       B1      B2    NULL      B4
                // ||_______|_______|_______|_______|_______||___
                // 200   400   600   800   1000    1200timestamp
                //                           ^
                //                        time=888
                //            bucket is empty, so create new and update
                //If the old bucket is absent, then we create a new bucket at windowStart,
                //then try to update circular array via a CAS operation.
                //Only one thread can succeed to update, while other threads yield its time slice.
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //通过CAS将新创建的窗口放入到LeapArray中
                if (array.compareAndSet(idx, null, window)) {
                  //Successfully updated, return the created bucket.
                  return window;
                } else {
                  //Contention failed, the thread will yield its time slice to wait for bucket available.
                  Thread.yield();
                }
            }
            //如果当前样本窗口的起始时间与计算出的样本窗口起始时间相同,则说明这两个是同一个样本窗口,直接获取就行
            else if (windowStart == old.windowStart()) {
                //   B0       B1      B2   B3      B4
                // ||_______|_______|_______|_______|_______||___
                // 200   400   600   800   1000    1200timestamp
                //                           ^
                //                        time=888
                //            startTime of Bucket 3: 800, so it's up-to-date
                //If current windowStart is equal to the start timestamp of old bucket,
                //that means the time is within the bucket, so directly return the bucket.
                return old;
            }
            //如果当前样本窗口的起始时间大于计算出的样本窗口起始时间,则说明计算出来的样本窗口已经过时了,需要将原来的样本窗口替换为新的样本窗口
            //数组的环形数组,不是无限长的,比如存1s,1000个样本窗口,那么下1s的1000个时间窗口会覆盖上一秒的
            else if (windowStart > old.windowStart()) {
                //   (old)
                //             B0       B1      B2    NULL      B4
                // |_______||_______|_______|_______|_______|_______||___
                // ...    1200   1400    1600    1800    2000    2200timestamp
                //                              ^
                //                           time=1676
                //          startTime of Bucket 2: 400, deprecated, should be reset
                //If the start timestamp of old bucket is behind provided time, that means the bucket is deprecated.
                //We have to reset the bucket to current windowStart.
                //Note that the reset and clean-up operations are hard to be atomic,
                //so we need a update lock to guarantee the correctness of bucket update.
                //The update lock is conditional (tiny scope) and will take effect only when bucket is deprecated,
                //so in most cases it won't lead to performance loss.
                if (updateLock.tryLock()) {
                  try {
                        //Successfully get the update lock, now we reset the bucket.
                        //替换老的样本窗口
                        return resetWindowTo(old, windowStart);
                  } finally {
                        updateLock.unlock();
                  }
                } else {
                  //Contention failed, the thread will yield its time slice to wait for bucket available.
                  Thread.yield();
                }
            }
            //如果当前样本窗口的起始时间小于计算出的样本窗口起始时间
            //这种情况一般不会出现,因为时间不会倒流,除非人为修改系统时间导致时钟回拨
            else if (windowStart < old.windowStart()) {
                //Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
      }
    }
    ...
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Sentinel源码—6.熔断降级和数据统计的实现二