go.uber.org/ratelimit 源码分析

打印 上一主题 下一主题

主题 869|帖子 869|积分 2607

go.uber.org/ratelimit 源码分析

go 提供了一用来接口限流的包。其中"go.uber.org/ratelimit" 包正是基于漏桶算法实现的。
利用方式:

  • 通过 ratelimit.New 创建限流器对象,参数为每秒允许的哀求数(RPS)。
  • 利用 Take() 方法来获取限流许可,该方法会壅闭哀求知道满足限速要求。
官方示例:
  1. import (
  2.         "fmt"
  3.         "time"
  4.         "go.uber.org/ratelimit"
  5. )
  6. func main() {
  7.     rl := ratelimit.New(100) // 每秒多少次
  8.     prev := time.Now()
  9.     for i := 0; i < 10; i++ {
  10.         now := rl.Take()        // 平均时间
  11.         fmt.Println(i, now.Sub(prev))
  12.         prev = now
  13.     }
  14.     // Output:
  15.     // 0 0
  16.     // 1 10ms
  17.     // 2 10ms
  18.     // 3 10ms
  19.     // 4 10ms
  20.     // 5 10ms
  21.     // 6 10ms
  22.     // 7 10ms
  23.     // 8 10ms
  24.     // 9 10ms
  25. }
复制代码
ratelimit.New()指的是每秒均匀多少次,在运行步伐后,并不会严格按照官方给的样例输出。
源码分析

不仅知其然,还要知其以是然。
最大松弛量

传统的漏桶算法每隔哀求的隔断是固定的,然而在现实上的互连网应用中,流量经常是突发性的。对于这种情况,uber引入了最大松弛量的概念。
假如我们要求每秒限定100个哀求,均匀每个哀求隔断 10ms。但是现实情况下,有些隔断比较长,有些隔断比较短。如下图所示:

哀求 1 完成后,15ms 后,哀求 2 才到来,可以对哀求 2 立即处理。哀求 2 完成后,5ms 后,哀求 3 到来,这个时候距离前次哀求还不足 10ms,因此还需要等待 5ms。
但是,对于这种情况,现实上三个哀求一共斲丧了 25ms 才完成,并不是预期的 20ms。在 uber-go 实现的 ratelimit 中,可以把之前隔断比较长的哀求的时间,匀给背面的利用,包管每秒哀求数 (RPS) 即可。
了解完这个前缀知识就可以查看源码了。
New()

ratelimit.New() 内部调用的是 newAtomicInt64Based 方法。
  1. type atomicInt64Limiter struct {
  2.         prepadding [64]byte // 填充字节,确保state独占一个缓存行
  3.         state      int64    // 最后一次权限发送的纳秒时间戳,用于控制请求的速度
  4.         postpadding [56]byte // 填充字节,确保state独占一个缓存行
  5.         perRequest time.Duration        // 限流器放行周期,用于计算下一个权限发送的state的值
  6.         maxSlack   time.Duration        // 最大松弛量
  7.         clock      Clock        // 指向当前时间获取函数的指针
  8. }
  9. // newAtomicBased返回一个新的基于原子的限制器。
  10. func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
  11.         config := buildConfig(opts) // 加载配置,config.per 默认为 1s,config.slack 默认为 10
  12.         perRequest := config.per / time.Duration(rate)
  13.         l := &atomicInt64Limiter{
  14.                 perRequest: perRequest,
  15.                 maxSlack:   time.Duration(config.slack) * perRequest,        // 默认maxSlack为perRequest 10倍
  16.                 clock:      config.clock,
  17.         }
  18.         atomic.StoreInt64(&l.state, 0)       
  19.         return l
  20. }
复制代码
Take()
  1. // Take blocks to ensure that the time spent between multiple
  2. // Take calls is on average time.Second/rate.
  3. func (t *atomicInt64Limiter) Take() time.Time {
  4.    var (
  5.       newTimeOfNextPermissionIssue int64        // 下一次允许请求的时间
  6.       now                          int64        // 当前时间
  7.    )
  8.    for {
  9.       now = t.clock.Now().UnixNano()       
  10.       timeOfNextPermissionIssue := atomic.LoadInt64(&t.state) // 上一次允许请求时间
  11.       switch {
  12.       case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):
  13.         // if this is our first call or t.maxSlack == 0 we need to shrink issue time to now
  14.          newTimeOfNextPermissionIssue = now
  15.       case t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack)+int64(t.perRequest):
  16.          // a lot of nanoseconds passed since the last Take call
  17.          // we will limit max accumulated time to maxSlack
  18.          newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
  19.       default:
  20.          // calculate the time at which our permission was issued
  21.          newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
  22.       }
  23.       if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
  24.          break
  25.       }
  26.    }
  27.    sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)
  28.    if sleepDuration > 0 {
  29.       t.clock.Sleep(sleepDuration)
  30.       return time.Unix(0, newTimeOfNextPermissionIssue)
  31.    }
  32.    // return now if we don't sleep as atomicLimiter does
  33.    return time.Unix(0, now)
  34. }
复制代码
switch 这块挺绕的,刚开始不停以为timeOfNextPermissionIssue 为下次放行的时间戳,这样的话当t.maxSlack = 0时,只要 now-timeOfNextPermissionIssue > 0 就应该放行。无法解释(t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest))。
让我们对上面的三个 case 分析一下
case 1
case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest))
这个比较好明白,我们仍以每秒100个哀求为例,均匀隔断 10ms。当本次哀求时间与前次放行时间 > 时间隔断时即可放行,并记录本次访问时间,如图:

case 2
case t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack)+int64(t.perRequest)
这块比较巧妙,假如松弛量是3 ms,当我们在第二次哀求时的时间戳 > 13 ms,此时 newTimeOfNextPermissionIssue= now - maxSlack = 12 ms。
当 maxSlack 较大且与前次哀求相隔较长时,后续的大量哀求会被直接放行,以弥补此次浪费的时间。
假设第一次哀求时间为0, maxSlack 为 100 ms,perRequest为10 ms,在第二次哀求时与第一次隔断为 111 ms ,newTimeOfNextPermissionIssue = 111 - 100 = 11 ms。而 now 为 111 ms,限流器在背面的10次take中都会经过default直接放行,直到 newTimeOfNextPermissionIssue  > now。

case 3
对于其它的哀求, newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)。
假如maxSlack为 100ms,perRequest 为 10ms,当哀求2在15ms访问后,state 更新为 10ms,这样在哀求3在20ms访问时,不会出现拦截的情况。

小结

uber 对基于漏桶实现的 ratelimit 进行了一些优化,让其限流更加的平滑。重要表现在两点:

  • 本次哀求时间距离前次放行时间 > 时间隔断 + 松弛量时,背面10次的哀求会根据情况直接放行
  • 时间隔断 + 松弛量 >= 本次哀求时间距离前次放行时间 > 时间隔断 ,state = state + perRequest

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表