详细分析Redisson的分布式锁

打印 上一主题 下一主题

主题 840|帖子 840|积分 2520

在Redisson中,锁的续期是一个关键特性,用于确保在锁的持有者仍在执行任务期间,锁不会被不测释放。

看门狗什么时间被启用

Redisson中的看门狗(watchdog)机制的行为确实与是否显式指定锁的超时时间有关。

  • lock() 方法与看门狗:


  • 当您利用 lock() 方法而不传递任何参数时,Redisson默认会启动看门狗机制。这是因为没有指定具体的锁超时时间,Redisson会以为需要自动续期锁,以防止因客户端崩溃或其他缘故原由导致锁未被释放而造成的死锁问题。

  • lock(long leaseTime, TimeUnit unit) 方法与看门狗:


  • 假如您在调用 lock() 方法时显式指定了锁的超时时间(例如 lock(5000, TimeUnit.SECONDS)),则Redisson不会启动看门狗机制。这是因为您已经指定了锁的确切逾期时间,Redisson会以为您希望在指定的时间内持有锁,而不希望自动续期。

  • tryLock() 方法与看门狗:


  • 利用 tryLock() 方法时,假如不传递 leaseTime 参数大概传递的 leaseTime 不大于0,Redisson会启动看门狗机制。这是因为看门狗机制用于在锁的持有期间自动续期,确保业务逻辑能够在锁释放前完成。
renewExpiration方法

锁的续期机制在Redisson中是自动管理的,锁的续期是基于一个定时任务的机制,定期查抄锁的状态并决定是否需要续期。具体实现为:
  1. private void renewExpiration() {
  2.     // 1、首先会从EXPIRATION_RENEWAL_MAP中获取一个值,如果为null说明锁可能已经被释放或过期,因此不需要进行续期,直接返回
  3.     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  4.     if (ee == null) {
  5.         return;
  6.     }
  7.    
  8.     // 2、基于TimerTask实现一个定时任务,设置internalLockLeaseTime / 3的时长进行一次锁续期,也就是每10s进行一次续期。
  9.     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  10.         @Override
  11.         public void run(Timeout timeout) throws Exception {
  12.             // 从EXPIRATION_RENEWAL_MAP里获取一个值,检查锁是否被释放
  13.             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  14.             // 如果为null则说明锁也被释放了,不需要续期
  15.             if (ent == null) {
  16.                 return;
  17.             }
  18.             // 如果不为null,则获取第一个thread(也就是持有锁的线程)
  19.             Long threadId = ent.getFirstThreadId();
  20.             if (threadId == null) {
  21.                 return;
  22.             }
  23.             
  24.             // 如果threadId 不为null,说明需要续期,它会异步调用renewExpirationAsync(threadId)方法来实现续期
  25.             RFuture<Boolean> future = renewExpirationAsync(threadId);
  26.             // 处理结果
  27.             future.onComplete((res, e) -> {
  28.                 // 如果有异常
  29.                 if (e != null) {
  30.                     log.error("Can't update lock " + getName() + " expiration", e);
  31.                     return;
  32.                 }
  33.                 // 如果续期成功,则会重新调用renewExpiration()方法进行下一次续期
  34.                 if (res) {
  35.                     // reschedule itself
  36.                     renewExpiration();
  37.                 }
  38.             });
  39.         }
  40.     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  41.    
  42.     ee.setTimeout(task);
  43. }
复制代码
具体步骤和逻辑分析
  1. ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  2.         if (ee == null) {
  3.             return;
  4.         }
复制代码
起首,从 EXPIRATION_RENEWAL_MAP 中获取当前锁的 ExpirationEntry 对象。假如该对象为null,说明锁可能已经被释放或逾期,因此不需要进行续期,直接返回。
  1. Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  2.     @Override
  3.     public void run(Timeout timeout) throws Exception {
  4.         ...
  5.     }
  6. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
复制代码
假如当前锁的 ExpirationEntry 对象不是null,就会继承往下执行,创建一个定时任务。这个定时任务的代码实现了一个锁的续期机制,具体步骤和逻辑分析如下:
在代码中,定时任务是通过 commandExecutor.getConnectionManager().newTimeout(...) 方法创建的,该任务的耽误时间设置为 internalLockLeaseTime / 3 毫秒,即每次续期的时间间隔。
  1. ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  2. if (ent == null) {
  3.     return;
  4. }
复制代码
在定时任务的 run 方法中,起首尝试从 EXPIRATION_RENEWAL_MAP 中获取与当前锁对应的 ExpirationEntry 实例。假如获取到的 ExpirationEntry 为 null,则说明锁已经被释放,此时无需续期,直接返回。
  1. Long threadId = ent.getFirstThreadId();
  2. if (threadId == null) {
  3.     return;
  4. }
复制代码
假如获取到的 ExpirationEntry 不为 null,说明假如锁仍旧有效,继承往下走,接下来获取持有该锁的线程 ID。假如 threadId 为 null,也说明锁可能已经被释放,直接返回。
  1. RFuture<Boolean> future = renewExpirationAsync(threadId);
复制代码
假如持有锁的线程 ID 不为 null,继承往下走,则调用 renewExpirationAsync(threadId) 方法异步续期锁的有效期。
继承进入这个renewExpirationAsync()方法,可以看到,方法的主要功能是延伸锁的有效期。下面是对这段代码的详细分析:
  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2.         return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  3.                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  4.                         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  5.                         "return 1; " +
  6.                         "end; " +
  7.                         "return 0;",
  8.                 Collections.singletonList(getName()),
  9.                 internalLockLeaseTime, getLockName(threadId));
  10.     }
复制代码
renewExpiration()函数内部的RFuture<Boolean> future = renewExpirationAsync(threadId);又是一个关键的函数,跳入renewExpirationAsync(threadId)内部一探究竟。

  • 返回类型:RFuture 表现该方法返回一个表现异步操作结果的未来对象,终极会得到一个布尔值,指示续期操作是否成功。
  • 参数:long threadId 是持有锁的线程 ID,用于标识当前续期操作是否适用于该线程。
这个renewExpirationAsync()是一个异步革新有效期的函数,它主要是用evaLWriteAsync()方法来异步执行一段Lua脚本,重置当前threadId线程持有的锁的有效期。也就是说该方法负责执行给定的Lua脚本,以实现分布式锁的续期。

  • KEYS[1]:代表锁的名称,即 Redis 键。
  • ARGV[1]:引用传入的第一个非键参数,表现希望设置的新逾期时间(毫秒),锁的默认租约时间为internalLockLeaseTime。
  • ARGV[2]:引用传入的第二个非键参数,表现通过getLockName(threadId)根据线程ID天生特定的锁标识符,确保操作的是特定线程的锁。简单说就是持有锁的线程id。
  • getName():获取当前锁的名称,用于作为Redis中的键。
  • LongCodec.INSTANCE:编码器,指示如何处置处罚数据的序列化与反序列化。
  • RedisCommands.EVAL_BOOLEAN:表现执行的命令类型,这里是执行一个返回布尔值的Lua脚本。
Lua脚本中,起首执行redis.call('hexists', KEYS[1], ARGV[2]) == 1,该命令查抄锁的名称KEYS[1]下是否存在持有该锁的线程ID(ARGV[1])。假如存在,说明该线程仍旧是锁的持有者,则调用pexpire命令redis.call('pexpire', KEYS[1], ARGV[1])更新锁的逾期时间。假如续期成功,返回1,否则返回0。
因此,Lua脚本中的整体逻辑是假如当前key存在,说明当前锁还被该线程持有,那么就重置逾期时间为30s,并返回true表现续期成功,反之返回false。
这段代码的设计充分利用了Redis的Lua脚本特性,实现了高效且原子化的锁续期逻辑,减少了并发操作中的 race condition 问题,同时提供了异步执行的能力,提拔了系统的响应性和性能。
然后,我们退回到renewExpiration()方法中,继承往下走,
  1. future.onComplete((res, e) -> {
  2.     if (e != null) {
  3.         log.error("Can't update lock " + getName() + " expiration", e);
  4.         return;
  5.     }
  6.     if (res) {
  7.         renewExpiration();
  8.     }
  9. });
复制代码
通过 onComplete 方法处置处罚续期操作的结果,假如e 不为 null,说明有异常则记录错误日志。假如res 为 true,说明续期成功则调用 renewExpiration() 方法,安排下一次的续期操作。
总结一下,整体流程就是,在代码中,定时任务是通过 commandExecutor.getConnectionManager().newTimeout(...) 方法创建的。该任务会在指定的时间(internalLockLeaseTime / 3 毫秒)后执行一次。每当任务执行时,都会查抄当前锁的状态,并尝试续期。假如需要续期(即锁仍旧有效),则会调用 renewExpiration() 方法。
为什么需要递归调用?

在锁的实现中,为了确保锁在持有者处置处罚任务期间保持有效,通常会设置一个有效期(lease time)。在有效期内,假如持有锁的线程仍旧在执行任务,那么它需要定期续期,以防止在任务完成前锁逾期,从而导致其他线程获取锁。
递归调用的机制:在 run 方法的最后,假如续期成功,调用 renewExpiration() 方法。这通常意味着该方法会重新安排另一个定时任务,相当于在每次续期后再次创建一个新的定时任务,使得续期操作可以连续进行。这种递归调用的方式确保了只要锁仍旧被持有,续期操作就会不停地被调度,从而保持锁的有效性。
定时任务的生命周期?

每个定时任务的生命周期是短暂的,完成一次 run 方法的执行后,该任务就结束了。然后,通过递归调用,可能会创建新的定时任务,从而继承续期。
(1)任务通过 newTimeout 被创建,并且首次执行会在 internalLockLeaseTime / 3 毫秒后触发。这个时间间隔确保了任务在锁的生命周期的早期进行查抄和续期。此时,任务进入其生命周期,准备执行。
(2)当定时任务第一次执行时,run() 方法被调用。它主要的任务是:

  • 从 EXPIRATION_RENEWAL_MAP 获取锁的状态。
  • 假如锁被释放(ent == null),任务直接返回,不再进行续期。
  • 假如锁仍旧存在并且当前线程持有锁(threadId != null),则异步调用 renewExpirationAsync(threadId) 来续期锁。
  • 在续期的异步任务完成后,假如续期成功(res == true),会重新调用 renewExpiration() 进行下一次续期。
(3)续期条件:假如任务成功续期,它会在异步任务的 onComplete 回调中再次调用 renewExpiration() 方法。renewExpiration() 负责创建一个新的定时任务,这意味着每次任务续期成功后,系统会重新调度一个新的定时任务,以确保锁的有效期能够连续。
这个 renewExpiration() 方法的调用实际上是递归调用新的定时任务,续期继承进行下去。每次任务执行后,都可能会创建一个新的任务,直到锁被释放。
(3)定时任务的生命周期可能在以下情况下终止:

  • 锁被释放:当 EXPIRATION_RENEWAL_MAP.get(getEntryName()) 返回 null,表现锁已经被释放,定时任务会制止续期,不再创建新的定时任务。
  • 无持有锁的线程:假如没有线程持有锁(即 threadId == null),任务也会制止续期。
  • 异步任务失败:假如续期的异步任务失败(例如网络问题、数据库问题等),则可能无法继承续期。不外在代码中,假如发生异常,它只会记录错误,并不会立刻制止整个续期机制,但终极续期将会失败并终止。
定时任务的生命周期从它的创建开始,通过定期执行查抄和续期,直到锁被释放或没有线程持有锁时,任务才会制止。每次续期成功后,新的定时任务会继承执行,确保锁的有效期在持锁线程存在时不会逾期。
因此,虽然定时任务会被创建并执行,但它的执行是基于持锁状态的,只有在锁有效且持有者仍在执行任务的情况下才会连续进行续期。这个设计确保了资源的有效管理,避免不必要的续期操作。
gpt总结

在 Redisson 中,renewExpiration() 方法是分布式锁续期机制的核心部分。该方法的主要作用是延伸锁的逾期时间,确保持有锁的线程在执行任务期间不会因为锁的超时而被其他线程抢占。下面将详细分析 renewExpiration() 方法的实现及其工作原理。
1. 方法概述

renewExpiration() 方法通常是在锁被持偶尔定期调用的,用于更新 Redis 中存储的锁状态,以防止锁逾期。该方法的根本逻辑是:

  • 查抄当前线程是否仍旧持有锁。
  • 假如是,则更新锁的逾期时间。
2. 实现细节

以下是 renewExpiration() 方法的一些关键点和步骤(具体实现可能会因版本而异):
2.1 锁的状态查抄

在调用 renewExpiration() 方法之前,Redisson 会起首确认当前线程是否持有锁。这通常通过查抄与锁相干的标识符(如线程 ID 或 UUID)来实现。
  1. if (!isHeldByCurrentThread()) {
  2.     return;
  3. }
复制代码
2.2 获取当前锁的逾期时间

假如当前线程确实持有锁,接下来会获取当前锁的逾期时间。这个时间通常是通过 Redis 中存储的键值对来读取的。
2.3 更新逾期时间

一旦确认当前线程持有锁并获取了逾期时间,Redisson 将利用 Redis 的命令(如 EXPIRE 或 SETEX)来更新锁的逾期时间。例如:
  1. redisCommands.expire(lockKey, newExpirationTime);
复制代码
这里的 newExpirationTime 是根据配置或策略盘算得出的新的逾期时间。
2.4 异常处置处罚

在执行续期操作时,Redisson 还需要处置处罚可能发生的异常,例如网络问题或 Redis 服务不可用等。适当的异常处置处罚可以确保系统的健壮性和稳定性。
3. 续期策略

Redisson 通常会在后台启动一个线程定期调用 renewExpiration() 方法。续期的频率和锁的逾期时间可以通过配置进行调整,常见的做法是设置续期时间为锁逾期时间的一半,以确保续期操作在锁到期之前完成。
4. 总结

renewExpiration() 方法在 Redisson 分布式锁中起到了至关重要的作用,它通过连续更新锁的逾期时间,避免了由于任务执行时间过长导致的锁自动释放。公道地配置续期策略可以大幅提拔分布式系统的稳定性和数据一致性。
示例代码

以下是一个简化的示例,展示了如何在 Redisson 中利用分布式锁以及续期机制:
  1. RLock lock = redisson.getLock("myLock");
  2. try {
  3.     // 尝试获取锁,最多等待 10 秒,锁自动过期时间为 30 秒
  4.     if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
  5.         // 执行任务
  6.         // ...
  7.         // 定期续期
  8.         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  9.         executor.scheduleAtFixedRate(() -> {
  10.             lock.renewExpiration(); // 续期
  11.         }, 10, 10, TimeUnit.SECONDS); // 每 10 秒续期一次
  12.     }
  13. } finally {
  14.     if (lock.isHeldByCurrentThread()) {
  15.         lock.unlock(); // 确保释放锁
  16.     }
  17. }
复制代码
注意事项


  • 利用分布式锁时,要注意锁的利用场景,避免产生死锁或性能瓶颈。
  • 配置公道的逾期时间和续期策略,以适应差别的业务需求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小小小幸运

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表