实现分布式锁通常有三种方式:数据库、Redis 和 Zookeeper。我们比较常用的是通过 Redis 和 Zookeeper 实现分布式锁。Redisson 框架中封装了通过 Redis 实现的分布式锁,下面我们分析一下它的具体实现。
by emanjusaka from https://www.emanjusaka.top/2024/03/redisson-distributed-lock 彼岸花开可奈何
本文欢迎分享与聚合,全文转载请留下原文地点。
关键点
- 原子性
要么都乐成,要么都失败
- 过期时间
如果锁还没来得及释放就遇到了服务宕机,就会出现死锁的问题。给 Redis 的 key 设置过期时间,即使服务宕机了凌驾设置的过期时间锁会自动举行释放。
- 锁续期
因为给锁设置了过期时间而我们的业务逻辑具体要实验多长时间大概是变化和不确定的,如果设定了一个固定的过期时间,大概会导致业务逻辑还没有实验完,锁被释放了的问题。锁续期能保证锁是在业务逻辑实验完才被释放。
- 正确释放锁
保证释放自己持有的锁,不能出现 A 释放了 B 持有锁的情况。
Redis 实现分布式锁的几种部署方式
- 单机
在这种部署方式中,Redis 的所有实例都部署在同一台服务器上。这种部署方式简朴易行,但存在单点故障的风险。如果 Redis 实例宕机,则所有分布式锁都将失效。
- 哨兵
在这种部署方式中,Redis 的多个实例被配置为哨兵。哨兵负责监控 Redis 实例的状态,并在主实例宕机时自动选举一个新的主实例。这种部署方式可以提供更高的可用性和容错性。
- 集群
在这种部署方式中,Redis 的多个实例被配置为一个集群。集群中的每个实例都是平等的,而且可以处理读写操作。这种部署方式可以提供最高的可用性和容错性。
- 红锁
搞几个独立的 Master,好比 5 个,然后挨个加锁,只要凌驾一半以上(这里是 5/2+1=3 个)就代表加锁乐成,然后释放锁的时间也逐台释放。
使用方式
- 引入依靠
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.17.7</version>
- </dependency>
复制代码 版本依靠:
redisson-spring-data module nameSpring Boot versionredisson-spring-data-161.3.yredisson-spring-data-171.4.yredisson-spring-data-181.5.yredisson-spring-data-2x2.x.yredisson-spring-data-3x3.x.y
- yml配置
- spring:
- redis:
- redisson:
- config:
- singleServerConfig:
- address: redis://127.0.0.1:6379
- database: 0
- password: null
- timeout: 3000
复制代码 - 直接注入使用
- package top.emanjusaka;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.util.concurrent.TimeUnit;
- /**
- * @Author emanjusaka www.emanjusaka.top
- * @Date 2024/2/28 16:41
- * @Version 1.0
- */
- @Service
- public class Lock {
- @Resource
- private RedissonClient redissonClient;
- public void lock() {
- // 写入redis的key值
- String lockKey = "lock-test";
- // 获取一个Rlock锁对象
- RLock lock = redissonClient.getLock(lockKey);
- // 获取锁,并为其设置过期时间为10s
- lock.lock(10, TimeUnit.SECONDS);
- try {
- // 执行业务逻辑....
- System.out.println("获取锁成功!");
- } finally {
- // 释放锁
- lock.unlock();
- System.out.println("释放锁成功!");
- }
- }
- }
复制代码 底层分析
lock()
关键代码- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
- "if ((redis.call('exists', KEYS[1]) == 0) " +
- "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
- }
复制代码
- RFuture:表现返回一个异步结果对象,其中泛型参数 T 表现结果的类型。
- tryLockInnerAsync 方法接受一下参数:
- waitTime:期待时间,用于指定在获取锁时的最大期待时间。
- leaseTime:租约时间,用于指定锁的持有时间
- unit:时间单位,用于将 leaseTime 转换为毫秒
- threadId:线程 ID,用于标识当前线程
- command:Redis 命令对象,用于实验 Redis 操作
- 方法体中的代码使用 Lua 脚本来实现分布式锁的逻辑。
- if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)): 如果键不存在或者哈希表中已经存在对应的线程ID,则实验以下操作:
- redis.call('hincrby', KEYS[1], ARGV[2], 1): 将哈希表中对应线程ID的值加1。
- redis.call('pexpire', KEYS[1], ARGV[1]): 设置键的过期时间为租约时间。
- return nil: 返回nil表现乐成获取锁。
- else: 如果键存在且哈希表中不存在对应的线程ID,则实验以下操作:
- return redis.call('pttl', KEYS[1]): 返回键的剩余生存时间。
- commandExecutor.syncedEval:表现同步实验 Redis 命令
- LongCodec.INSTANCE:用于编码息争码长整型数据
- Collections.singletonList(getRawName()):创建一个只包含一个元素的列表,元素为锁的名称
- unit.toMillis(leaseTime):将租约时间转换为毫秒
- getLockName(threadId):根据线程 ID 生成锁的名称
- // 省去了那些无关重要的代码
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
- long threadId = Thread.currentThread().getId();
- // tryAcquire就是上面分析的lua完整脚本
- Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
- // 返回null就代表上锁成功。
- if (ttl == null) {
- return;
- }
- // 如果没成功,也就是锁的剩余时间不是null的话,那么就执行下面的逻辑
- // 其实就是说 如果有锁(锁剩余时间不是null),那就死循环等待重新抢锁。
- try {
- while (true) {
- // 重新抢锁
- ttl = tryAcquire(-1, leaseTime, unit, threadId);
- // 抢锁成功就break退出循环
- if (ttl == null) {
- break;
- }
- // 省略一些代码
- }
- } finally {}
- }
复制代码 上面代码实现了一个分布式锁的功能。它使用了Lua脚本来实验获取锁,并在乐成获取锁后返回锁的剩余时间(ttl)。如果获取锁失败,则进入一个死循环,不断实验重新获取锁,直到乐成为止。
unlock()
关键代码- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- "return nil;" +
- "end; " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "return 0; " +
- "else " +
- "redis.call('del', KEYS[1]); " +
- "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
- "return 1; " +
- "end; " +
- "return nil;",
- Arrays.asList(getRawName(), getChannelName()),
- LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
- }
复制代码
- RFuture: 表现返回一个异步结果对象,其中泛型参数Boolean表现结果的类型。
- unlockInnerAsync方法接受以下参数:
- 方法体中的代码使用Lua脚本来实现分布式锁的解锁逻辑。以下是对Lua脚本的解释:
- if (redis.call('hexists', KEYS[1], ARGV[3]) == 0): 如果哈希表中不存在对应的线程ID,则返回nil表现无法解锁。
- local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1): 将哈希表中对应线程ID的值减1,并将结果赋值给变量counter。
- if (counter > 0): 如果counter大于0,表现还有其他线程持有锁,实验以下操作:
- redis.call('pexpire', KEYS[1], ARGV[2]): 设置键的过期时间为租约时间。
- return 0: 返回0表现锁仍旧被其他线程持有。
- else: 如果counter等于0,表现当前线程是最后一个持有锁的线程,实验以下操作:
- redis.call('del', KEYS[1]): 删除键,释放锁。
- redis.call(ARGV[4], KEYS[2], ARGV[1]): 调用发布命令,通知其他线程锁已经释放。
- return 1: 返回1表现乐成释放锁。
- return nil: 如果前面的条件都不满足,返回nil表现无法解锁。
- evalWriteAsync方法用于实验Lua脚本并返回异步结果对象。
- getRawName(): 获取锁的名称。
- LongCodec.INSTANCE: 用于编码息争码长整型数据。
- RedisCommands.EVAL_BOOLEAN: 指定Lua脚本的返回类型为布尔值。
- Arrays.asList(getRawName(), getChannelName()): 创建一个包含两个元素的列表,元素分别为锁的名称和频道名称。
- LockPubSub.UNLOCK_MESSAGE: 发布消息的内容。
- internalLockLeaseTime: 锁的租约时间。
- getLockName(threadId): 根据线程ID生成锁的名称。
- getSubscribeService().getPublishCommand(): 获取发布命令。
锁续期
watchDog
核心工作流程是定时监测业务是否实验结束,没结束的话在看你这个锁是不是快到期了(凌驾锁的三分之一时间),那就重新续期。这样防止如果业务代码没实验完,锁却过期了所带来的线程不安全问题。
Redisson 的 watchDog 机制底层不是调度线程池,而是直接用的 netty 变乱轮。
Redisson的WatchDog机制是用于自动续期分布式锁和监控对象生命周期的一种机制,确保了分布式情况下锁的正确性和资源的实时释放。
- 自动续期:当Redisson客户端获取了一个分布式锁后,会启动一个WatchDog线程。这个线程负责在锁即将到期时自动续期,保证持有锁的线程可以继续实验任务。默认情况下,锁的初始超时时间是30秒,每10秒钟WatchDog会检查一次锁的状态,如果锁依然被持有,它会将锁的过期时间重新设置为30秒。
- 参数配置:可以通过设置lockWatchdogTimeout参数来调整WatchDog检查锁状态的频率和续期的超时时间。这个参数默认值是30000毫秒(即30秒),适用于那些没有明白指定leaseTimeout参数的加锁请求。
- 重连机制:除了锁自动续期外,WatchDog机制还用作Redisson客户端的自动重连功能。当客户端与Redis服务器失去连接时,WatchDog会自动实验重新连接,从而恢复服务的正常运作。
- 资源管理:WatchDog也负责监控Redisson对象的生命周期,例如分布式锁。当对象的生命周期到期时,WatchDog会将其从Redis中删除,避免过期数据占用过多内存空间。
- 异步加锁:在加锁的过程中,WatchDog会在RedissonLock#tryAcquireAsync方法中发挥作用,该方法是举行异步加锁的逻辑所在。通过这种方式,加锁操作不会阻塞当前线程,提高了体系的性能。
本文原创,才疏学浅,如有马虎,欢迎指正。如果本文对您有所帮助,欢迎点赞,并期待您的反馈交换,共同成长。
原文地点: https://www.emanjusaka.top/2024/03/redisson-distributed-lock
微信公众号:emanjusaka的编程栈
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |