ToB企服应用市场:ToB评测及商务社交产业平台

标题: 如何实现一个分布式锁 [打印本页]

作者: 金歌    时间: 2024-7-13 20:04
标题: 如何实现一个分布式锁
如何实现一个分布式锁

本篇内容主要先容如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解+AOP 环绕通知来实现。
1. 锁注解

我们首先写一个锁的注解
  1. /**
  2. * 分布式锁注解
  3. */
  4. @Retention(RetentionPolicy.RUNTIME)
  5. @Target({ElementType.METHOD})
  6. @Documented
  7. public @interface RedisLock {
  8.         long DEFAULT_TIMEOUT_FOR_LOCK = 5L;
  9.         long DEFAULT_EXPIRE_TIME = 60L;
  10.         String key() default "your-biz-key";
  11.         long expiredTime() default DEFAULT_EXPIRE_TIME;
  12.         long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;
  13. }
复制代码
expiredTime 是设置锁的逾期时间,timeoutForLock 是设置等待锁的超时时间。如果没有等待得到锁的超时时间这个功能,那么其他线程在获取锁失败时只能直接失败,无法进行排队等待。
我们如何使用这个注解呢,很轻易,在需要加锁的业务方法上直接用就行.如下,我们有一个库存服务类,它有一个扣减库存方法,该方法将数据库中的一个库存商品的数量减一。在并发场景下,如果我们没有对其进行资源控制,必然会发生库存扣减差别等征象。
  1. public class StockServiceImpl {
  2.         @RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)
  3.         public void deduct(Long stockId) {
  4.                 Stock stock = this.getById(1L);
  5.                 Integer count = stock.getCount();
  6.                 stock.setCount(count - 1);
  7.                 this.updateById(stock);
  8.         }
  9. }
复制代码
2. 在 AOP 切面中进行加锁处置处罚

我们需要使用 AOP 来处置处罚什么?自然是处置处罚使用@RedisLock的方法,因此我们写一个切点表达式,它匹配所有标有 @RedisLock 注解的方法。
接着,我们将此切点表达式与 @Around 注解结合使用,以创建环绕通知,在目标方法执行前后执行我们的加锁解锁逻辑。
因此,根本的逻辑我们就理清了,代码大致长下面这个样子:
  1. public class RedisLockAspect {
  2.         private final RedisTemplate<String, Object> redisTemplate;
  3.         // 锁的redis key前缀
  4.         private static final String DEFAULT_KEY_PREFIX = "lock:";
  5.         // 匹配所有标有 @RedisLock 注解的方法
  6.         @Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
  7.         public void lockAnno() {
  8.         }
  9.         @Around("lockAnno()")
  10.         public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  11.                 // 获取拦截方法上的RedisLock注解
  12.                 RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
  13.                 // 获取锁key
  14.                 String key = getKey(annotation);
  15.                 // 锁过期时间
  16.                 long expireTime = annotation.expiredTime();
  17.                 // 获取锁的等待时间
  18.                 long timeoutForLock = annotation.timeoutForLock();
  19.                 // 在这里加锁
  20.                 someCodeForLock...
  21.                 // 执行业务
  22.                 joinPoint.proceed();
  23.                 // 在这里解锁
  24.                 someCodeForUnLock...
  25.         }
复制代码
我们在加锁的时候,需要用上 timeoutForLock 这个属性,我们通过自旋加线程休眠的方式,来达到在一段时间内等待获取锁的目的。如果自旋时间结束后,还没获取锁,则抛出异常,这里可以根据自己情况而定。自旋加锁代码如下:
  1.     // 自旋获取锁
  2.     long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
  3.     boolean acquired = false;
  4.     String uuid = UUID.randomUUID().toString();
  5.     while(System.currentTimeMillis() < endTime) {
  6.         Boolean absent = redisTemplate.opsForValue()
  7.                 .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
  8.         if (Boolean.TRUE.equals(absent)) {
  9.             acquired = true;
  10.             break;
  11.         } else {
  12.             // 获取不到锁,尝试休眠100毫秒后重试
  13.             Thread.sleep(100);
  14.         }
  15.     }
  16.     // 超时未获取到锁, 抛出异常,可根据自己业务而定
  17.     if (!acquired) {
  18.         throw new RuntimeException("获取锁异常");
  19.     }
复制代码
我们发现上面加锁的时候设置了一个 uuid 作为 value 值,这是为了在锁释放的时候,不误删其他线程上的锁,随后,我们就可以执行被 AOP 切中的方法,执行结束释放锁。代码如下:
  1.     try {
  2.         // 执行业务
  3.         joinPoint.proceed();
  4.     } catch (Throwable e) {
  5.         log.error("业务执行出错!");
  6.     } finally {
  7.         // 解锁时进行校验,只删除自己线程加的锁
  8.         String value = (String) redisTemplate.opsForValue().get(key);
  9.         if (uuid.equals(value)) {
  10.             redisTemplate.delete(key);
  11.         } else {
  12.             log.warn("锁已过期!");
  13.         }
  14.     }
复制代码
到这里,我们就以注解+AOP 的方式实现了分布式锁的功能。当然,以上只实现了分布式锁的简单功能,还缺少了分布式锁的 key 自动续约防止锁逾期功能,以及锁重入功能。
目前,RedisLockAspect的完整代码如下:
  1. @Component
  2. @Aspect
  3. @Slf4j
  4. @AllArgsConstructor
  5. public class RedisLockAspect {
  6.         // 匹配所有标有 @RedisLock 注解的方法
  7.         @Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
  8.         public void lockAnno() {
  9.         }
  10.         @Around("lockAnno()")
  11.         public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  12.                 // 获取拦截方法上的RedisLock注解
  13.                 RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
  14.                 String key = getKey(annotation);
  15.                 // 锁过期时间
  16.                 long expireTime = annotation.expiredTime();
  17.                 // 获取锁的等待时间
  18.                 long timeoutForLock = annotation.timeoutForLock();
  19.                 // 自旋获取锁
  20.                 long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
  21.                 boolean acquired = false;
  22.                 String uuid = UUID.randomUUID().toString();
  23.                 while(System.currentTimeMillis() < endTime) {
  24.                         Boolean absent = redisTemplate.opsForValue()
  25.                                         .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
  26.                         if (Boolean.TRUE.equals(absent)) {
  27.                                 acquired = true;
  28.                                 break;
  29.                         } else {
  30.                                 // 获取不到锁,尝试休眠100毫秒后重试
  31.                                 Thread.sleep(100);
  32.                         }
  33.                 }
  34.                 // 超时未获取到锁, 抛出异常,可根据自己业务而定
  35.                 if (!acquired) {
  36.                         throw new RuntimeException("获取锁异常");
  37.                 }
  38.                 try {
  39.                         // 执行业务
  40.                         joinPoint.proceed();
  41.                 } catch (Throwable e) {
  42.                         log.error("业务执行出错!");
  43.                 } finally {
  44.                         // 解锁时进行校验,只删除自己线程加的锁
  45.                         String value = (String) redisTemplate.opsForValue().get(key);
  46.                         if (uuid.equals(value)) {
  47.                                 redisTemplate.delete(key);
  48.                         } else {
  49.                                 log.warn("锁已过期!");
  50.                         }
  51.                 }
  52.         }
  53.         private String getKey(RedisLock redisLock) {
  54.                 if (Objects.isNull(redisLock)) {
  55.                         return DEFAULT_KEY_PREFIX + "default";
  56.                 }
  57.                 return DEFAULT_KEY_PREFIX + redisLock.key();
  58.         }
  59.         private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {
  60.                 MethodSignature signature = (MethodSignature) joinPoint.getSignature();
  61.                 Method method = signature.getMethod();
  62.                 return method.getAnnotation(RedisLock.class);
  63.         }
  64. }
复制代码
3. key 自动续约防止锁逾期

我们接着完善该分布式锁,为其添加 key 自动续约防止锁逾期的功能。我们的思路与Redission的watch dog类似,开启一个后台线程,来定时查抄需要续约的锁。我们如何判断一个锁是否需要续约呢,我们可以简单定义一个续约分界线,比如在锁逾期时间的三分之二的时间点及之后,对锁进行续约。
3.1 定义一个续约任务4

我们来定义一个锁续约任务,那我们需要什么信息呢?
我们至少需要锁的 key,锁要设置的逾期时间。这是两个最根本的信息。
要判断在锁逾期时间的三分之二的时间点及之后进行续约,那么我们还需要纪录锁上次续约的时间点。
此外,我们还可以为锁续约任务添加最大续约次数限制,这可以避免某些执行时间特别久的任务不断占用锁。所以我们还需要纪录当前锁续约次数和最大续约次数。
对超过最大续约次数的锁的线程,我们直接将其制止,因此我们也纪录一下该锁的线程。
结合上面的分析,我们定义的锁续约任务类如下:
  1. public class LockRenewTask {
  2.         /**
  3.          * key
  4.          */
  5.         private final String key;
  6.         /**
  7.          * 过期时间。单位:秒
  8.          */
  9.         private final long expiredTime;
  10.         /**
  11.          * 锁的最大续约次数
  12.          */
  13.         private final int maxRenewCount;
  14.         /**
  15.          * 锁的当前续约次数
  16.          */
  17.         private int currentRenewCount;
  18.         /**
  19.          * 最新更新时间
  20.          */
  21.         private LocalDateTime latestRenewTime;
  22.         /**
  23.          * 业务线程
  24.          */
  25.         private final Thread thread;
  26.         public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {
  27.                 this.key = key;
  28.                 this.expiredTime = expiredTime;
  29.                 this.maxRenewCount = maxRenewCount;
  30.                 this.thread = thread;
  31.                 this.latestRenewTime = LocalDateTime.now();
  32.         }
  33.         /**
  34.          * 是否到达续约时间
  35.          * @return
  36.          */
  37.         public boolean isTimeToRenew() {
  38.                 LocalDateTime now = LocalDateTime.now();
  39.                 Duration duration = Duration.between(latestRenewTime, now);
  40.                 return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);
  41.         }
  42.         /**
  43.          * 是否达到最大续约次数
  44.          * @return
  45.          */
  46.         public boolean exceedMaxRenewCount() {
  47.                 return this.currentRenewCount >= this.maxRenewCount;
  48.         }
  49.         public synchronized void renew() {
  50.                 this.currentRenewCount++;
  51.                 this.latestRenewTime = LocalDateTime.now();
  52.         }
  53.         // 取消业务方法
  54.         public void cancel() {
  55.                 thread.interrupt();
  56.         }
  57.         public String getKey() {
  58.                 return key;
  59.         }
  60.         public long getExpiredTime() {
  61.                 return expiredTime;
  62.         }
  63. }
复制代码
我们添��了一些关于锁续约的方法:
3.2 定义一个锁续约任务处置处罚器

接着,我们定义一个定时执行该续约任务的 handler。该 handler 也比较简答,核心逻辑是持有一个类型为 List的 taskList 来添加续约任务,且使用一个 ScheduledExecutorService 来定时遍历该 taskList 来执行续约任务。该 handler 再对外暴露一个 addRenewTask 方法,方便外部调用来添加续约任务到 taskList 中。
  1. @Slf4j
  2. @Component
  3. public class LockRenewHandler {
  4.         @Autowired
  5.         private RedisTemplate<String, Object> redisTemplate;
  6.         /**
  7.          * 保障对 taskList的添加删除操作是线程安全的
  8.          */
  9.         private final ReentrantLock taskListLock = new ReentrantLock();
  10.         private final List<LockRenewTask> taskList = new ArrayList<>();
  11.         private final ScheduledExecutorService taskExecutorService;
  12.         {
  13.                 taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
  14.                 taskExecutorService.scheduleAtFixedRate(() -> {
  15.                         try {
  16.                                 executeRenewTask();
  17.                         } catch (Exception e) {
  18.                                 //错误处理
  19.                         }
  20.                 }, 1, 2, TimeUnit.SECONDS);
  21.         }
  22.         /**
  23.          * 添加续约任务
  24.          */
  25.         public void addRenewTask(LockRenewTask task) {
  26.                 taskListLock.lock();
  27.                 try {
  28.                         taskList.add(task);
  29.                 } finally {
  30.                         taskListLock.unlock();
  31.                 }
  32.         }
  33.         /**
  34.          * 执行续约任务
  35.          */
  36.         private void executeRenewTask() {
  37.                 log.info("开始执行续约任务");
  38.                 if (CollectionUtils.isEmpty(taskList)) {
  39.                         return;
  40.                 }
  41.                 // 需要删除的任务,暂存这个集合中  取消
  42.                 List<LockRenewTask> cancelTask = new ArrayList<>();
  43.                 // 获取任务副本
  44.                 List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);
  45.                 for (LockRenewTask task : copyTaskList) {
  46.                         try {
  47.                                 // 判断 Redis 中是否存在 key
  48.                                 if (!redisTemplate.hasKey(task.getKey())) {
  49.                                         cancelTask.add(task);
  50.                                         continue;
  51.                                 }
  52.                                 // 大于等于最大续约次数
  53.                                 if (task.exceedMaxRenewCount()) {
  54.                                         // 停止续约任务
  55.                                         task.cancel();
  56.                                         cancelTask.add(task);
  57.                                         continue;
  58.                                 }
  59.                                 // 到达续约时间
  60.                                 if (task.isTimeToRenew()) {
  61.                                         log.info("续约任务:{}", task.getKey());
  62.                                         redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);
  63.                                         task.renew();
  64.                                 }
  65.                         } catch (Exception e) {
  66.                                 //错误处理
  67.                                 log.error("处理任务出错:{}", task);
  68.                         }
  69.                 }
  70.                 // 加锁,删除 taskList 中需要移除的任务
  71.                 taskListLock.lock();
  72.                 try {
  73.                         taskList.removeAll(cancelTask);
  74.                         // 清理cancelTask,避免堆积,产生内存泄露
  75.                         cancelTask.clear();
  76.                 } finally {
  77.                         taskListLock.unlock();
  78.                 }
  79.         }
  80. }
复制代码
总结一下 LockRenewHandler的主要作用:它负责管理和执行续约任务,以延长 Redis 中键的逾期时间。
大概的工作流程如下:
两个需要注意的点
通过这种方式,LockRenewHandler 可以确保 Redis 中的键在需要时得到续约,并自动移除完成或失败的任务。
3.3 添加锁续约任务

在上面 3.1 节和 3.2 节我们定义好了锁续约任务和处置处罚锁续约任务的核心代码,接下来我们需要在第 2 节加锁解锁的 AOP 处置处罚逻辑上进行一点小小的修改,主要就是在执行加锁之后,执行业务代码之前,添加上锁续约任务。修改位置如下:
  1. public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  2.     ... // 省略代码
  3.     try {
  4.         // 添加锁续约任务
  5.         LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());
  6.         lockRenewHandler.addRenewTask(task);
  7.         log.info("添加续约任务, key:{}", key);
  8.         // 执行业务
  9.         joinPoint.proceed();
  10.     } catch (Throwable e) {
  11.         log.error("业务执行出错!");
  12.     } finally {
  13.         // 解锁时进行校验,只删除自己线程加的锁
  14.         String value = (String) redisTemplate.opsForValue().get(key);
  15.         if (uuid.equals(value)) {
  16.             redisTemplate.delete(key);
  17.         } else {
  18.             log.warn("锁已过期!");
  19.         }
  20.     }
  21.     ... // 省略代码
  22. }
复制代码
到这里,我们的分布式锁已经相当完善了,把锁自动续约的功能也加上了。当然,还没有实现锁的可重入性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4