深入解析 Redisson 分布式限流器 RRateLimiter 的原理与实现 ...

打印 上一主题 下一主题

主题 1019|帖子 1019|积分 3057

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
RRateLimiter 介绍

  在分布式体系中,限流(Rate Limiting)是保障体系稳定性、避免过载的紧张机制。Redisson 作为一个功能强大的 Redis 客户端,不仅提供了广泛使用的分布式锁,还包罗了许多其他实用的分布式工具。其中,RRateLimiter 是 Redisson 提供的分布式限流器,功能强大。本文将具体解析 RRateLimiter 的原理,深入明白其工作机制。
   代码实现

  首先,通过一个简单的示例了解如何使用 RRateLimiter,它创建了一个限流器并启动多个线程来获取令牌:
  1. import org.redisson.Redisson; // 导入 Redisson 的核心类,用于创建 Redisson 客户端
  2. import org.redisson.api.RRateLimiter; // 导入 RRateLimiter 接口,用于实现分布式限流
  3. import org.redisson.api.RedissonClient; // 导入 RedissonClient 接口,用于与 Redis 进行交互
  4. import org.redisson.config.Config; // 导入 Redisson 的配置类,用于配置 Redis 连接
  5. import java.util.concurrent.CountDownLatch; // 导入 CountDownLatch 类,用于控制线程同步
  6. public class RateLimiterDemo { // 定义一个公共类 RateLimiterDemo
  7.     public static void main(String[] args) throws InterruptedException { // 主方法,程序入口,可能抛出 InterruptedException
  8.         RRateLimiter rateLimiter = createRateLimiter(); // 创建一个 RRateLimiter 实例
  9.         int totalThreads = 20; // 定义总线程数为 20
  10.         CountDownLatch latch = new CountDownLatch(totalThreads); // 创建一个 CountDownLatch 实例,初始计数为 totalThreads
  11.         long startTime = System.currentTimeMillis(); // 记录开始时间,用于计算总耗时
  12.         for (int i = 0; i < totalThreads; i++) { // 循环创建并启动 20 个线程
  13.             new Thread(() -> { // 创建一个新线程
  14.                 rateLimiter.acquire(1); // 每个线程尝试获取 1 个令牌,若令牌不足则阻塞等待
  15.                 latch.countDown(); // 线程完成后,调用 countDown() 方法减少计数器
  16.             }).start(); // 启动线程
  17.         }
  18.         latch.await(); // 主线程等待,直到所有子线程完成
  19.         System.out.println("Total elapsed time: " + (System.currentTimeMillis() - startTime) + " ms"); // 打印总耗时
  20.     }
  21.     /**
  22.      * 创建并配置 RRateLimiter 的方法
  23.      *
  24.      * @return 配置好的 RRateLimiter 实例
  25.      */
  26.     private static RRateLimiter createRateLimiter() { // 创建并配置 RRateLimiter 的方法
  27.         Config config = new Config(); // 创建一个新的 Redisson 配置实例
  28.         config.useSingleServer() // 配置使用单一 Redis 服务器
  29.               .setAddress("redis://127.0.0.1:6379") // 设置 Redis 服务器地址
  30.               .setTimeout(1000000); // 设置连接超时时间(毫秒)
  31.         RedissonClient redisson = Redisson.create(config); // 根据配置创建一个 Redisson 客户端实例
  32.         RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter"); // 获取名为 "myRateLimiter" 的 RRateLimiter 实例
  33.         rateLimiter.trySetRate(RRateLimiter.RateType.OVERALL, 1, 1, RateIntervalUnit.SECONDS); // 初始化限流器,设置全局速率为每秒 1 个令牌
  34.         return rateLimiter; // 返回配置好的限流器实例
  35.     }
  36. }
复制代码
  Lua 脚本

  为了更深入地明白 RRateLimiter 的工作原理,将进一步解析其底层的 Lua 脚本,实现分布式限流的核心逻辑。以下内容将逐行解释 Lua 脚本的功能和实现细节。
  1. redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); -- 将速率设置到哈希表中,只有当 'rate' 字段不存在时才设置
  2. redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); -- 将时间区间设置到哈希表中,只有当 'interval' 字段不存在时才设置
  3. return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]); -- 将类型设置到哈希表中,只有当 'type' 字段不存在时才设置,并返回结果
  4. -- ARGV[1] 为请求令牌数
  5. -- ARGV[2] 为请求时间戳
  6. -- ARGV[3] 为请求类型
  7. -- 获取限流器的速率、时间区间和类型
  8. local rate = redis.call("hget", KEYS[1], "rate") -- 从哈希表中获取速率
  9. local interval = redis.call("hget", KEYS[1], "interval") -- 获取时间区间(毫秒)
  10. local type = redis.call("hget", KEYS[1], "type") -- 获取限流器的类型(单机或集群)
  11. assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized") -- 确保限流器已初始化
  12. -- 默认情况下,使用 {name}:value 和 {name}:permits
  13. local valueName = KEYS[2] -- 当前令牌数的键名
  14. local permitsName = KEYS[4] -- 记录请求的有序集合键名
  15. -- 如果类型为 "1"(单机模式),则使用不同的键名
  16. if type == "1" then
  17.     valueName = KEYS[3] -- 单机模式下的令牌数键名
  18.     permitsName = KEYS[5] -- 单机模式下的有序集合键名
  19. end
  20. -- 确保请求的令牌数不超过限流器的速率
  21. assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
  22. -- 获取当前剩余的令牌数
  23. local currentValue = redis.call("get", valueName)
  24. -- 第一次请求直接走else
  25. -- 第二次请求因为 valueName 更新有值,走if
  26. if currentValue ~= false then
  27.     -- 获取已过期的请求(初始时间 至 (当前时间(ARGV[2])-时间间隔(interval)) 准备清理失效的令牌数据
  28.     local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
  29.     local released = 0 -- 初始化拟新增失效令牌数
  30.     -- 遍历过期的请求,释放相应的令牌
  31.     for i, v in ipairs(expiredValues) do
  32.         local random, permits = struct.unpack("fI", v)
  33.         released = released + permits
  34.     end
  35.     -- 如果有释放的令牌,更新当前可用令牌数并移除过期的请求
  36.     if released > 0 then
  37.         redis.call("zrem", permitsName, unpack(expiredValues)) -- 清除 permitsName 中包含 expiredValues 的数据
  38.         currentValue = tonumber(currentValue) + released -- 清理失效令牌后计算总可用令牌数
  39.         redis.call("set", valueName, currentValue) -- 更新可用令牌
  40.     end
  41.     -- 如果当前令牌数不足以满足请求  
  42.     if tonumber(currentValue) < tonumber(ARGV[1]) then
  43.         -- 计算需要等待的时间
  44.         local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1) -- 找到最近一次的请求时间 nearest
  45.         local random, permits = struct.unpack("fI", nearest[1]) -- 解压为时间戳+请求令牌数
  46.         -- 返回等待时间,也可以写为 tonumber(nearest[2])+interval-tonumber(ARGV[2])
  47.         return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval) -- nearest[2] 为上行的 random
  48.     else
  49.         -- 当前可用令牌数足够,记录此次请求并减少可用令牌数
  50.         redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
  51.         redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
  52.         return nil -- 成功获取令牌
  53.     end
  54. else
  55.     -- 第一次请求,初始化令牌数和有序集合
  56.     redis.call("set", valueName, rate) -- 设置当前令牌数为最大速率值
  57.     redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
  58.     redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
  59.     return nil -- 成功获取令牌
  60. end
复制代码
  现实场景

  冰雪大世界的热门项目天天吸引着继续不停的顾客。为了避免人流过于集中,影响顾客的体验和项目的正常运行,管理团队制定了以下规则:


  • 每小时只欢迎6位客人。
  • 每位客人在进入项目嬉戏,一个小时后自动将入场票归还到废票区,确保不影响后续客人的入场。
    限流机制的设置
1. 初始化限流器

项目天天一开始,第一位客人进入嬉戏时,体系会进行以下操作:


  • 统计体系剩余票数目:记载为(valueName),代表同一时间段内的最大客容量。
  • 记载每次申请的客人及进场时间:存储在(permitsName)中。
  • 革新现实剩余票数目:更新为(currentValue = valueName),确保体系实时掌握当前剩余的入场票数。
    通过这些步骤,体系为当天的限流工作做好了准备。
2. 限流器应用场景(客人申请嬉戏流程)

当一位客人申请嬉戏项目时,体系会按照以下流程操作:
步骤一:查询可用票


  • 计算现实剩余票(currentValue = valueName)。
步骤二:回收废票


  • 从废票区 根据入场记载(permitsName)计算(当前时间-时间间隔)之前的所有废弃入场票(released),这意味着已进入嬉戏的客人在体系时间间隔后已不再影响项目后续游客的体验,归还的票可以重新使用。
  • 更新现实剩余票(currentValue):将回收的票数加到现实剩余票(currentValue += released)
  • 更新体系剩余票(valueName):(valueName = currentValue),确保体系知道当前有多少可用的入场票,反映最新的入场票状态。
步骤三:判断票数是否充足


  • 检查现实剩余票 (currentValue):与当前游客申请票数(tonumber(ARGV[1]))进行比较。

    • 如果票够用:

      • 记载此次哀求:将客人的申请信息和进场时间记载到(permitsName)中。
      • 更新体系剩余票:(valueName -= 申请票数)中扣除相应的票数。
      • 允许客人进入:客人成功进入项目嬉戏。

    • 如果票不敷:

      • 计算等候时间:根据上一位客人的入场时间和设定的时间间隔,计算出客人必要等候时间(上一位客人的入场时间+间隔时间-当前时间)。
      • 告知客人:将计算出的等候时间返回给客人,游客异步再尝试进入。



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表