王海鱼 发表于 2024-11-26 09:43:40

Redis中的分布式锁(步步为营)

分布式锁

概述

分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以乐成的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。
分布式锁是可以跨越多个实例,多个进程的锁
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202411161136614.png
分布式锁具备的条件:

[*]互斥性:恣意时刻,只能有一个客户端持有锁
[*]锁超时释放:持有锁超时,可以释放,防止死锁
[*]可重入性:一个线程获取了锁之后,可以再次对其哀求加锁
[*]高可用、高性能:加锁息争锁开销要尽可能低,同时保证高可用
[*]安全性:锁只能被持有该锁的服务(或应用)释放。
[*]容错性:在持有锁的服务崩溃时,锁仍能得到释放,制止死锁。
分布式锁实现方案

分布式锁都是通过第三方组件来实现的,目前比力流行的分布式锁的解决方案有:

[*]数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
[*]Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
[*]Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同
Redis实现分布式锁

SETNX

基本方案:Redis提供了setXX指令来实现分布式锁
格式: setnx key value
将key 的值设为value ,当且仅当key不存在。
若给定的 key已经存在,则SETNX不做任何动作。https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270808679.png
设置分布式锁后,能保证并发安全,但上述代码还存在题目,如果执行过程中出现非常,程序就直接抛出非常退出,导致锁没有释放造成终极死锁的题目。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁照旧没有被乐成的释放掉,依然会出现死锁现象)
设置超时时间

SET lock_key unique_value NX PX 10000但是,即使设置了超时时间后,还存在题目。
假设有多个线程,假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长凌驾十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出题目。
让线程只删除自己的锁

解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202411161136669.png
但上述红框中由于判定和释放锁不是原子的,极度情况下,可能判定可以释放锁,在执行删除锁利用前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或lua脚本实现原子利用判断+删除
Redis的单条命令利用是原子性的,但是多条命令利用并不是原子性的,因此Lua脚本实现的就是令Redis的多条命令也实现原子利用
redis事务不是原子利用的,详情请看 Redis的事务
但是,可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态
    @RequestMapping(" /deduct_stock")
    public String deductStock() {
      String REDIS_LOCK = "good_lock";
      // 每个人进来先要进行加锁,key值为"good_lock"
      String value = UUID.randomUUID().toString().replace("-","");
      try{
            // 为key加一个过期时间
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);

            // 加锁失败
            if(!flag){
                return "抢锁失败!";
            }
            System.out.println( value+ " 抢锁成功");
            String result = template.opsForValue().get("goods:001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                // 如果在此处需要调用其他微服务,处理时间较长。。。
                int realTotal = total - 1;
                template.opsForValue().set("goods:001", String.valueOf(realTotal));
                System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");
                return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";
            } else {
                System.out.println("购买商品失败,服务端口为8002");
            }
            return "购买商品失败,服务端口为8002";
      }finally {
            // 谁加的锁,谁才能删除
            // 也可以使用redis事务
            // https://redis.io/commands/set
            // 使用Lua脚本,进行锁的删除

            Jedis jedis = null;
            try{
                jedis = RedisUtils.getJedis();

                String script = "if redis.call('get',KEYS) == ARGV " +
                        "then " +
                        "return redis.call('del',KEYS) " +
                        "else " +
                        "   return 0 " +
                        "end";

                Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
                if("1".equals(eval.toString())){
                  System.out.println("-----del redis lock ok....");
                }else{
                  System.out.println("-----del redis lock error ....");
                }
            }catch (Exception e){

            }finally {

                if(null != jedis){
                  jedis.close();
                }
            }

            // redis事务
//            while(true){
//                template.watch(REDIS_LOCK);
//                if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
//                  template.setEnableTransactionSupport(true);
//                  template.multi();
//                  template.delete(REDIS_LOCK);
//                  List<Object> list = template.exec();
//                  if(list == null){
//                        continue;
//                  }
//                }
//                template.unwatch();
//                break;
//            }
      }
      
    }
}只管这样,照旧会有题目,锁超时释放虽然可以制止死锁,但如果是业务执行耗时较长,也会导致锁的释放,但其实此时业务还在执行中,照旧应该将业务执行结束之后再释放锁。
续时

因此可以设定,使命不完成,锁就不释放。
可以维护一个定时线程池 ScheduledExecutorService,每隔 2s 去扫描参加队列中的 Task,判断失效时间是否快到了,如果快到了,则给锁续上时间。

那如何判断是否快到失效时间了呢?可以用以下公式:【失效时间】 {      // 这里记得加 try-catch,否者报错后定时使命将不会再执行=-=      Iterator iterator = holderList.iterator();      while (iterator.hasNext()) {            RedisLockDefinitionHolder holder = iterator.next();            // 判空            if (holder == null) {                iterator.remove();                continue;            }            // 判断 key 是否还有效,无效的话进行移除            if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {                iterator.remove();                continue;            }            // 超时重试次数,凌驾时给线程设定中断            if (holder.getCurrentCount() > holder.getTryCount()) {                holder.getCurrentTread().interrupt();                iterator.remove();                continue;            }            // 判断是否进入最后三分之一时间            long curTime = System.currentTimeMillis();            boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()){            if (e == null) {                if (ttlRemaining) {                  this.scheduleExpirationRenewal(threadId);                }            }      });      return ttlRemainingFuture;    } }此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约使命。我们先来看有过期时间tryLockInnerAsync 部分
evalWriteAsync方法是eval命令执行lua的入口
// 扫描的任务队列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/**
* 线程池,维护keyAliveTime
*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
      new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
    // 两秒执行一次「续时」操作
    SCHEDULER.scheduleAtFixedRate(() -> {
      // 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=
      Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
      while (iterator.hasNext()) {
            RedisLockDefinitionHolder holder = iterator.next();
            // 判空
            if (holder == null) {
                iterator.remove();
                continue;
            }
            // 判断 key 是否还有效,无效的话进行移除
            if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
                iterator.remove();
                continue;
            }
            // 超时重试次数,超过时给线程设定中断
            if (holder.getCurrentCount() > holder.getTryCount()) {
                holder.getCurrentTread().interrupt();
                iterator.remove();
                continue;
            }
            // 判断是否进入最后三分之一时间
            long curTime = System.currentTimeMillis();
            boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
            if (shouldExtend) {
                holder.setLastModifyTime(curTime);
                redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
                log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
                holder.setCurrentCount(holder.getCurrentCount() + 1);
            }
      }
    }, 0, 2, TimeUnit.SECONDS);
}eval命令执行Lua脚本的地方,此处将Lua脚本展开
// waitTime 等待时间,多久时间内都会在这尝试获取锁
// leaseTime 加锁时是否设置过期时间
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
      return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
      RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                  this.scheduleExpirationRenewal(threadId);
                }
            }
      });
      return ttlRemainingFuture;
    }
}<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS) == 0) then redis.call('hincrby', KEYS, ARGV, 1); redis.call('pexpire', KEYS, ARGV); return nil; end; if (redis.call('hexists', KEYS, ARGV) == 1) then redis.call('hincrby', KEYS, ARGV, 1); redis.call('pexpire', KEYS, ARGV); return nil; end; return redis.call('pttl', KEYS);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}统共3个参数完成了一段逻辑:

[*]判断该锁是否已经有对应hash表存在,

[*]没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
[*]存在对应的hash表:则将该lockName的value执行+1利用,也就是盘算进入次数,再设置失效时间leaseTime

[*]最后返回这把锁的ttl剩余时间
再看看RLock如何解锁?
看unlock方法,同样查找方法名,一路到unlockInnerAsync
-- 不存在该key时
if (redis.call('exists', KEYS) == 0) then
-- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', KEYS, ARGV, 1);
-- 设置过期时间
redis.call('pexpire', KEYS, ARGV);
return nil;
end;

-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS, ARGV) == 1) then
-- 线程重入次数++
redis.call('hincrby', KEYS, ARGV, 1);
redis.call('pexpire', KEYS, ARGV);
return nil;
end;
return redis.call('pttl', KEYS);将lua脚本展开
// keyName
KEYS = Collections.singletonList(this.getName())
// leaseTime
ARGV = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV = this.getLockName(threadId)该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS, ARGV) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS, ARGV, -1); if (counter > 0) then redis.call('pexpire', KEYS, ARGV); return 0; else redis.call('del', KEYS); redis.call('publish', KEYS, ARGV); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
-- 不存在key
if (redis.call('hexists', KEYS, ARGV) == 0) then
return nil;
end;
-- 存在,计数器 -1
local counter = redis.call('hincrby', KEYS, ARGV, -1);
if (counter > 0) then
-- 过期时间重设
redis.call('pexpire', KEYS, ARGV);
return 0;
else
-- 删除并发布解锁消息
redis.call('del', KEYS);
redis.call('publish', KEYS, ARGV);
return 1;
end;
return nil;具体执行步骤如下:

[*]如果该锁不存在则返回nil;
[*]如果该锁存在则将其线程的hash key计数器-1,
[*]计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
加锁解锁流程总结如下:
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270809775.png
总的来说就是通过Hash范例来存储锁的次数:
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270809508.png
RLock的锁重试题目

需要分析的是锁重试的,所以,在使用lock.tryLock()方法的时间,不能用无参的。
name 锁名称
channelName,用于pubSub发布消息的channel名称在调用tryAcquire方法后,返回了一个Long的ttl
LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值查看renewExpirationAsync方法源码,其调用了Lua脚本执行续命利用的。
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return this.tryLock(waitTime, -1L, unit);
}pexpire重置锁的有效期。
总体逻辑如下:

[*]开启一个使命,10秒钟后执行
[*]开始的这个使命中重置有效期。假设设置的是默认30秒,则重置为30秒
[*]更新后又重复步骤1、2
那么什么时间取消这个续约的使命呢?在释放锁unlock时
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
      return true;
    } else {
      time -= System.currentTimeMillis() - current;
      if (time <= 0L) {
            this.acquireFailed(waitTime, unit, threadId);
            return false;
      } else {
                //省略multilock解决主从一致性题目

如果Redis是主从集群,主从同步存在耽误,当主机宕机时,从成为了主,但可能存在今后时还未完成同步,因此从上就没有锁标识,此时会出现并发安全题目。
因此redisson提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的职位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入乐成,此时才是加锁乐成,假设现在某个节点挂了,那么他去获得锁的时间,只要有一个节点拿不到,都不能算是加锁乐成,就保证了加锁的可靠性。
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270809473.png
使用multilock()方法。必须在所有的节点都获取锁乐成,才算乐成。 缺点是运维成本高,实现复杂。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS) == 0) then redis.call('hincrby', KEYS, ARGV, 1); redis.call('pexpire', KEYS, ARGV); return nil; end; if (redis.call('hexists', KEYS, ARGV) == 1) then redis.call('hincrby', KEYS, ARGV, 1); redis.call('pexpire', KEYS, ARGV); return nil; end; return redis.call('pttl', KEYS);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}总结Redisson

Redisson分布式锁解决前三个题目原理
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270809485.png
总结Redisson分布式锁原理:

[*]可重入:利用hash布局记录线程id和重入次数
[*]可重试:利用信号量和PubSub功能来实现等待、唤醒,获取锁失败的重试机制
[*]超时续约:利用watchDog,开启一个定时使命,每隔一段时间(releaseTime/3),重置超时时间。
[*]使用multilock: 多个独立的redis节点,必须在所有节点都获取重入锁,才算获取乐成;
redLock

不管是redLock,照旧redissonLock,两者底层都是通过相同的lua脚本来加锁、释放锁的,所以,两者只是外部形态的不同,底层是一样的。redLock是继续了redissonMultiLock,大部分的逻辑,都是在redissonMultiLock中去实现的,所以源码部分,大部分都是RedissonMultiLock
原理


[*]redLock的使用,需要有奇数台独立部署的Redis节点
[*]在加锁的时间,会分别去N台节点上加锁,如果半数以上的节点加锁乐成,就认为当火线程加锁乐成
https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404270809998.png
面试题专栏

Java面试题专栏已上线,欢迎访问。

[*]如果你不知道简历怎么写,简历项目不知道怎么包装;
[*]如果简历中有些内容你不知道该不该写上去;
[*]如果有些综合性题目你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Redis中的分布式锁(步步为营)