基于Mongodb分布式锁简单实现,解决定时任务并发执行问题 ...

打印 上一主题 下一主题

主题 888|帖子 888|积分 2666

前言

我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生
网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但是我的就是一个用了mysql和mongo的小应用,不准备引入其他三方中间件来解决这个问题,撸一个简单的分布式锁来解决定时任务并发执行的问题,加锁操作的原子性和防死锁也都要支持,这里我使用mongodb写了AllInOne的工具类
All in one Code

先上代码
  1. @Component
  2. @Slf4j
  3. public class MongoDBLock {
  4.     private static final int DEFAULT_LOCK_TIMEOUT = 30;//锁的默认超时时间,单位秒
  5.     private MongoTemplate mongoTemplate;
  6.     private int lockTimeout;
  7.     public MongoDBLock(MongoTemplate mongoTemplate) {
  8.         this.mongoTemplate = mongoTemplate;
  9.         this.lockTimeout = DEFAULT_LOCK_TIMEOUT;
  10.     }
  11.     /**
  12.      * 尝试获取分布式锁
  13.      *
  14.      * @param lockKey 锁的key
  15.      * @return true:获取锁成功,false:获取锁失败
  16.      */
  17.     private boolean acquireLock(String lockKey) {
  18.         LockDocument document = new LockDocument();
  19.         document.setId(lockKey);
  20.         document.setExpireAt(Instant.ofEpochMilli(Instant.now().toEpochMilli() + lockTimeout * 1000));
  21.         try {
  22.             mongoTemplate.insert(document);
  23.             return true;
  24.         } catch (Exception e) {
  25.         }
  26.         return false;
  27.     }
  28.     /**
  29.      * 释放分布式锁
  30.      *
  31.      * @param lockKey 锁的key
  32.      */
  33.     private void releaseLock(String lockKey) {
  34.         Query query = new Query(Criteria.where("key").is(lockKey));
  35.         mongoTemplate.remove(query, LockDocument.class);
  36.         log.info("程序执行成功,释放分布式锁,lockKey:{}",lockKey);
  37.     }
  38.     /**
  39.      * 分布式锁入口方法,参数lockName为锁的名称,lockKey为需要加锁的key,执行完成后自动释放锁
  40.      *
  41.      * @param lockKey
  42.      * @param task
  43.      * @param <T>
  44.      * @throws Exception
  45.      */
  46.     public <T> void executeWithLock(String lockKey, ITask<T> task) throws Exception {
  47.         boolean locked = acquireLock(lockKey);
  48.         if (locked) {
  49.             log.info("获取分布式锁成功,lockKey:{}",lockKey);
  50.             try {
  51.                 task.execute();
  52.             } finally {
  53.                 releaseLock(lockKey);
  54.             }
  55.         } else {
  56.             log.warn("获取分布式锁失败,lockKey:{}", lockKey);
  57.             throw new AppException("获取分布式锁失败!");
  58.         }
  59.     }
  60.     @Data
  61.     @Document(collection = "lock_collection")
  62.     static class LockDocument {
  63.         @Id
  64.         private String id;
  65.         @Indexed(expireAfterSeconds = DEFAULT_LOCK_TIMEOUT)
  66.         private Instant expireAt;
  67.     }
  68.     @FunctionalInterface
  69.     public interface ITask<T> {
  70.         T execute() throws Exception;
  71.     }
  72. }
复制代码
调用示例
  1.     @Resource
  2.     MongoDBLock mongoDBLock;
  3.     mongoDBLock.executeWithLock("key", () -> {
  4.         // do some thing
  5.         return null;
  6.     });
复制代码
原理


  • 使用key作为主键,利用mongodb的insert原子性保障LockDocument不会重复插入
  • LockDocument中expireAt字段利用的mongodb索引过期机制,解决死锁问题,这里设置超时时间是30秒,并在执行完成之后会主动释放锁

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表