高可用之限流 09-guava RateLimiter 入门利用简介 & 源码分析 ...

打印 上一主题 下一主题

主题 883|帖子 883|积分 2649

限流系列

开源组件 rate-limit: 限流
高可用之限流-01-入门介绍
高可用之限流-02-怎样计划限流框架
高可用之限流-03-Semaphore 信号量做限流
高可用之限流-04-fixed window 固定窗口
高可用之限流-05-slide window 滑动窗口
高可用之限流-06-slide window 滑动窗口 sentinel 源码
高可用之限流-07-token bucket 令牌桶算法
高可用之限流 08-leaky bucket漏桶算法
高可用之限流 09-guava RateLimiter 入门利用简介 & 源码分析
RateLimiter 入门利用

maven 引入
  1. <dependency>
  2.     <groupId>com.google.guava</groupId>
  3.     <artifactId>guava</artifactId>
  4.     <version>18.0</version>
  5. </dependency>
复制代码
测试案例
  1. @Test
  2. public void limitTest() {
  3.     RateLimiter limiter = RateLimiter.create(1);
  4.     for(int i = 1; i < 5; i++) {
  5.         double waitTime = limiter.acquire(i);
  6.         System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
  7.     }
  8. }
复制代码

  • 日志
  1. cutTime=1592880664419 acq:1 waitTime:0.0
  2. cutTime=1592880665420 acq:2 waitTime:0.999098
  3. cutTime=1592880667419 acq:3 waitTime:1.99867
  4. cutTime=1592880670419 acq:4 waitTime:2.999099
复制代码
说明

首先通过RateLimiter.create(1);创建一个限流器,参数代表每秒生成的令牌数,通过limiter.acquire(i);来以阻塞的方式获取令牌,固然也可以通过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,如果超timeout为0,则代表非阻塞,获取不到立即返回。
从输出来看,RateLimiter支持预消耗,好比在acquire(5)时,等待时间是3秒,是上一个获取令牌时预消耗了3个两排,固须要等待3*1秒,然后又预消耗了5个令牌,以此类推
RateLimiter通过限定后面请求的等待时间,来支持一定程度的突发请求(预消耗),在利用过程中须要注意这一点,具体实现原理后面再分析。
RateLimiter实现原理

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提拔直到维持在一个稳定值) 两种模式实现思路雷同,主要区别在等待时间的盘算上,本篇重点介绍SmoothBursty
RateLimiter的创建

通过调用RateLimiter的create接口来创建实例,实际是调用的SmoothBuisty稳定模式创建的实例。
  1. public static RateLimiter create(double permitsPerSecond) {
  2.   return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  3. }
  4. static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
  5.   RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  6.   rateLimiter.setRate(permitsPerSecond);
  7.   return rateLimiter;
  8. }
复制代码
SmoothBursty中的两个构造参数含义:
SleepingStopwatch:guava中的一个时钟类实例,会通过这个来盘算时间及令牌
maxBurstSeconds:官方解释,在ReteLimiter未利用时,最多保存几秒的令牌,默认是1
在剖析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义
  1. /**
  2. * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
  3. * 在RateLimiter未使用时,最多存储几秒的令牌
  4. * */
  5. final double maxBurstSeconds;
  6. /**
  7. * The currently stored permits.
  8. * 当前存储令牌数
  9. */
  10. double storedPermits;
  11. /**
  12. * The maximum number of stored permits.
  13. * 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
  14. */
  15. double maxPermits;
  16. /**
  17. * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
  18. * per second has a stable interval of 200ms.
  19. * 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
  20. */
  21. double stableIntervalMicros;
  22. /**
  23. * The time when the next request (no matter its size) will be granted. After granting a request,
  24. * this is pushed further in the future. Large requests push this further than small requests.
  25. * 下一次请求可以获取令牌的起始时间
  26. * 由于RateLimiter允许预消费,上次请求预消费令牌后
  27. * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
  28. */
  29. private long nextFreeTicketMicros = 0L; // could be either in the past or future
复制代码
核心函数

setRate()

通过这个接口设置令牌通每秒生成令牌的数目,内部时间通过调用SmoothRateLimiter的doSetRate来实现
  1. public final void setRate(double permitsPerSecond) {
  2.   checkArgument(
  3.       permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  4.   synchronized (mutex()) {
  5.     doSetRate(permitsPerSecond, stopwatch.readMicros());
  6.   }
  7. }
复制代码
doSetRate()

这里先通过调用resync生成令牌以及更新下一期令牌生成时间,然后更新stableIntervalMicros,最后又调用了SmoothBursty的doSetRate
  1. @Override
  2. final void doSetRate(double permitsPerSecond, long nowMicros) {
  3.   resync(nowMicros);
  4.   double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  5.   this.stableIntervalMicros = stableIntervalMicros;
  6.   doSetRate(permitsPerSecond, stableIntervalMicros);
  7. }
复制代码
resync()
  1. /**
  2. * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
  3. * 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(可以理解为生成令牌)
  4. */
  5. void resync(long nowMicros) {
  6.     // if nextFreeTicket is in the past, resync to now
  7.     if (nowMicros > nextFreeTicketMicros) {
  8.       double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
  9.       storedPermits = min(maxPermits, storedPermits + newPermits);
  10.       nextFreeTicketMicros = nowMicros;
  11.     }
  12. }
复制代码
根据令牌桶算法,桶中的令牌是连续生成存放的,有请求时须要先从桶中拿到令牌才气开始执行,谁来连续生成令牌存放呢?
一种解法是,开启一个定时任务,由定时任务连续生成令牌。这样的问题在于会极大的消耗体系资源,如,某接口须要分别对每个用户做访问频率限定,假设体系中存在6W用户,则至多须要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
另一种解法则是耽误盘算,如上resync函数。该函数会在每次获取令牌之前调用,实在现思路为,若当前时间晚于nextFreeTicketMicros,则盘算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只须要在获取令牌时盘算一次即可。
SmoothBursty 的 doSetRate

桶中可存放的最大令牌数由maxBurstSeconds盘算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
该参数的作用在于,可以更为灵活地控制流量。如,某些接口限定为300次/20秒,某些接口限定为50次/45秒等。也就是流量不局限于qps
作者:人在码途
链接:https://www.jianshu.com/p/5d4fe4b2a726
来源:简书
著作权归作者所有。商业转载请接洽作者获得授权,非商业转载请注明出处。
  1. @Override
  2. void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  3.   double oldMaxPermits = this.maxPermits;
  4.   maxPermits = maxBurstSeconds * permitsPerSecond;
  5.   if (oldMaxPermits == Double.POSITIVE_INFINITY) {
  6.     // if we don't special-case this, we would get storedPermits == NaN, below
  7.     // Double.POSITIVE_INFINITY 代表无穷啊
  8.     storedPermits = maxPermits;
  9.   } else {
  10.     storedPermits =
  11.         (oldMaxPermits == 0.0)
  12.             ? 0.0 // initial state
  13.             : storedPermits * maxPermits / oldMaxPermits;
  14.   }
  15. }
复制代码
RateLimiter 几个常用接口分析

在相识以上概念后,就非常轻易理解 RateLimiter 暴露出来的接口
  1. @CanIgnoreReturnValue
  2. public double acquire() {
  3.   return acquire(1);
  4. }
  5. /**
  6. * 获取令牌,返回阻塞的时间
  7. **/
  8. @CanIgnoreReturnValue
  9. public double acquire(int permits) {
  10.   long microsToWait = reserve(permits);
  11.   stopwatch.sleepMicrosUninterruptibly(microsToWait);
  12.   return 1.0 * microsToWait / SECONDS.toMicros(1L);
  13. }
  14. final long reserve(int permits) {
  15.   checkPermits(permits);
  16.   synchronized (mutex()) {
  17.     return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  18.   }
  19. }
复制代码
acquire函数主要用于获取permits个令牌,并盘算须要等待多长时间,进而挂起等待,并将该值返回,主要通过reserve返回须要等待的时间,reserve中通过调用reserveAndGetWaitLength获取等待时间
  1. /**
  2. * Reserves next ticket and returns the wait time that the caller must wait for.
  3. *
  4. * @return the required wait time, never negative
  5. */
  6. final long reserveAndGetWaitLength(int permits, long nowMicros) {
  7.   long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  8.   return max(momentAvailable - nowMicros, 0);
  9. }
复制代码
最后调用了 reserveEarliestAvailable
  1. @Override
  2. final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  3.   resync(nowMicros);
  4.   long returnValue = nextFreeTicketMicros;
  5.   double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  6.   double freshPermits = requiredPermits - storedPermitsToSpend;
  7.   long waitMicros =
  8.       storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
  9.           + (long) (freshPermits * stableIntervalMicros);
  10.   this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  11.   this.storedPermits -= storedPermitsToSpend;
  12.   return returnValue;
  13. }
复制代码
首先通过resync生成令牌以及同步nextFreeTicketMicros时间戳,freshPermits从令牌桶中获取令牌后还须要的令牌数目,通过storedPermitsToWaitTime盘算出获取freshPermits还须要等待的时间,在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,然后更新nextFreeTicketMicros以及storedPermits,这次获取令牌须要的等待到的时间点,reserveAndGetWaitLength返回须要等待的时间隔断。
从reserveEarliestAvailable可以看出RateLimiter的预消耗原理,以及获取令牌的等待时间时间原理(可以解释示例结果),再获取令牌不敷时,并没有等待到令牌全部生成,而是更新了下次获取令牌时的nextFreeTicketMicros,从而影响的是下次获取令牌的等待时间。
reserve这里返回等待时间后,acquire通过调用stopwatch.sleepMicrosUninterruptibly(microsToWait);进行sleep操作,这里不同于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:
  1. public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
  2.     //sleep 阻塞线程 内部通过Thread.sleep()
  3.   boolean interrupted = false;
  4.   try {
  5.     long remainingNanos = unit.toNanos(sleepFor);
  6.     long end = System.nanoTime() + remainingNanos;
  7.     while (true) {
  8.       try {
  9.         // TimeUnit.sleep() treats negative timeouts just like zero.
  10.         NANOSECONDS.sleep(remainingNanos);
  11.         return;
  12.       } catch (InterruptedException e) {
  13.         interrupted = true;
  14.         remainingNanos = end - System.nanoTime();
  15.         //如果被interrupt可以继续,更新sleep时间,循环继续sleep
  16.       }
  17.     }
  18.   } finally {
  19.     if (interrupted) {
  20.       Thread.currentThread().interrupt();
  21.       //如果被打断过,sleep过后再真正中断线程
  22.     }
  23.   }
  24. }
复制代码
sleep之后,acquire返回sleep的时间,阻塞结束,获取到令牌。
[code]public boolean tryAcquire(int permits) {  return tryAcquire(permits, 0, MICROSECONDS);}public boolean tryAcquire() {  return tryAcquire(1, 0, MICROSECONDS);}public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {  long timeoutMicros = max(unit.toMicros(timeout), 0);  checkPermits(permits);  long microsToWait;  synchronized (mutex()) {    long nowMicros = stopwatch.readMicros();    if (!canAcquire(nowMicros, timeoutMicros)) {      return false;    } else {      microsToWait = reserveAndGetWaitLength(permits, nowMicros);    }  }  stopwatch.sleepMicrosUninterruptibly(microsToWait);  return true;}private boolean canAcquire(long nowMicros, long timeoutMicros) {  return queryEarliestAvailable(nowMicros) - timeoutMicros
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

熊熊出没

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表