饭宝 发表于 2024-10-23 23:49:39

【Redis】分布式锁之 Redission

一、基于setnx实现的分布式锁问题


重入问题:得到锁的线程应能再次进入相同锁的代码块,可重入锁能防止死锁。例如在HashTable中,方法用synchronized修饰,若在一个方法内调用另一个方法,不可重入会导致死锁。而synchronized和Lock锁都是可重入的。
不可重试:目前的分布式锁只能实行一次,公道的情况是线程在得到锁失败后应能再次实行。
超时开释:加锁时增长逾期时间可防止死锁,但如果卡顿时间超长,虽采用了 lua 表达式防止删锁时误删别人的锁,但毕竟没有锁住,存在安全隐患。
主从一致性:若 Redis 提供主从集群,向集群写数据时,主机异步同步数据给从机,若同步前主机宕机,会出现死锁问题。
https://i-blog.csdnimg.cn/direct/f729b3cc65b146fbbc57573de3767fa8.png
二、Redission 快速入门


引入依赖:根据项目需求引入 Redisson 相关依赖。
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
      // 配置
      Config config = new Config();
      config.useSingleServer().setAddress("redis://192.168.150.101:6379")
            .setPassword("123321");
      // 创建RedissonClient对象
      return Redisson.create(config);
    }
}
配置 Redisson 客户端:进行 Redisson 客户端的配置。
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
    //获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
    //判断获取锁成功
    if(isLock){
      try{
            System.out.println("执行业务");         
      }finally{
            //释放锁
            lock.unlock();
      }
      
    }
} 使用 Redission 的分布式锁:在VoucherOrderServiceImpl中注入RedissonClient,以使用 Redisson 的分布式锁功能。
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
      // 1.查询优惠券
      SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
      // 2.判断秒杀是否开始
      if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
      }
      // 3.判断秒杀是否已经结束
      if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
      }
      // 4.判断库存是否充足
      if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
      }
      Long userId = UserHolder.getUser().getId();
      //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
      //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
      RLock lock = redissonClient.getLock("lock:order:" + userId);
      //获取锁对象
      boolean isLock = lock.tryLock();
      
                //加锁失败
      if (!isLock) {
            return Result.fail("不允许重复下单");
      }
      try {
            //获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
      } finally {
            //释放锁
            lock.unlock();
      }
}
三、Redission 可重入锁原理


在分布式锁中,Redission 采用 hash 布局存储锁。大 key 表示锁是否存在,小 key 表示当前锁被哪个线程持有。下面分析 lua 表达式的三个参数:
KEYS:锁名称。
ARGV:锁失效时间。
ARGV:id + ":" + threadId,即锁的小 key。

实行过程如下:
redis.call('hset', KEYS, ARGV, 1),往 Redis 中写入数据,形成 hash 布局,如Lock{id + ":" + threadId : 1}。
"if (redis.call('exists', KEYS) == 0) then " +
                  "redis.call('hset', KEYS, ARGV, 1); " +
                  "redis.call('pexpire', KEYS, ARGV); " +
                  "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS, ARGV) == 1) then " +
                  "redis.call('hincrby', KEYS, ARGV, 1); " +
                  "redis.call('pexpire', KEYS, ARGV); " +
                  "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS);"
若当前锁存在,第一个条件不满足,接着判定redis.call('hexists', KEYS, ARGV) == 1,通过大 key 和小 key 判定当前锁是否属于自己。若是自己的,则实行redis.call('hincrby', KEYS, ARGV, 1),将锁的 value 加 1,并实行redis.call('pexpire', KEYS, ARGV)设置逾期时间。若以上两个条件都不满足,则抢锁失败,返回锁的失效时间。

查看源码会发现,会判定当火线法的返回值是否为null。若为null,对应前两个条件,退出抢锁逻辑;若返回值不是null,即走第三个分支,在源码处会进行while(true)的自旋抢锁。
https://i-blog.csdnimg.cn/direct/c4629cb37f9e43439db4df99f8d94e9c.png
四、Redission 锁重试和 WatchDog 机制


抢锁过程中,得到当前线程,通过tryAcquire进行抢锁,逻辑与之前相同:
先判定当前锁是否存在,若不存在,插入一把锁,返回null。
判定当前锁是否属于当前线程,若是,则返回null。

若返回值为null,代表当前线程已抢锁完毕或可重入完毕;若以上两个条件都不满足,则进入第三个条件,返回锁的失效时间。
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
    return;
} 接下来根据lock方法的重载情况进行处理。若传入参数,leaseTime不为-1,则进行抢锁;
if (leaseTime != -1) {
    return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} 若没有传入时间,也会进行抢锁,且抢锁时间是默认看门狗时间。
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()

ttlRemainingFuture.onComplete((ttlRemaining, e) 这句话相当于对抢锁进行监听,抢锁完毕后会调用特定方法开启一个线程进行续约逻辑,即看门狗线程。
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {
      return;
    }

    // lock acquired
    if (ttlRemaining == null) {
      scheduleExpirationRenewal(threadId);
    }
});
return ttlRemainingFuture; 续约逻辑是通过commandExecutor.getConnectionManager().newTimeout()方法实现的,该方法表示在肯定时间后实行特定任务。以锁失效时间为 30s,10s 后触发任务进行续约,将锁续约成 30s,若操作成功,会递归调用自己,重新设置任务,实现不绝续约。若线程出现宕机,则不会续约,等到时间后天然开释锁。
五、Redission 锁的 MutiLock 原理


为进步 Redis 的可用性,通常会搭建集群或主从。以主从为例,写命令在主机上,主时机将数据同步给从机,但在主机还未将数据写入从机时宕机,哨兵会选举一个 slave 变成 master,此时新的 master 中没有锁信息,锁就丢失了。
https://i-blog.csdnimg.cn/direct/c38de5cbb0e047a5b8ee47abeaaa01b7.png
Redission 提出 MutiLock 锁来解决这个问题。使用 MutiLock 锁不使用主从,每个节点职位相同,加锁逻辑需写入到每个节点上,只有所有服务器都写入成功才是加锁成功。若某个节点挂了,只要有一个节点拿不到锁,都不算加锁成功,保证了加锁的可靠性。
https://i-blog.csdnimg.cn/direct/0ab3091cd5294df79c51f9d4a52e711d.png
当设置多个锁时,Redission 会将多个锁添加到一个集合中,用while循环不绝实行拿锁,但有一个总共的加锁时间,为需要加锁的个数乘以 1500ms。例如有 3 个锁,时间就是 4500ms,在这时间内所有锁加锁成功才算加锁成功,若有线程加锁失败,则会再次重试。
https://i-blog.csdnimg.cn/direct/ec601db0918149c0838cd59d7ad16e6b.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【Redis】分布式锁之 Redission