圆咕噜咕噜 发表于 2024-6-15 03:34:29

Redisson 分布式限流器 RRateLimiter 的使用及原理

一、根本使用

1.1 创建限流器

/**
* Returns rate limiter instance by name
*
* @param name of rate limiter
* @return RateLimiter object
*/
RRateLimiter getRateLimiter(String name);
/**
* Initializes RateLimiter's state and stores config to Redis server.
*
* @param mode - rate mode
* @param rate - rate
* @param rateInterval - rate time interval
* @param rateIntervalUnit - rate time interval unit
* @return true if rate was set and false otherwise
*/
boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
trySetRate 用于设置限流参数。此中 RateType 包罗 OVERALL 和 PER_CLIENT 两个罗列常量,分别表示全局限流和单机限流。后面三个参数表明白令牌的生成速率,即每 rateInterval 生成 rate 个令牌,rateIntervalUnit 为 rateInterval 的时间单位。
1.2 获取令牌

/**
* Acquires a specified permits from this RateLimiter,
* blocking until one is available.
*
* Acquires the given number of permits, if they are available
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* @param permits the number of permits to acquire
*/
void acquire(long permits);

/**
* Acquires the given number of permits only if all are available
* within the given waiting time.
*
* Acquires the given number of permits, if all are available and returns immediately,
* with the value true, reducing the number of available permits by one.
*
* If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* the specified waiting time elapses.
*
* If a permits is acquired then the value true is returned.
*
* If the specified waiting time elapses then the value false
* is returned.If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param permits amount
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the timeout argument
* @return true if a permit was acquired and false
*         if the waiting time elapsed before a permit was acquired
*/
boolean tryAcquire(long permits, long timeout, TimeUnit unit);
acquire 和 tryAcquire 均可用于获取指定数量的令牌,不外 acquire 会阻塞等候,而 tryAcquire 会等候 timeout 时间,如果仍然没有获得指定数量的令牌直接返回 false。
1.3 使用示例

@Slf4j
@SpringBootTest
class RateLimiterTest {
   
    @Autowired
    private RedissonClient redissonClient;
   
    private static final int threadCount = 10;

    @Test
    void test() throws InterruptedException {
      RRateLimiter rateLimiter = redissonClient.getRateLimiter("my_limiter");
      rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);

      CountDownLatch latch = new CountDownLatch(threadCount);

      for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                rateLimiter.tryAcquire(5, 3, TimeUnit.SECONDS);
                latch.countDown();
                log.info("latch count {}", latch.getCount());
            }).start();
      }
      
      latch.await();
    }
}
2024-01-16 20:14:27 INFO atreus.ink.rate.RateLimiterTest : latch count 9
2024-01-16 20:14:27 INFO atreus.ink.rate.RateLimiterTest : latch count 8
2024-01-16 20:14:28 INFO atreus.ink.rate.RateLimiterTest : latch count 7
2024-01-16 20:14:29 INFO atreus.ink.rate.RateLimiterTest : latch count 6
2024-01-16 20:14:29 INFO atreus.ink.rate.RateLimiterTest : latch count 5
2024-01-16 20:14:30 INFO atreus.ink.rate.RateLimiterTest : latch count 4
2024-01-16 20:14:30 INFO atreus.ink.rate.RateLimiterTest : latch count 3
2024-01-16 20:14:30 INFO atreus.ink.rate.RateLimiterTest : latch count 2
2024-01-16 20:14:30 INFO atreus.ink.rate.RateLimiterTest : latch count 1
2024-01-16 20:14:30 INFO atreus.ink.rate.RateLimiterTest : latch count 0
二、实现原理

Redisson 的 RRateLimiter 基于令牌桶实现,令牌桶的重要特点如下:


[*]令牌以固定速率生成。
[*]生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当哀求到达时,会尝试从令牌桶中取令牌,取到了令牌的哀求可以实行。
[*]如果桶空了,那么尝试取令牌的哀求会被直接丢弃。
RRateLimiter 在创建限流器时通过下面 Lua 脚本设置限流器的相干参数:
redis.call('hsetnx', KEYS, 'rate', ARGV);
redis.call('hsetnx', KEYS, 'interval', ARGV);
return redis.call('hsetnx', KEYS, 'type', ARGV);
而获取令牌则是通过以下的 Lua 脚本实现:
-- 请求参数示例
-- KEYS my_limiter
-- KEYS {my_limiter}:value
-- KEYS {my_limiter}:permits
-- ARGV 3 本次请求的令牌数
-- ARGV 1705396021850 System.currentTimeMillis()
-- ARGV 6966135962453115904 ThreadLocalRandom.current().nextLong()

-- 读取 RRateLimiter.trySetRate 中配置的限流器信息
local rate = redis.call('hget', KEYS, 'rate');-- 10 一个时间窗口内产生的令牌数
local interval = redis.call('hget', KEYS, 'interval');-- 1000 一个时间窗口对应的毫秒数
local type = redis.call('hget', KEYS, 'type');-- 0 全局限流
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

local valueName = KEYS;-- {my_limiter}:value 当前可用令牌数字符串的 key
local permitsName = KEYS;-- {my_limiter}:permits 授权记录有序集合的 key

-- 单机限流配置 无需考虑
if type == '1' then
    valueName = KEYS;
    permitsName = KEYS;
end;

-- 查询当前可用的令牌数 查询失败表明是首次请求令牌
local currentValue = redis.call('get', valueName);
if currentValue == false then -- 首次请求令牌
    -- 单次请求的令牌数不能超过一个时间窗口内产生的令牌数
    assert(tonumber(rate) >= tonumber(ARGV), 'Requested permits amount could not exceed defined rate');
   
    -- 更新当前可用令牌数以及令牌授权记录 {my_limiter}:permits
    -- set {my_limiter}:permits 10
    redis.call('set', valueName, rate);
    -- zadd {my_limiter}:permits 1705396021850 6966135962453115904_1
    redis.call('zadd', permitsName, ARGV, struct.pack('fI', ARGV, ARGV));
    -- decrby {my_limiter}:permits 3
    redis.call('decrby', valueName, ARGV);
    return nil;
else -- 再次请求令牌
    -- 查询可以回收的令牌对应的授权记录 即一个时间窗口前的所有授权记录且包括一个时间窗口前这一时刻
    -- 旧令牌回收的本质是新令牌的加入 如果一个令牌是在一个时间窗口前被分配的 那经过一个时间窗口后这个空出的位置应该已经由新令牌填充
    -- zrangebyscore {my_limiter}:permits 0 1705396020850
    local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV) - interval); --
   
    -- 统计可以回收的令牌数
    local released = 0;
    for i, v in ipairs(expiredValues) do
      local random, permits = struct.unpack('fI', v);
      -- released = released + 2
      -- released = released + 5
      released = released + permits;
    end;

    -- 删除授权记录并回收令牌
    if released > 0 then
      -- zrem {my_limiter}:permits 1936135962853113704_2 536135765023123704_5
      redis.call('zrem', permitsName, unpack(expiredValues));
      currentValue = tonumber(currentValue) + released;
      -- incrby {my_limiter}:value 7
      redis.call('set', valueName, currentValue);
    end;

    if tonumber(currentValue) < tonumber(ARGV) then
      -- 如果回收后可用令牌数仍然不足 返回需要等待的时间
      -- zrangebyscore {my_limiter}:permits (1705396020850 1705396021850 withscores limit 0 1
      local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV) - interval), tonumber(ARGV), 'withscores', 'limit', 0, 1);
      local random, permits = struct.unpack('fI', nearest);
      -- 1705396021650 - 1705396021850 + 1000 = 800
      return tonumber(nearest) - (tonumber(ARGV) - interval);
    else
      
      redis.call('zadd', permitsName, ARGV, struct.pack('fI', ARGV, ARGV));
      redis.call('decrby', valueName, ARGV);
      return nil;
    end;
end;
参考:
https://github.com/oneone1995/blog/issues/13
https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Redisson 分布式限流器 RRateLimiter 的使用及原理