Redis中的分布式锁(步步为营)

打印 上一主题 下一主题

主题 828|帖子 828|积分 2484

分布式锁

概述

分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以乐成的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。
分布式锁是可以跨越多个实例,多个进程的锁

分布式锁具备的条件:

  • 互斥性:恣意时刻,只能有一个客户端持有锁
  • 锁超时释放:持有锁超时,可以释放,防止死锁
  • 可重入性:一个线程获取了锁之后,可以再次对其哀求加锁
  • 高可用、高性能:加锁息争锁开销要尽可能低,同时保证高可用
  • 安全性:锁只能被持有该锁的服务(或应用)释放。
  • 容错性:在持有锁的服务崩溃时,锁仍能得到释放,制止死锁。
分布式锁实现方案

分布式锁都是通过第三方组件来实现的,目前比力流行的分布式锁的解决方案有:

  • 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
  • Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
  • Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同
Redis实现分布式锁

SETNX

基本方案:Redis提供了setXX指令来实现分布式锁
  1. 格式: setnx key value
  2. 将key 的值设为value ,当且仅当key不存在。
  3. 若给定的 key已经存在,则SETNX不做任何动作。
复制代码

设置分布式锁后,能保证并发安全,但上述代码还存在题目,如果执行过程中出现非常,程序就直接抛出非常退出,导致锁没有释放造成终极死锁的题目。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁照旧没有被乐成的释放掉,依然会出现死锁现象)
设置超时时间
  1. SET lock_key unique_value NX PX 10000
复制代码
但是,即使设置了超时时间后,还存在题目。
假设有多个线程,假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长凌驾十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出题目。
让线程只删除自己的锁

解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:

但上述红框中由于判定和释放锁不是原子的,极度情况下,可能判定可以释放锁,在执行删除锁利用前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或lua脚本实现原子利用判断+删除
Redis的单条命令利用是原子性的,但是多条命令利用并不是原子性的,因此Lua脚本实现的就是令Redis的多条命令也实现原子利用
redis事务不是原子利用的,详情请看 Redis的事务
但是,可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态
  1.     @RequestMapping(" /deduct_stock")
  2.     public String deductStock() {
  3.         String REDIS_LOCK = "good_lock";
  4.         // 每个人进来先要进行加锁,key值为"good_lock"
  5.         String value = UUID.randomUUID().toString().replace("-","");
  6.         try{
  7.             // 为key加一个过期时间
  8.             Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
  9.             // 加锁失败
  10.             if(!flag){
  11.                 return "抢锁失败!";
  12.             }
  13.             System.out.println( value+ " 抢锁成功");
  14.             String result = template.opsForValue().get("goods:001");
  15.             int total = result == null ? 0 : Integer.parseInt(result);
  16.             if (total > 0) {
  17.                 // 如果在此处需要调用其他微服务,处理时间较长。。。
  18.                 int realTotal = total - 1;
  19.                 template.opsForValue().set("goods:001", String.valueOf(realTotal));
  20.                 System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");
  21.                 return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";
  22.             } else {
  23.                 System.out.println("购买商品失败,服务端口为8002");
  24.             }
  25.             return "购买商品失败,服务端口为8002";
  26.         }finally {
  27.             // 谁加的锁,谁才能删除
  28.             // 也可以使用redis事务
  29.             // https://redis.io/commands/set
  30.             // 使用Lua脚本,进行锁的删除
  31.             Jedis jedis = null;
  32.             try{
  33.                 jedis = RedisUtils.getJedis();
  34.                 String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
  35.                         "then " +
  36.                         "return redis.call('del',KEYS[1]) " +
  37.                         "else " +
  38.                         "   return 0 " +
  39.                         "end";
  40.                 Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
  41.                 if("1".equals(eval.toString())){
  42.                     System.out.println("-----del redis lock ok....");
  43.                 }else{
  44.                     System.out.println("-----del redis lock error ....");
  45.                 }
  46.             }catch (Exception e){
  47.             }finally {
  48.                 if(null != jedis){
  49.                     jedis.close();
  50.                 }
  51.             }
  52.             // redis事务
  53. //            while(true){
  54. //                template.watch(REDIS_LOCK);
  55. //                if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
  56. //                    template.setEnableTransactionSupport(true);
  57. //                    template.multi();
  58. //                    template.delete(REDIS_LOCK);
  59. //                    List<Object> list = template.exec();
  60. //                    if(list == null){
  61. //                        continue;
  62. //                    }
  63. //                }
  64. //                template.unwatch();
  65. //                break;
  66. //            }
  67.         }
  68.         
  69.     }
  70. }
复制代码
只管这样,照旧会有题目,锁超时释放虽然可以制止死锁,但如果是业务执行耗时较长,也会导致锁的释放,但其实此时业务还在执行中,照旧应该将业务执行结束之后再释放锁。
续时

因此可以设定,使命不完成,锁就不释放。
可以维护一个定时线程池 ScheduledExecutorService,每隔 2s 去扫描参加队列中的 Task,判断失效时间是否快到了,如果快到了,则给锁续上时间。

那如何判断是否快到失效时间了呢?可以用以下公式:【失效时间】 {        // 这里记得加 try-catch,否者报错后定时使命将不会再执行=-=        Iterator iterator = holderList.iterator();        while (iterator.hasNext()) {            RedisLockDefinitionHolder holder = iterator.next();            // 判空            if (holder == null) {                iterator.remove();                continue;            }            // 判断 key 是否还有效,无效的话进行移除            if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {                iterator.remove();                continue;            }            // 超时重试次数,凌驾时给线程设定中断            if (holder.getCurrentCount() > holder.getTryCount()) {                holder.getCurrentTread().interrupt();                iterator.remove();                continue;            }            // 判断是否进入最后三分之一时间            long curTime = System.currentTimeMillis();            boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod())  {            if (e == null) {                if (ttlRemaining) {                    this.scheduleExpirationRenewal(threadId);                }            }        });        return ttlRemainingFuture;    } }[/code]此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约使命。我们先来看有过期时间tryLockInnerAsync 部分
evalWriteAsync方法是eval命令执行lua的入口
  1. // 扫描的任务队列
  2. private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
  3. /**
  4. * 线程池,维护keyAliveTime
  5. */
  6. private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
  7.         new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
  8. {
  9.     // 两秒执行一次「续时」操作
  10.     SCHEDULER.scheduleAtFixedRate(() -> {
  11.         // 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=
  12.         Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
  13.         while (iterator.hasNext()) {
  14.             RedisLockDefinitionHolder holder = iterator.next();
  15.             // 判空
  16.             if (holder == null) {
  17.                 iterator.remove();
  18.                 continue;
  19.             }
  20.             // 判断 key 是否还有效,无效的话进行移除
  21.             if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
  22.                 iterator.remove();
  23.                 continue;
  24.             }
  25.             // 超时重试次数,超过时给线程设定中断
  26.             if (holder.getCurrentCount() > holder.getTryCount()) {
  27.                 holder.getCurrentTread().interrupt();
  28.                 iterator.remove();
  29.                 continue;
  30.             }
  31.             // 判断是否进入最后三分之一时间
  32.             long curTime = System.currentTimeMillis();
  33.             boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
  34.             if (shouldExtend) {
  35.                 holder.setLastModifyTime(curTime);
  36.                 redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
  37.                 log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
  38.                 holder.setCurrentCount(holder.getCurrentCount() + 1);
  39.             }
  40.         }
  41.     }, 0, 2, TimeUnit.SECONDS);
  42. }
复制代码
eval命令执行Lua脚本的地方,此处将Lua脚本展开
  1. // waitTime 等待时间,多久时间内都会在这尝试获取锁
  2. // leaseTime 加锁时是否设置过期时间
  3. private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  4.     if (leaseTime != -1L) {
  5.         return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  6.     } else {
  7.         RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  8.         ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  9.             if (e == null) {
  10.                 if (ttlRemaining) {
  11.                     this.scheduleExpirationRenewal(threadId);
  12.                 }
  13.             }
  14.         });
  15.         return ttlRemainingFuture;
  16.     }
  17. }
复制代码
  1. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2.     this.internalLockLeaseTime = unit.toMillis(leaseTime);
  3.     return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
  4. }
复制代码
统共3个参数完成了一段逻辑:

  • 判断该锁是否已经有对应hash表存在,

    • 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
    • 存在对应的hash表:则将该lockName的value执行+1利用,也就是盘算进入次数,再设置失效时间leaseTime

  • 最后返回这把锁的ttl剩余时间
再看看RLock如何解锁?
看unlock方法,同样查找方法名,一路到unlockInnerAsync
  1. -- 不存在该key时
  2. if (redis.call('exists', KEYS[1]) == 0) then
  3.   -- 新增该锁并且hash中该线程id对应的count置1
  4.   redis.call('hincrby', KEYS[1], ARGV[2], 1);
  5.   -- 设置过期时间
  6.   redis.call('pexpire', KEYS[1], ARGV[1]);
  7.   return nil;
  8. end;
  9. -- 存在该key 并且 hash中线程id的key也存在
  10. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  11.   -- 线程重入次数++
  12.   redis.call('hincrby', KEYS[1], ARGV[2], 1);
  13.   redis.call('pexpire', KEYS[1], ARGV[1]);
  14.   return nil;
  15. end;
  16. return redis.call('pttl', KEYS[1]);
复制代码
将lua脚本展开
  1. // keyName
  2. KEYS[1] = Collections.singletonList(this.getName())
  3. // leaseTime
  4. ARGV[1] = this.internalLockLeaseTime
  5. // uuid+threadId组合的唯一值
  6. ARGV[2] = this.getLockName(threadId)
复制代码
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  2.     return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
  3. }
复制代码
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
  1. -- 不存在key
  2. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
  3.   return nil;
  4. end;
  5. -- 存在,计数器 -1
  6. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  7. if (counter > 0) then
  8.   -- 过期时间重设
  9.   redis.call('pexpire', KEYS[1], ARGV[2]);
  10.   return 0;
  11. else
  12.   -- 删除并发布解锁消息
  13.   redis.call('del', KEYS[1]);
  14.   redis.call('publish', KEYS[2], ARGV[1]);
  15.   return 1;
  16. end;
  17. return nil;
复制代码
具体执行步骤如下:

  • 如果该锁不存在则返回nil;
  • 如果该锁存在则将其线程的hash key计数器-1,
  • 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
加锁解锁流程总结如下:

总的来说就是通过Hash范例来存储锁的次数:

RLock的锁重试题目

需要分析的是锁重试的,所以,在使用lock.tryLock()方法的时间,不能用无参的。
  1. name 锁名称
  2. channelName,用于pubSub发布消息的channel名称
复制代码
在调用tryAcquire方法后,返回了一个Long的ttl
  1. LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
  2. internalLockLeaseTime,watchDog配置的超时时间,默认为30s
  3. lockName 这里的lockName指的是uuid和threadId组合的唯一值
复制代码
查看renewExpirationAsync方法源码,其调用了Lua脚本执行续命利用的。
  1. public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
  2.     return this.tryLock(waitTime, -1L, unit);
  3. }
复制代码
pexpire重置锁的有效期。
总体逻辑如下:

  • 开启一个使命,10秒钟后执行
  • 开始的这个使命中重置有效期。假设设置的是默认30秒,则重置为30秒
  • 更新后又重复步骤1、2
那么什么时间取消这个续约的使命呢?在释放锁unlock时
  1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  2.     long time = unit.toMillis(waitTime);
  3.     long current = System.currentTimeMillis();
  4.     long threadId = Thread.currentThread().getId();
  5.     Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  6.     if (ttl == null) {
  7.         return true;
  8.     } else {
  9.         time -= System.currentTimeMillis() - current;
  10.         if (time <= 0L) {
  11.             this.acquireFailed(waitTime, unit, threadId);
  12.             return false;
  13.         } else {
  14.                 //省略
复制代码
multilock解决主从一致性题目

如果Redis是主从集群,主从同步存在耽误,当主机宕机时,从成为了主,但可能存在今后时还未完成同步,因此从上就没有锁标识,此时会出现并发安全题目。
因此redisson提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的职位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入乐成,此时才是加锁乐成,假设现在某个节点挂了,那么他去获得锁的时间,只要有一个节点拿不到,都不能算是加锁乐成,就保证了加锁的可靠性。

使用multilock()方法。必须在所有的节点都获取锁乐成,才算乐成。 缺点是运维成本高,实现复杂。
  1. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2.     this.internalLockLeaseTime = unit.toMillis(leaseTime);
  3.     return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
  4. }
复制代码
总结Redisson

Redisson分布式锁解决前三个题目原理

总结Redisson分布式锁原理:

  • 可重入:利用hash布局记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能来实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,开启一个定时使命,每隔一段时间(releaseTime/3),重置超时时间。
  • 使用multilock: 多个独立的redis节点,必须在所有节点都获取重入锁,才算获取乐成;
redLock

不管是redLock,照旧redissonLock,两者底层都是通过相同的lua脚本来加锁、释放锁的,所以,两者只是外部形态的不同,底层是一样的。redLock是继续了redissonMultiLock,大部分的逻辑,都是在redissonMultiLock中去实现的,所以源码部分,大部分都是RedissonMultiLock
原理


  • redLock的使用,需要有奇数台独立部署的Redis节点
  • 在加锁的时间,会分别去N台节点上加锁,如果半数以上的节点加锁乐成,就认为当火线程加锁乐成

面试题专栏

Java面试题专栏已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性题目你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表