ToB企服应用市场:ToB评测及商务社交产业平台

标题: 微服务架构|go-zero 的自适应熔断器 [打印本页]

作者: 光之使者    时间: 2023-9-3 18:19
标题: 微服务架构|go-zero 的自适应熔断器
原文链接: go-zero 的自适应熔断器
上篇文章我们介绍了微服务的限流,详细分析了计数器限流和令牌桶限流算法,这篇文章来说说熔断。
熔断和限流还不太一样,限流是控制请求速率,只要还能承受,那么都会处理,但熔断不是。
在一条调用链上,如果发现某个服务异常,比如响应超时。那么调用者为了避免过多请求导致资源消耗过大,最终引发系统雪崩,会直接返回错误,而不是疯狂调用这个服务。
本篇文章会介绍主流熔断器的工作原理,并且会借助 go-zero 源码,分析 googleBreaker 是如何通过滑动窗口来统计流量,并且最终执行熔断的。
工作原理

这部分主要介绍两种熔断器的工作原理,分别是 Netflix 开源的 Hystrix,其也是 Spring Cloud 默认的熔断组件,和 Google 的自适应的熔断器。
Hystrix is no longer in active development, and is currently in maintenance mode.
注意,Hystrix 官方已经宣布不再积极开发了,目前处在维护模式。
Hystrix 官方推荐替代的开源组件:Resilience4j,还有阿里开源的 Sentinel 也是不错的替代品。
hystrixBreaker

Hystrix 采用了熔断器模式,相当于电路中的保险丝,系统出现紧急问题,立刻禁止所有请求,已达到保护系统的作用。

系统需要维护三种状态,分别是:
通过状态的变更,可以有效防止系统雪崩的问题。同时,在半断开状态下,又可以让系统进行自我修复。
googleBreaker

googleBreaker 实现了一种自适应的熔断模式,来看一下算法的计算公式,客户端请求被拒绝的概率

参数很少,也比较好理解:
通过分析公式,我们可以得到下面几个结论,也就是产生熔断的实际原理:
总的来说,googleBreaker 的实现方案更加优雅,而且参数也少,不用维护那么多的状态。
go-zero 就是采用了 googleBreaker 的方案,下面就来分析代码,看看到底是怎么实现的。
接口设计

接口定义这部分我个人感觉还是挺不好理解的,看了好多遍才理清了它们之间的关系。
其实看代码和看书是一样的,书越看越薄,代码会越看越短。刚开始看感觉代码很长,随着看懂的地方越来越多,明显感觉代码变短了。所以遇到不懂的代码不要怕,反复看,总会看懂的。

首先来看一下 breaker 部分的 UML 图,有了这张图,很多地方看起来还是相对清晰的,下面来详细分析。
这里用到了静态代理模式,也可以说是接口装饰器,接下来就看看到底是怎么定义的:
  1. // core/breaker/breaker.go
  2. internalThrottle interface {
  3.     allow() (internalPromise, error)
  4.     doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  5. }
  6. // core/breaker/googlebreaker.go
  7. type googleBreaker struct {
  8.     k     float64
  9.     stat  *collection.RollingWindow
  10.     proba *mathx.Proba
  11. }
复制代码
这个接口是最终实现熔断方法的接口,由 googleBreaker 结构体实现。
  1. // core/breaker/breaker.go
  2. throttle interface {
  3.     allow() (Promise, error)
  4.     doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  5. }
  6. type loggedThrottle struct {
  7.     name string
  8.     internalThrottle
  9.     errWin *errorWindow
  10. }
  11. func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
  12.     return loggedThrottle{
  13.         name:             name,
  14.         internalThrottle: t,
  15.         errWin:           new(errorWindow),
  16.     }
  17. }
复制代码
这个是实现了日志收集的结构体,首先它实现了 throttle 接口,然后它包含了一个字段 internalThrottle,相当于具体的熔断方法是代理给 internalThrottle 来做的。
  1. // core/breaker/breaker.go
  2. func (lt loggedThrottle) allow() (Promise, error) {
  3.     promise, err := lt.internalThrottle.allow()
  4.     return promiseWithReason{
  5.         promise: promise,
  6.         errWin:  lt.errWin,
  7.     }, lt.logError(err)
  8. }
  9. func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  10.     return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
  11.         accept := acceptable(err)
  12.         if !accept && err != nil {
  13.             lt.errWin.add(err.Error())
  14.         }
  15.         return accept
  16.     }))
  17. }
复制代码
所以当它执行相应方法时,都是直接调用 internalThrottle 接口的方法,然后再加上自己的逻辑。
这也就是代理所起到的作用,在不改变原方法的基础上,扩展原方法的功能。
  1. // core/breaker/breaker.go
  2. circuitBreaker struct {
  3.     name string
  4.     throttle
  5. }
  6. // NewBreaker returns a Breaker object.
  7. // opts can be used to customize the Breaker.
  8. func NewBreaker(opts ...Option) Breaker {
  9.     var b circuitBreaker
  10.     for _, opt := range opts {
  11.         opt(&b)
  12.     }
  13.     if len(b.name) == 0 {
  14.         b.name = stringx.Rand()
  15.     }
  16.     b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
  17.     return &b
  18. }
复制代码
最终的熔断器又将功能代理给了 throttle。
这就是它们之间的关系,如果感觉有点乱的话,就反复看,看的次数多了,就清晰了。
日志收集

上文介绍过了,loggedThrottle 是为了记录日志而设计的代理层,这部分内容来分析一下是如何记录日志的。
  1. // core/breaker/breaker.go
  2. type errorWindow struct {
  3.     // 记录日志的数组
  4.     reasons [numHistoryReasons]string
  5.     // 索引
  6.     index   int
  7.     // 数组元素数量,小于等于 numHistoryReasons
  8.     count   int
  9.     lock    sync.Mutex
  10. }
  11. func (ew *errorWindow) add(reason string) {
  12.     ew.lock.Lock()
  13.     // 记录错误日志内容
  14.     ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
  15.     // 对 numHistoryReasons 进行取余来得到数组索引
  16.     ew.index = (ew.index + 1) % numHistoryReasons
  17.     ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
  18.     ew.lock.Unlock()
  19. }
  20. func (ew *errorWindow) String() string {
  21.     var reasons []string
  22.     ew.lock.Lock()
  23.     // reverse order
  24.     for i := ew.index - 1; i >= ew.index-ew.count; i-- {
  25.         reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
  26.     }
  27.     ew.lock.Unlock()
  28.     return strings.Join(reasons, "\n")
  29. }
复制代码
核心就是这里采用了一个环形数组,通过维护两个字段来实现,分别是 index 和 count。
count 表示数组中元素的个数,最大值是数组的长度;index 是索引,每次 +1,然后对数组长度取余得到新索引。
我之前有一次面试就让我设计一个环形数组,当时答的还不是很好,这次算是学会了。
滑动窗口

一般来说,想要判断是否需要触发熔断,那么首先要知道一段时间的请求数量,一段时间内的数量统计可以使用滑动窗口来实现。
首先看一下滑动窗口的定义:
  1. // core/collection/rollingwindow.go
  2. type RollingWindow struct {
  3.     lock          sync.RWMutex
  4.     // 窗口大小
  5.     size          int
  6.     // 窗口数据容器
  7.     win           *window
  8.     // 时间间隔
  9.     interval      time.Duration
  10.     // 游标,用于定位当前应该写入哪个 bucket
  11.     offset        int
  12.     // 汇总数据时,是否忽略当前正在写入桶的数据
  13.     // 某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔
  14.     // 可能导致当前桶的统计并不准确
  15.     ignoreCurrent bool
  16.     // 最后写入桶的时间
  17.     // 用于计算下一次写入数据间隔最后一次写入数据的之间
  18.     // 经过了多少个时间间隔
  19.     lastTime      time.Duration // start time of the last bucket
  20. }
复制代码
再来看一下 window 的结构:
  1. type Bucket struct {
  2.     // 桶内值的和
  3.     Sum   float64
  4.     // 桶内 add 次数
  5.     Count int64
  6. }
  7. func (b *Bucket) add(v float64) {
  8.     b.Sum += v
  9.     b.Count++
  10. }
  11. func (b *Bucket) reset() {
  12.     b.Sum = 0
  13.     b.Count = 0
  14. }
  15. type window struct {
  16.     // 桶,一个桶就是一个时间间隔
  17.     buckets []*Bucket
  18.     // 窗口大小,也就是桶的数量
  19.     size    int
  20. }
复制代码
有了这两个结构之后,我们就可以画出这个滑动窗口了,如图所示。

现在来看一下向窗口中添加数据,是怎样一个过程。
[code]func (rw *RollingWindow) Add(v float64) {    rw.lock.Lock()    defer rw.lock.Unlock()    // 获取当前写入下标    rw.updateOffset()    // 向 bucket 中写入数据    rw.win.add(rw.offset, v)}func (rw *RollingWindow) span() int {    // 计算距离 lastTime 经过了多少个时间间隔,也就是多少个桶    offset := int(timex.Since(rw.lastTime) / rw.interval)    // 如果在窗口范围内,返回实际值,否则返回窗口大小    if 0




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4