缠丝猫 发表于 2024-10-27 17:05:08

【redis-04】Redisson实现分布式锁实战和源码剖析

redis系列团体栏目
内容链接地址【一】redis基本数据类型和使用场景https://zhenghuisheng.blog.csdn.net/article/details/142406325【二】redis的长期化机制和原理https://zhenghuisheng.blog.csdn.net/article/details/142441756【三】redis缓存穿透、缓存击穿、缓存雪崩https://zhenghuisheng.blog.csdn.net/article/details/142577507【四】redisson实现分布式锁实战和源码剖析https://zhenghuisheng.blog.csdn.net/article/details/142646301 如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142577507


一,redisson实现分布式锁实战和源码剖析

前面几篇讲解了redis的基本数据类型,接下来在本文中,讲解一下如何通过redis实现一把分布式锁。在分布式情况中,所有的jvm层面的锁将会失去该有的作用,因此在分布式情况中,可以通过redis来实现这种分布式锁,说白了就是在分布式和高并发的情况下,将并行的线程改成串行。
1,redis原生方式实现分布式锁

在redis内部,提供了实现分布式锁的方式,可以直接通过 setnx 下令的方式来,加下来直接通过代码的方式来演示一段扣减库存的代码,比如以下这段代码,对id为1001的手机举行扣减库存,此时redis中设置了库存为100
set phoneCount 100
随后自定义一把分布式锁,在设置 key/value 的同时,并设置逾期时间,可以保证整条下令的原子性
@RestController
@RequestMapping("/test")
@Slf4j
public class StockController {
    @Resource
    private RedisTemplate redisTemplate;
    //手机id
    public static final String PHONE_ID = "phone:1001";
    //手机数量
    public static final String PHONE_COUNT = "phoneCount";

    @GetMapping("/disStock")
    public AjaxResult disStock(){
      //每个线程分配一个唯一标识
      String flag = UUID.randomUUID().toString();
      //定义一把分布式锁,设置有效期为30s
      redisTemplate.opsForValue().setIfAbsent(PHONE_ID, flag, 30, TimeUnit.SECONDS);
      try {
            //当前库存
            Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");
            if (stock > 0){
                stock = stock - 1;
                redisTemplate.opsForValue().set(PHONE_COUNT,stock);
                log.info("当前库存值为:" + stock);
            }else{
                log.info("当前库存为空,扣减失败");
            }
      }finally {
            redisTemplate.delete(PHONE_ID);

      }
      return AjaxResult.success();
    }
}

如果是在并发量不大,或者说能继承超卖的情况下,上面这种方式实现分布式锁是够用的。
当然上面这种方式实现也有题目,就是有可能锁被误删的题目。假设此时线程1先拿到锁,线程2来时发现线程1已经拿到锁,那么线程2就会等待线程1执行完。但是现在另有题目,锁设置了一个逾期时间30s,假设说线程1在执行下面这段代码的时候,可能逻辑特别复杂执行时间超过了30s,假设需要淹灭40s才能完成,那么在30s的时候,锁就逾期了,那么线程2就能去抢锁
if (stock > 0){
        stock = stock - 1;
    xxxxx                //业务需要执行40s
        redisTemplate.opsForValue().set(PHONE_COUNT,stock);
        log.info("当前库存值为:" + stock);
}
但是线程1还是在执行的,假设此时线程2正拿到锁在执行任务,在40s后线程1执行完的时候,直接把这把锁给删了,导致线程2锁又失效了,线程2又没执行完,背面又会执行扣库存,删锁的下令,这样就会导致背面的线程的锁都会被莫名其妙的删除,库存方面终极也会出现超卖的题目。
https://i-blog.csdnimg.cn/direct/569b9cc5510d43648fb7bdf66d3e3e53.png
redisTemplate.delete(PHONE_ID);
而且还会导致超卖题目,如线程1还没有set减1的利用到redis中,线程2拿到的还是100,按理来说是线程1减掉的值99,然后还是对100举行利用,如果是在高并发情况下,就会严峻的出现超卖的题目。
因此需要在删锁时做一个进一步的优化,判断一下加锁的唯一标识是不是当火线程的唯一标识,是的话才能删
if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){        redisTemplate.delete(PHONE_ID);
} 上面这种情况在系统稳定的时候举行释放锁时没有题目,但是也可能碰到非常的情况,比如在释放锁之前碰到系统卡顿的情况,导致还没执行释放锁的下令锁又逾期了,这样别的线程又能抢锁,然后当火线程在执行到删除锁下令的时候,有把别的抢到锁的线程的锁给删除了,又出现了上面的这个 超卖 的题目
if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){    xxxx                 //系统卡顿        redisTemplate.delete(PHONE_ID);
} 总而言之如果用redis的自定义分布式锁时,会由于锁的超时时间、锁逾期和锁删除机制,会导致出现 超卖题目。其告急原因是锁会逾期,这样导致未执行完的线程还会对锁举行删除的利用,导致其他线程锁失效。
2,Redisson实现分布式锁

固然说redis确实可以通过自定义的方式实现一把分布式锁,但是其内部确实还存在一些题目,如经典的超卖题目,其告急原因还是,不能控制每个线程的逾期时间,导致如果某个线程超时的话,就会出现锁提前释放,后续也可能出现将其他线程的锁删除的行为
因此出现了redisson分布式锁的实现,其官网链接地址如下:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
https://i-blog.csdnimg.cn/direct/9eab767d66624495a46f0e324c6189e3.png
2.1,ReentrantLock 实现锁

看到这些可重入锁,公平锁等等,可以遐想到JDK内部的JUC的实现,如可以查看本人写的JUC系列的 ReentrantLock的实现: https://blog.csdn.net/zhenghuishengq/article/details/132857564
实现一把单JVM历程锁的方式如下,在定义完一把 ReentrantLock 锁之后呢,直接调用内部两个简朴的api就能实现加锁息争锁,底层通过aqs实现
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock()
底层通过clh同步等待队列实现,同时还支持公平锁和非公平锁,由于本文主角是redission,因此详细可以去看上面给的文章的链接
https://i-blog.csdnimg.cn/blog_migrate/8bace7657f400443814209cc490c3948.png
2.2,Redission实现分布式锁案例

接下来针对上面这段自定义实现的分布式锁,通过redisson举行优化
public class StockController {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private Redisson redisson;

    //手机id
    public static final String PHONE_ID = "phone:1001";
    //手机数量
    public static final String PHONE_COUNT = "phoneCount";

    @GetMapping("/disStock")
    public AjaxResult disStock(){
      String flag = UUID.randomUUID().toString();
      //定义一把分布式锁,设置有效期为30s
      RLock lock = redisson.getLock(PHONE_ID);
      lock.lock();
      try {
            //当前库存
            Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");
            if (stock > 0){
                stock = stock - 1;
                redisTemplate.opsForValue().set(PHONE_COUNT,stock);
                log.info("当前库存值为:" + stock);
            }else{
                log.info("当前库存为空,扣减失败");
            }
      }finally {
            lock.unlock();
      }
      return AjaxResult.success();
    }
}
通过上面这段代码可以发现,其内部的实现方式和ReentrantLock的实现是很像的,都是在获取到相对于的实例对象之后,通过lock方式加锁和通过unlock的方式举行解锁
RLock lock = redisson.getLock(PHONE_ID);
lock.lock();
lock.unlock();
2.3,Redission底层实现原理和源码剖析

2.3.1,lock加锁逻辑

在看源码之前,来先对加锁内部做一个预想,无非就是抢锁、没抢到的阻塞,阻塞的线程轮询抢锁。
接下里进入内部源码查看,先进入这个lock方法,然后进入这个 lockInterruptibly 方法,首先会获取到当火线程id,需要给背面使用
https://i-blog.csdnimg.cn/direct/16478752fe7b425b90e74545ef48c42d.png
接下来就是进入告急的 tryAcquire 方法,这个就是告急的获取锁的逻辑代码
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
末了进入这个最告急的 tryLockInnerAsync 方法,内部实在是通过一个lua脚原来实现原子性,由于redis中执行任务的线程还是单线程,因此下面这一大段都可以保证利用的原子性
https://i-blog.csdnimg.cn/direct/81ba821c96f444a3b5ba84ffea6e7fa5.png
感兴趣的可以相识一下lua脚本,上面也通过箭头表明了lua脚本的参数,正常的对象都是通过 key/value 的方式表示,在lua脚本中,前面的这个聚集标识key,背面的几个参数表示value,就是前面的 getName 是key,背面的 internalLockLeaseTime, getLockName(threadId) 是value,通过ARGV表示。lua脚本官方更加推荐使用一个key对应一个value,当然也允许key有1个或者多个,value有1个或者多个
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
接下来分析这段代码,其本质就是一个通过hset设置值,


[*]key是KEYS,代表的是getName(),这个name由外部提供,在实例化的时候传了一个key进来redisson.getLock(PHONE_ID),那么这个name此时表示的就是外部设置的手机id。
[*]ARGV表示第二个参数,对应的是 getLockName(threadId) ,就是线程id
[*]末了通过pexpire设置一个逾期时间,此时的 ARGV表示的是 internalLockLeaseTime,在内部定义了这个时间 private long lockWatchdogTimeout = 30 * 1000; 默认就是30s,看名字就知道是一个看门狗的超机遇制
[*]如果设置成功,那么就回返回一个nil值,对应java里面的null值
"if (redis.call('exists', KEYS) == 0) then " +
        "redis.call('hset', KEYS, ARGV, 1); " +
        "redis.call('pexpire', KEYS, ARGV); " +
        "return nil; " +
"end; "
redis通过这种管道的方式实现lua脚本,减少网络开销,同时保证利用执行的原子性,在redis官方也有介绍,可以直接通过lua脚本取代redis的事务。
2.3.2,lock锁续命逻辑

上面的讲解的就是 tryLockInnerAsync 方法,通过异步的方式去拿到锁,通过Future阻塞拿到执行任务的结果,拿到执行结果之后,再回调一下这个 addListener 方法
https://i-blog.csdnimg.cn/direct/6188aff7d8af4095a45d046056288dc5.png
接下来告急是看这个 addListener 中的核心方法 scheduleExpirationRenewal ,第一眼可以看到里面就是一个定时任务的线程类,看默认就是会在 internalLockLeaseTime / 3 时间内执行一次,也就是10s后执行一次
https://i-blog.csdnimg.cn/direct/3d697c2d34f84f56a454512dc80df284.png
这一块的内部实现延时通过lua脚本,实现锁续命机制。其续命逻辑也很简朴,如果10s后线程还没执行完成,内部会通过递归的方式循环调用,继承调用这个 scheduleExpirationRenewal 方法,很多中心件实现这种续命的方式都是采取内部递归调用的方式
//判断上一次续命是否续命成功
if (future.getNow()) {       
        // reschedule itself
    scheduleExpirationRenewal(threadId);
}
2.3.3,加锁失败阻塞逻辑

当某个线程加锁失败时,那么该线程就会设置成阻塞状态,从而让出cpu的使用权。仍旧得看这个抢锁逻辑的lua脚本,看到末了一句,如果抢锁失败的话,那么就会返回一个pttl的状态,实在就是一个拿到锁的逾期时间。比如拿到锁的线程1已经执行了10s,那么来拿锁的线程2就会获取到剩余20s的逾期时间
https://i-blog.csdnimg.cn/direct/b314e579dff849249d864f21d145e223.png
再回到进入这个最初的抢锁方法中,可以发现每一个线程都会返回一个ttl的逾期时间,首先会对这个ttl超时时间举行判断,上面抢锁的逻辑可以看到,如果拿到锁的线程会返回一个nil,就是对应java中的null,如果不为null,就会继承往下执行
https://i-blog.csdnimg.cn/direct/2ad2d893d13f45259e9d049757918d52.png
在举行阻塞的时候,作者使用了发布订阅的模式举行了优化 ,在线程举行阻塞时,把这些队列都订阅一个主题,当有锁释放的时候,那么就唤醒订阅了这个主题的线程。
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
随后进入一个自旋获取锁的阶段,在JDK的 ReentrantLock 中,就是采取的自旋的方式获取锁。但是在redis分布式情况下,一般都是适用于高流量高并发的场景,因此是不能完全不停空转自旋的,而且想想默认设置30s一个线程,刚运行就让大量的线程在那空转,肯定是不合适的
接下来看内部的这段自旋的代码,接下来告急分析这段ttl大于0的情况,getLatch表示一个JDK中的Semaphore信号量,这里用来做阻塞利用,比如说获取到的ttl为15s,那么这个线程就阻塞15s在这里,再举行一次自旋抢锁,而不是像 ReentrantLock 一样不停空转自旋在那抢锁,从而降低cpu的使用率,同时通过阻塞让出cpu的使用权
try {
    while (true) {
      ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {       
            break;
      }
      if (ttl >= 0) {                //如果ttl大于0
            //
            getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
            getEntry(threadId).getLatch().acquire();
      }
    }
} finally {
    unsubscribe(future, threadId);        //唤醒订阅的线程
}
通过这种信号量阻塞的方式,达到间歇性加锁的目的。而且在抢锁时,内部没有公平锁的概念,默认就黑白公平锁。
2.3.4,unlock锁释放

在上面的加锁失败阻塞中,讲了有这段加锁失败阻塞的方法,内部提供了一个同步订阅的方法,就是每个加锁失败的线程会订阅一个topic主题,当锁被释放或者逾期之后就能通知订阅的线程来抢锁,否则每次自旋抢锁就太低效了,比如5s中执行完了,设置的30s,剩余的ttl另有25s,还要等25s去抢锁,显然不合适
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
接下来查看这段释放锁的逻辑,通过 unlockInnerAsync 举行释放锁
https://i-blog.csdnimg.cn/direct/505c983c69a5442fbca68717834bba26.png
详细查看这个 unlockInnerAsync 方法之后,内部又是一个lua脚本,告急判断锁是否存在,如果存在则举行解锁的利用,然后通过 publish 发送一条消息给订阅了这个主题的所有线程可以来抢锁,内部还包含一些可重入锁等
https://i-blog.csdnimg.cn/direct/68d553657f8e408d8ce6c989e492b4c8.png
3,Redisson总结

redission告急通过lua脚原来实现加锁息争锁的利用,从而保证相关利用的原子性,其告急利用有以下步骤。


[*]线程进来先执行抢锁的利用,抢锁成功则继承往下执行业务,而且通过内部递归的方式给当火线程一个watch dog 看门狗的一个续命方式。
[*]抢锁失败线程则阻塞,并将线程关注一个发布订阅的模型,供锁释放时被唤醒。而且内部通过间接性的方式轮旋抢锁,时间间隔为当火线程结束时间的ttl
[*]unlock释放锁时会往一个发布订阅的模型里面发送消息,关注了这个模型的线程吸收到消息之后,则会被唤醒,从而的去举行加锁的利用

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【redis-04】Redisson实现分布式锁实战和源码剖析