Redisson源码解读-分布式锁

  金牌会员 | 2022-11-7 16:11:28 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 987|帖子 987|积分 2961

前言

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。Redisson有一样功能是可重入的分布式锁。本文来讨论一下这个功能的特点以及源码分析。
前置知识

在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工。
分布式锁的思考

首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备哪些功能?

  • 互斥(这是一个锁最基本的功能)
  • 锁失效机制(也就是可以设置锁定时长,防止死锁)
  • 高性能、高可用
  • 阻塞、非阻塞
  • 可重入、公平锁
  • 。。。
可见,实现一个分布式锁,需要考虑的东西有很多。那么,如果用Redis来实现分布式锁呢?如果只需要具备上面说的1、2点功能,要怎么写?(ps:我就不写了,自己想去)
Redis订阅/发布机制

Redisson中用到了Redis的订阅/发布机制,下面简单介绍下。
简单来说就是如果client2 、 client5 和 client1 订阅了 channel1,当有消息发布到 channel1 的时候,client2 、 client5 和 client1 都会收到这个消息。

图片来自 菜鸟教程-Redis发布订阅
Redisson

源码版本:3.17.7
下面以Redisson官方的可重入同步锁例子为入口,解读下源码。
  1. RLock lock = redisson.getLock("anyLock");
  2. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
  3. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
  4. if (res) {
  5.    try {
  6.      ...
  7.    } finally {
  8.        lock.unlock();
  9.    }
  10. }
复制代码
加锁

我用时序图来表示加锁和订阅的过程。时序图中括号后面的c1、c2代表client1,client2

当线程2获取了锁但还没释放锁时,如果线程1去获取锁,会阻塞等待,直到线程2解锁,通过Redis的发布订阅机制唤醒线程1,再次去获取锁。
加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),对应着就是RedissonLock#tryLock
  1. /**
  2. * 获取锁
  3. * @param waitTime  尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
  4. * @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
  5. * @param unit 时间单位
  6. * @return 获取锁成功返回true,失败返回false
  7. */
  8. @Override
  9. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  10.     long time = unit.toMillis(waitTime);
  11.     long current = System.currentTimeMillis();// 当前时间
  12.     long threadId = Thread.currentThread().getId();// 当前线程id
  13.     // 尝试加锁,加锁成功返回null,失败返回锁的剩余超时时间
  14.     Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  15.     // 获取锁成功
  16.     if (ttl == null) {
  17.         return true;
  18.     }
  19.     // time小于0代表此时已经超过获取锁的等待时间,直接返回false
  20.     time -= System.currentTimeMillis() - current;
  21.     if (time <= 0) {
  22.         // 没看懂这个方法,里面里面空运行,有知道的大神还请不吝赐教
  23.         acquireFailed(waitTime, unit, threadId);
  24.         return false;
  25.     }
  26.    
  27.     current = System.currentTimeMillis();
  28.     CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  29.     try {
  30.         subscribeFuture.get(time, TimeUnit.MILLISECONDS);
  31.     } catch (TimeoutException e) {
  32.         if (!subscribeFuture.cancel(false)) {
  33.             subscribeFuture.whenComplete((res, ex) -> {
  34.                 // 出现异常,取消订阅
  35.                 if (ex == null) {
  36.                     unsubscribe(res, threadId);
  37.                 }
  38.             });
  39.         }
  40.         acquireFailed(waitTime, unit, threadId);
  41.         return false;
  42.     } catch (ExecutionException e) {
  43.         acquireFailed(waitTime, unit, threadId);
  44.         return false;
  45.     }
  46.     try {
  47.         // 判断是否超时(超过了waitTime)
  48.         time -= System.currentTimeMillis() - current;
  49.         if (time <= 0) {
  50.             acquireFailed(waitTime, unit, threadId);
  51.             return false;
  52.         }
  53.    
  54.         while (true) {
  55.             // 再次获取锁,成功则返回
  56.             long currentTime = System.currentTimeMillis();
  57.             ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  58.             // lock acquired
  59.             if (ttl == null) {
  60.                 return true;
  61.             }
  62.             time -= System.currentTimeMillis() - currentTime;
  63.             if (time <= 0) {
  64.                 acquireFailed(waitTime, unit, threadId);
  65.                 return false;
  66.             }
  67.             // 阻塞等待信号量唤醒或者超时,接收到订阅时唤醒
  68.             // 使用的是Semaphore#tryAcquire()
  69.             currentTime = System.currentTimeMillis();
  70.             if (ttl >= 0 && ttl < time) {
  71.                 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  72.             } else {
  73.                 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  74.             }
  75.             time -= System.currentTimeMillis() - currentTime;
  76.             if (time <= 0) {
  77.                 acquireFailed(waitTime, unit, threadId);
  78.                 return false;
  79.             }
  80.         }
  81.     } finally {
  82.         // 因为是同步操作,所以无论加锁成功或失败,都取消订阅
  83.         unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
  84.     }
  85. //        return get(tryLockAsync(waitTime, leaseTime, unit));
  86. }
复制代码
在tryAcquireAsync方法中,主要分为两段逻辑:

  • 调用lua脚本加锁:tryLockInnerAsync
  • 看门狗:scheduleExpirationRenewal
看门狗在后面讲,本小节重点还是在加锁
  1. private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.     return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  3. }
  4. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  5.     RFuture<Long> ttlRemainingFuture;
  6.     if (leaseTime > 0) {
  7.         // 调用lua脚本,尝试加锁
  8.         ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  9.     } else {
  10.         // 这里的if、else的区别就在于,如果没有设置leaseTime,就使用默认的internalLockLeaseTime(默认30秒)
  11.         ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  12.                 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  13.     }
  14.     CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  15.         // lock acquired
  16.         // 如果ttlRemaining为空,也就是tryLockInnerAsync方法中的lua执行结果返回空,证明获取锁成功
  17.         if (ttlRemaining == null) {
  18.             if (leaseTime > 0) {
  19.                 internalLockLeaseTime = unit.toMillis(leaseTime);
  20.             } else {
  21.                 // 如果没有设置锁的持有时间(leaseTime),则启动看门狗,定时给锁续期,防止业务逻辑未执行完成锁就过期了
  22.                 scheduleExpirationRenewal(threadId);
  23.             }
  24.         }
  25.         return ttlRemaining;
  26.     });
  27.     return new CompletableFutureWrapper<>(f);
  28. }
复制代码
Redisson使用了 Hash 结构来表示一个锁,这样 Hash 里面的 key 为线程id,value 为锁的次数。这样巧妙地解决了可重入锁的问题。
下面我们来分析下这段 lua 脚本的逻辑(下面说的threadId都是指变量,不是说key就叫’threadId’):

  • 如果锁(hash结构)不存在,则创建,并添加一个键值对 (threadId : 1),并设置锁的过期时间
  • 如果锁存在,则将键值对 threadId 对应的值 + 1,并设置锁的过期时间
  • 如果不如何1,2点,则返回锁的剩余过期时间
订阅

让我们把视线重新回到RedissonLock#tryLock中,当经过一些尝试获取锁,超时判断之后,代码来到while循环中。这个while循环是个死循环,只有成功获取锁或者超时,才会退出。一般死循环的设计中,都会有阻塞等待的代码,否则如果循环中的逻辑短时间拿不到结果,会造成资源抢占和浪费。阻塞代码就是下面这段
  1. // RedissonLock#tryLockInnerAsync
  2. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  3.     return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  4.             "if (redis.call('exists', KEYS[1]) == 0) then " +
  5.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  6.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  7.                     "return nil; " +
  8.                     "end; " +
  9.                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  10.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  11.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  12.                     "return nil; " +
  13.                     "end; " +
  14.                     "return redis.call('pttl', KEYS[1]);",
  15.             Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
  16. }
复制代码
commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一个Semaphore信号量对象,这是jdk的内置对象,Semaphore#tryAcquire表示阻塞并等待唤醒。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe。订阅方法的逻辑也不少,咱们直接讲其最终调用的处理方法
  1. if (ttl >= 0 && ttl < time) {
  2.     commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  3. } else {
  4.     commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  5. }
复制代码
value.getLatch().release() 也就是Semaphore#release ,会唤醒Semaphore#tryAcquire阻塞的线程
解锁

上面我们聊了加锁,本小节来聊下解锁。调用路径如下
  1. // LockPubSub#onMessage
  2. protected void onMessage(RedissonLockEntry value, Long message) {
  3.     // 普通的解锁走的是这个
  4.     if (message.equals(UNLOCK_MESSAGE)) {
  5.         Runnable runnableToExecute = value.getListeners().poll();
  6.         if (runnableToExecute != null) {
  7.             runnableToExecute.run();
  8.         }
  9.         // 这里就是唤醒信号量的地方
  10.         value.getLatch().release();
  11.     // 这个是读写锁用的
  12.     } else if (message.equals(READ_UNLOCK_MESSAGE)) {
  13.         while (true) {
  14.             Runnable runnableToExecute = value.getListeners().poll();
  15.             if (runnableToExecute == null) {
  16.                 break;
  17.             }
  18.             runnableToExecute.run();
  19.         }
  20.         value.getLatch().release(value.getLatch().getQueueLength());
  21.     }
  22. }
复制代码
解锁的逻辑不复杂,调用lua脚本解锁以及取消看门狗。看门狗晚点说,先说下lua解锁
  1. // RedissonLock#unlock
  2. // RedissonBaseLock#unlockAsync(long threadId)
  3. public RFuture<Void> unlockAsync(long threadId) {
  4.     // 调用lua解锁
  5.     RFuture<Boolean> future = unlockInnerAsync(threadId);
  6.     CompletionStage<Void> f = future.handle((opStatus, e) -> {
  7.         // 取消看门狗
  8.         cancelExpirationRenewal(threadId);
  9.         if (e != null) {
  10.             throw new CompletionException(e);
  11.         }
  12.         if (opStatus == null) {
  13.             IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
  14.                     + id + " thread-id: " + threadId);
  15.             throw new CompletionException(cause);
  16.         }
  17.         return null;
  18.     });
  19.     return new CompletableFutureWrapper<>(f);
  20. }
复制代码
老规矩,分析下这段lua:

  • 如果锁不存在,返回null
  • 锁的值减1,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间
  • 如果锁的值小于等于0,这说明可以真正解锁了,删除锁并通过发布订阅机制发布解锁消息
从 lua 中可以看到,解锁时会发布消息到 channel 中,加锁方法RedissonLock#tryLock中有相对应的订阅操作。
看门狗

试想一个场景:程序执行需要10秒,程序执行完成才去解锁,而锁的存活时间只有5秒,也就是程序执行到一半的时候锁就可以被其他程序获取了,这显然不合适。那么怎么解决呢?

  • 方式一:锁永远存在,直到解锁。不设置存活时间。
    这种方法的弊端在于,如果程序没解锁就挂了,锁就成了死锁
  • 方式二:依然设置锁存活时间,但是监控程序的执行,如果程序还没有执行完成,则定期给锁续期。
方式二就是Redisson的看门狗机制。看门狗只有在没有显示指定锁的持有时间(leaseTime)时才会生效。
  1. // RedissonLock#unlockInnerAsync
  2. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  3.     return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  4.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  5.                     "return nil;" +
  6.                     "end; " +
  7.                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  8.                     "if (counter > 0) then " +
  9.                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  10.                     "return 0; " +
  11.                     "else " +
  12.                     "redis.call('del', KEYS[1]); " +
  13.                     "redis.call('publish', KEYS[2], ARGV[1]); " +
  14.                     "return 1; " +
  15.                     "end; " +
  16.                     "return nil;",
  17.             Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
  18. }
复制代码
scheduleExpirationRenewal 方法处理了ExpirationEntry和如果出现异常则取消看门狗,具体看门狗逻辑在 renewExpiration 方法中
  1. // RedissonLock#tryAcquireAsync
  2. // RedissonBaseLock#scheduleExpirationRenewal
  3. protected void scheduleExpirationRenewal(long threadId) {
  4.     // 创建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用
  5.     ExpirationEntry entry = new ExpirationEntry();
  6.     ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  7.     if (oldEntry != null) {
  8.         oldEntry.addThreadId(threadId);
  9.     } else {
  10.         entry.addThreadId(threadId);
  11.         try {
  12.             // 看门狗的具体逻辑
  13.             renewExpiration();
  14.         } finally {
  15.             // 如果线程被中断了,就取消看门狗
  16.             if (Thread.currentThread().isInterrupted()) {
  17.                 // 取消看门狗
  18.                 cancelExpirationRenewal(threadId);
  19.             }
  20.         }
  21.     }
  22. }
复制代码
Timeout 是一个延时任务,延时 internalLockLeaseTime / 3 时间执行。任务的内容主要是通过 renewExpirationAsync 方法对锁进行续期,如果续期失败(解锁了、锁到期等),则取消看门狗,如果续期成功,则递归 renewExpiration 方法,继续创建延时任务。
internalLockLeaseTime 也就是 lockWatchdogTimeout 参数,默认是 30 秒。
总结

本文介绍了Redisson的加锁、解锁、看门狗机制,以及对Redis发布订阅机制的应用。因为篇幅有限,很多细节聊得不够深入。此外Redisson的异步机制、对Netty的使用等都是很值得水文章的。
参考资料
万字长文带你解读Redisson分布式锁的源码 - 知乎 (zhihu.com)
Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表