【源码阅读】Redisson lock源码

打印 上一主题 下一主题

主题 551|帖子 551|积分 1653

目录
底层原理
加锁机制
锁互斥机制
可重入锁机制
总结 

Redisson 加锁非常简单,还支持 redis 单实例、redis 哨兵、redis cluster、redis master-slave 等各种部署架构
  1. RLock lock = redisson.getLock("cyk-test");
  2. lock.lock();
  3. lock.unlock();
复制代码
底层原理



加锁机制

废话不多说,直接看源码,下面的代码先不看,先看 tryAcquire 是怎样获取锁的
  1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  2.         long threadId = Thread.currentThread().getId();
  3.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  4.         // lock acquired
  5.         if (ttl == null) {
  6.             return;
  7.         }
  8.         CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  9.         pubSub.timeout(future);
  10.         RedissonLockEntry entry;
  11.         if (interruptibly) {
  12.             entry = commandExecutor.getInterrupted(future);
  13.         } else {
  14.             entry = commandExecutor.get(future);
  15.         }
  16.         ...
  17. }
复制代码
查看 tryAcquire 方法,点进去看发现调用了 tryAcquireAsync0 方法,这里 RFuture 继承自 java.util.concurrent.Future,表现这是一个异步的使命,get 方法会同步获取效果
  1. private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.     return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
  3. }
  4. private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  5.     return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  6. }
复制代码
查看 tryAcquireAsync 方法
  1.     private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.         RFuture<Long> ttlRemainingFuture;
  3.         if (leaseTime > 0) {
  4.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  5.         } else {
  6.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  7.                     TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  8.         }
  9.         ...
  10.     }
复制代码
查看 tryLockInnerAsync 方法
  1.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2.         return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
  3.                 "if ((redis.call('exists', KEYS[1]) == 0) " +
  4.                             "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
  5.                         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  6.                         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  7.                         "return nil; " +
  8.                     "end; " +
  9.                     "return redis.call('pttl', KEYS[1]);",
  10.                 Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
  11.     }
复制代码
这是一段加锁的 lua 脚本,用于包管原子性,参数解释如下:


  • KEYS[1] 表现加锁的 key
  • ARGV[1] 表现锁 key 的默认超时时间
  • ARGV[2] 表现加锁的客户端 ID,由 UUID:线程 ID 构成
客户端在第一次加锁完成,会设置一个 key 为客户端 ID,value 为加锁次数的 hash 数据结构:


现在知道了内部方法的逻辑,往回倒一步,重点看我加在代码中的注释
  1.     private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.         RFuture<Long> ttlRemainingFuture;
  3.         if (leaseTime > 0) {
  4.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  5.         } else {
  6.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  7.                     TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  8.         }
  9.         // 这里是定义了加锁Lua脚本的异步任务,通过CompletableFuture编排
  10.         CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
  11.         ttlRemainingFuture = new CompletableFutureWrapper<>(s);
  12.         // 这个f依赖ttlRemainingFuture的结果,如果入参的leaseTime<=0会触发看门狗机制
  13.         // 否则按照设置的过期时间来过期
  14.         CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  15.             // lock acquired
  16.             if (ttlRemaining == null) {
  17.                 if (leaseTime > 0) {
  18.                     internalLockLeaseTime = unit.toMillis(leaseTime);
  19.                 } else {
  20.                     scheduleExpirationRenewal(threadId);
  21.                 }
  22.             }
  23.             return ttlRemaining;
  24.         });
  25.         // 返回编排好的CompletableFuture
  26.         return new CompletableFutureWrapper<>(f);
  27.     }
复制代码
把 RFuture 返回以后,就到了 get 方法阻塞获取方法这里
  1. private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.     return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
  3. }
  4. private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  5.     return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  6. }
复制代码
最后回到了这里
  1.     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  2.         long threadId = Thread.currentThread().getId();
  3.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  4.         // lock acquired
  5.         if (ttl == null) {
  6.             return;
  7.         }
  8.         // 这里的 subscribe 就是 Redis 订阅解锁的 lua 脚本中的 publish
  9.         CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  10.         pubSub.timeout(future);
  11.         RedissonLockEntry entry;
  12.         if (interruptibly) {
  13.             entry = commandExecutor.getInterrupted(future);
  14.         } else {
  15.             entry = commandExecutor.get(future);
  16.         }
  17.         ...
  18.     }
复制代码
接着看下面循环获取锁的逻辑
  1.         try {
  2.            while (true) {
  3.                 // 自旋尝试获取锁
  4.                ttl = tryAcquire(-1, leaseTime, unit, threadId);
  5.                // 看Lua脚本,ttl为null说明锁获取到了
  6.                if (ttl == null) {
  7.                    break;
  8.                }
  9.                // waiting for message
  10.                if (ttl >= 0) {
  11.                    try {
  12.                        // 注意这里的tryAcquire和之前的tryAcquire不是同一个东西,这里是信号量的tryAcquire
  13.                        // entry.getLatch()这里返回的是信号量,释放锁的代码会释放一个许可
  14.                         // 如果没有可用的许可,会一直休眠直到超时时间 ttl ms
  15.                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  16.                    } catch (InterruptedException e) {
  17.                        if (interruptibly) {
  18.                            throw e;
  19.                        }
  20.                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  21.                    }
  22.                } else {
  23.                    // 当 key 存在但没有设置剩余生存时间时,pttl返回 -1,会走到这个逻辑
  24.                     // 我感觉正常流程走不到这个逻辑,因为当前线程无非是看到锁存在或者不存在
  25.                     // 看到锁不存在等于加锁成功了,因为Lua脚本是原子性的
  26.                     // 看到锁存在,默认也给了超时时间
  27.                    // 这里就没有设置超时时间,一直等释放锁的许可
  28.                    if (interruptibly) {
  29.                        entry.getLatch().acquire();
  30.                    } else {
  31.                        entry.getLatch().acquireUninterruptibly();
  32.                    }
  33.                }
  34.            }
  35.        } finally {
  36.            unsubscribe(entry, threadId);
  37.        }
复制代码
这里计划的奇妙之处就在于利用了消息订阅、信号量的机制,它不是无休止的盲等机制,也避免了不断的重试,而是检测到锁被释放才去尝试重新获取,这对 CPU 十分的友爱
锁互斥机制

此时如果客户端 2 来尝试加锁,同样走进 RedissonLock#lock 方法,会咋样呢?第一个 if 判断会实行 exists myLock,发现 myLock 这个锁 key 已经存在了。接着第二个 if 判断,判断一下,myLock 锁 key 的 hash 数据结构中,对应客户端 2 的 ID 的 key 的 value 为 1,也没有。最终会获取到 pttl myLock 返回的锁 key 的剩余生存时间,进入 while 循环,不停的尝试加锁
可重入锁机制

那如果客户端1都已经持有了这把锁了,效果可重入的加锁会怎么样呢?


此时会实行可重入加锁的逻辑,走第二个 if 逻辑,对客户端 1 的加锁次数累加 1,此时 myLock 数据结构变为下面这样:


总结 




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

愛在花開的季節

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表