定时任务分布式锁SchedulerLock

打印 上一主题 下一主题

主题 1669|帖子 1669|积分 5009

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
作用

SchedulerLock 作用:确保任务在同一时刻最多执行一次。如果一个任务正在一个节点上执行,则它将得到一个锁,该锁将阻止从另一个节点(或线程)执行同一任务。如果一个任务已经在一个节点上执行,则在其他节点上的执行不会等待,只需跳过它即可 。
SchedulerLock 主要通太过布式锁实现,可以利用:

  • 数据库锁(基于数据库行锁或唯一约束)
  • Redis 分布式锁(利用 SET NX EX)
  • Zookeeper 分布式锁(基于临时节点)
  • 基于 Quartz/ShedLock 的框架实现
相关注解

@EnableSchedulerLock
  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulerLockConfigurationSelector.class)
  4. public @interface EnableSchedulerLock {
  5.     enum InterceptMode {
  6.         PROXY_SCHEDULER,
  7.         PROXY_METHOD
  8.     }
  9.     InterceptMode interceptMode() default InterceptMode.PROXY_METHOD;
  10.     String defaultLockAtMostFor();
  11.     String defaultLockAtLeastFor() default "PT0S";
  12.     AdviceMode mode() default AdviceMode.PROXY;
  13.     boolean proxyTargetClass() default false;
  14.     int order() default Ordered.LOWEST_PRECEDENCE;
  15. }
复制代码
指定在执行节点结束时应保存锁的默认时间利用 ISO8601 Duration 格式,作用就是在被加锁的节点挂了时,无法释放锁,造成其他节点无法进行下一任务,我们利用注解时间需要给定一个值。可以在每个 ScheduledLock 注解中被重写,也就是说每个定时任务都可以重新定义时间,来控制每个定时任务。

  • defaultLockAtMostFor:设定默认最大锁持有时间
  • defaultLockAtLeastFor:设定默认最小锁持有时间
@SchedulerLock
  1. @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Deprecated
  4. public @interface SchedulerLock {
  5.     String name() default "";
  6.     long lockAtMostFor() default -1L;
  7.     String lockAtMostForString() default "";
  8.     long lockAtLeastFor() default -1L;
  9.     String lockAtLeastForString() default "";
  10. }
复制代码

  • name:锁的名称,必须保证唯一,每个任务的锁名称应该唯一,因为它决定了这个锁在分布式环境中的唯一性
  • lockAtMostFor:乐成执行任务的节点所能拥有的独占锁的最长时间,设置的值要保证比定时任务正常执行完成的时间大一些,此属性保证了如果 task 节点忽然宕机,也能在超过设定值时释放任务锁
  • lockAtLeastFor:乐成执行任务的节点所能拥有的独占锁的最短时间,在指定的时间内,即使任务执行完成,锁也不会释放,这有助于防止任务被频仍触发
  • lockAtMostForString:最大时间的字符串形式,允许通过 Spring 的属性占位符(例如:${lock.duration})来动态配置值,例如“PT14M”表示为 14 分钟
  • lockAtLeastForString:最小时间的字符串形式
根本利用

redis 整合
  1. <dependency>
  2.    <groupId>org.springframework.boot</groupId>
  3.    <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <dependency>
  6.     <groupId>net.javacrumbs.shedlock</groupId>
  7.     <artifactId>shedlock-spring</artifactId>
  8.     <version>4.38.0</version>
  9. </dependency>
  10. <dependency>
  11.     <groupId>net.javacrumbs.shedlock</groupId>
  12.     <artifactId>shedlock-provider-redis-spring</artifactId>
  13.     <version>4.38.0</version>
  14. </dependency>
复制代码
  1. spring:
  2.   redis:
  3.     #数据库索引
  4.     database: 0
  5.     host: 127.0.0.1
  6.     port: 6379
  7.     password:
  8.     jedis:
  9.       pool:
  10.         #最大连接数
  11.         max-active: 8
  12.         #最大阻塞等待时间(负数表示没限制)
  13.         max-wait: -1
  14.         #最大空闲
  15.         max-idle: 8
  16.         #最小空闲
  17.         min-idle: 0
  18.         #连接超时时间
  19.     timeout: 10000
复制代码
  1. // 开启定时任务注解
  2. @EnableScheduling
  3. // 开启定时任务锁,默认设置锁最大占用时间为 30s
  4. @EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
  5. @SpringBootApplication
  6. public class HelloSpringbootApplication {
  7.    public static void main(String[] args) {
  8.       SpringApplication.run(HelloSpringbootApplication.class, args);
  9.    }
  10. }
复制代码
  1. @Configuration
  2. @EnableCaching
  3. public class RedisConfig extends CachingConfigurerSupport {
  4.    
  5.     @Bean(name = "redisTemplate")
  6.     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
  7.         RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
  8.         //参照 StringRedisTemplate 内部实现指定序列化器
  9.         redisTemplate.setConnectionFactory(redisConnectionFactory);
  10.         redisTemplate.setKeySerializer(keySerializer());
  11.         redisTemplate.setHashKeySerializer(keySerializer());
  12.         redisTemplate.setValueSerializer(valueSerializer());
  13.         redisTemplate.setHashValueSerializer(valueSerializer());
  14.         return redisTemplate;
  15.     }
  16.     private RedisSerializer<String> keySerializer(){
  17.         return new StringRedisSerializer();
  18.     }
  19.     //使用 Jackson 序列化器
  20.     private RedisSerializer<Object> valueSerializer(){
  21.         return new GenericJackson2JsonRedisSerializer();
  22.     }
  23.    
  24.     @Bean
  25.     public LockProvider lockProvider(RedisTemplate redisTemplate) {
  26.         return new RedisLockProvider(redisTemplate.getConnectionFactory());
  27.     }
  28. }
复制代码
  1. @Slf4j
  2. @Component
  3. public class TestScheduled {
  4.     @Resource
  5.     RedisTemplate redisTemplate;
  6.     // @SchedulerLock 的作用是保证当前定时任务的方法执行时获得锁,忽略其他相同任务的执行
  7.     // name 必须要指定,ShedLock 就是根据这个 name 进行相同任务判定的
  8.     // name:定时任务的名字,就是数据库中的主键(name)
  9.     // lockAtMostFor:锁的最大时间单位为毫秒
  10.     // lockAtLeastFor:锁的最小时间单位为毫秒
  11.     @Scheduled(fixedDelay = 30 * 1000)
  12.     @SchedulerLock(name = "evaluateUnsubmit",lockAtLeastFor = 5*60*1000,lockAtMostFor = 20*60*1000 )
  13.     public void testMethod(){
  14.         log.info("开始执行 {}", DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
  15.         try {
  16.             Thread.sleep(100);
  17.             redisTemplate.opsForValue().set("test" + System.currentTimeMillis(),"goodJob",100, TimeUnit.SECONDS);
  18.         } catch (InterruptedException e) {
  19.             throw new RuntimeException(e);
  20.         }
  21.         log.info("执行完成 {}", DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
  22.     }
  23.     @Scheduled(cron = "*/15 * * * * *")
  24.     @SchedulerLock(name = "TaskScheduler_scheduledTask", lockAtLeastForString = "PT5M", lockAtMostForString = "PT14M")
  25.     public void scheduledTask() {
  26.         // ...
  27.     }
  28. }
复制代码
mysql 整合
  1. <dependency>
  2.     <groupId>net.javacrumbs.shedlock</groupId>
  3.     <artifactId>shedlock-spring</artifactId>
  4.     <version>4.23.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>net.javacrumbs.shedlock</groupId>
  8.     <artifactId>shedlock-provider-jdbc-template</artifactId>
  9.     <version>4.23.0</version>
  10. </dependency>
  11. <dependency>
  12.     <groupId>mysql</groupId>
  13.     <artifactId>mysql-connector-java</artifactId>
  14.     <scope>runtime</scope>
  15. </dependency>
复制代码
  1. # MySQL, MariaDB
  2. CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP(3) NOT NULL,
  3.     locked_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
  4. # Postgres
  5. CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP NOT NULL,
  6.     locked_at TIMESTAMP NOT NULL, locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
  7. # Oracle
  8. CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP(3) NOT NULL,
  9.     locked_at TIMESTAMP(3) NOT NULL, locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
  10. # MS SQL
  11. CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until datetime2 NOT NULL,
  12.     locked_at datetime2 NOT NULL, locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
  13. # DB2
  14. CREATE TABLE shedlock(name VARCHAR(64) NOT NULL PRIMARY KEY, lock_until TIMESTAMP NOT NULL,
  15.     locked_at TIMESTAMP NOT NULL, locked_by VARCHAR(255) NOT NULL);
复制代码
  1. @Configuration
  2. // 开启定时器
  3. @EnableScheduling
  4. // 开启定时任务锁,指定一个默认的锁的时间 30 秒
  5. @EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
  6. public class ShedlockJdbcConfig {
  7.     /**
  8.      * 配置锁的提供者
  9.      */
  10.     @Bean
  11.     public LockProvider lockProvider(DataSource dataSource) {
  12.         return new JdbcTemplateLockProvider(
  13.                 JdbcTemplateLockProvider.Configuration.builder()
  14.                         .withJdbcTemplate(new JdbcTemplate(dataSource))
  15.                         .usingDbTime()
  16.                         .build()
  17.         );
  18.     }
  19. }
复制代码
  1. @Component
  2. @Slf4j
  3. public class TimeTaskJob {
  4.     private static Integer count = 1;
  5.     /**
  6.      * 任务 1 每 5 秒执行一次
  7.      * lockAtLeastFor:虽然定时任务是每隔5秒执行一次, 但是分布式锁定义的是: 每次任务要锁住20秒,20秒是持有锁的最小时间,必须等20秒后才释放锁,并且确保在20秒钟内,该任务不会运行超过 1 次;
  8.      * lockAtMostFor:锁最大持有时间30秒,表示最多锁定30秒钟,主要用于防止执行任务的节点挂掉(即使这个节点挂掉,在30秒钟后锁也被释放),一般将其设置为明显大于任务的最大执行时长;如果任务运行时间超过该值(即任务30秒钟没有执行完),则该任务可能被重复执行。
  9.      */
  10.     @Scheduled(cron = "0/5 * * * * ? ")
  11.     @SchedulerLock(name = "testJob1",lockAtLeastFor = "20000", lockAtMostFor = "30000")
  12.     public void scheduledTask1() {
  13.         log.info(Thread.currentThread().getName() + "->>>任务1执行第:" + (count++) + "次");
  14.     }
  15.     @Scheduled(cron = "0/5 * * * * ? ")
  16.     @SchedulerLock(name = "testJob2")
  17.     public void scheduledTask2() {
  18.         log.info(Thread.currentThread().getName() + "->>>任务2执行第:" + (count++) + "次");
  19.     }
  20. }
复制代码
实现原理


  • 利用@EnableSchedulerLock 注解后,会引入 SchedulerLockConfigurationSelector 类,根据其对应的模式(默认 InterceptMode.PROXY_METHOD)天生 LockConfigurationExtractorConfiguration 和 MethodProxyLockConfiguration 类
  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulerLockConfigurationSelector.class)
  4. public @interface EnableSchedulerLock {
  5.     enum InterceptMode {
  6.         PROXY_SCHEDULER,
  7.         PROXY_METHOD
  8.     }
  9.     InterceptMode interceptMode() default InterceptMode.PROXY_METHOD;
  10.     String defaultLockAtMostFor();
  11.     String defaultLockAtLeastFor() default "PT0S";
  12.     AdviceMode mode() default AdviceMode.PROXY;
  13.     boolean proxyTargetClass() default false;
  14.     int order() default Ordered.LOWEST_PRECEDENCE;
  15. }
复制代码
  1. public class SchedulerLockConfigurationSelector implements ImportSelector {
  2.     @Override
  3.     @NonNull
  4.     public String[] selectImports(@NonNull AnnotationMetadata metadata) {
  5.         AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(EnableSchedulerLock.class.getName(), false));
  6.         InterceptMode mode = attributes.getEnum("interceptMode");
  7.         if (mode == PROXY_METHOD) {
  8.             return new String[]{AutoProxyRegistrar.class.getName(), LockConfigurationExtractorConfiguration.class.getName(), MethodProxyLockConfiguration.class.getName()};
  9.         } else if (mode == PROXY_SCHEDULER) {
  10.             return new String[]{AutoProxyRegistrar.class.getName(), LockConfigurationExtractorConfiguration.class.getName(), SchedulerProxyLockConfiguration.class.getName(), RegisterDefaultTaskSchedulerPostProcessor.class.getName()};
  11.         } else {
  12.             throw new UnsupportedOperationException("Unknown mode " + mode);
  13.         }
  14.     }
  15. }
复制代码

  • LockConfigurationExtractorConfiguration 会获取@EnableSchedulerLock 注解上的属性进行配置,天生 SpringLockConfigurationExtractor
  1. @Configuration
  2. class LockConfigurationExtractorConfiguration extends AbstractLockConfiguration implements EmbeddedValueResolverAware {
  3.     private final StringToDurationConverter durationConverter = StringToDurationConverter.INSTANCE;
  4.     private StringValueResolver resolver;
  5.     @Bean
  6.     ExtendedLockConfigurationExtractor lockConfigurationExtractor() {
  7.         return new SpringLockConfigurationExtractor(defaultLockAtMostForDuration(), defaultLockAtLeastForDuration(), resolver, durationConverter);
  8.     }
  9.     private Duration defaultLockAtLeastForDuration() {
  10.         return toDuration(getDefaultLockAtLeastFor());
  11.     }
  12.     private Duration defaultLockAtMostForDuration() {
  13.         return toDuration(getDefaultLockAtMostFor());
  14.     }
  15.     private String getDefaultLockAtLeastFor() {
  16.         return getStringFromAnnotation("defaultLockAtLeastFor");
  17.     }
  18.     private String getDefaultLockAtMostFor() {
  19.         return getStringFromAnnotation("defaultLockAtMostFor");
  20.     }
  21.     private Duration toDuration(String string) {
  22.         return durationConverter.convert(resolver.resolveStringValue(string));
  23.     }
  24.     protected String getStringFromAnnotation(String name) {
  25.         return annotationAttributes.getString(name);
  26.     }
  27.     @Override
  28.     public void setEmbeddedValueResolver(@NonNull StringValueResolver resolver) {
  29.         this.resolver = resolver;
  30.     }
  31. }
复制代码

  • MethodProxyLockConfiguration 类会根据 LockProvider 和 ExtendedLockConfigurationExtractor 进行自动装配,天生 MethodProxyScheduledLockAdvisor
  1. @Configuration
  2. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  3. class MethodProxyLockConfiguration extends AbstractLockConfiguration {
  4.     @Bean
  5.     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  6.     MethodProxyScheduledLockAdvisor proxyScheduledLockAopBeanPostProcessor(
  7.         @Lazy LockProvider lockProvider,
  8.         @Lazy ExtendedLockConfigurationExtractor lockConfigurationExtractor
  9.     ) {
  10.         MethodProxyScheduledLockAdvisor advisor = new MethodProxyScheduledLockAdvisor(
  11.             lockConfigurationExtractor,
  12.             new DefaultLockingTaskExecutor(lockProvider)
  13.         );
  14.         advisor.setOrder(getOrder());
  15.         return advisor;
  16.     }
  17. }
复制代码

  • 天生一个切面 MethodProxyScheduledLockAdvisor 类,对方法进行拦截
  1. class MethodProxyScheduledLockAdvisor extends AbstractPointcutAdvisor {
  2.     // ...
  3.     private static class LockingInterceptor implements MethodInterceptor {
  4.         private final ExtendedLockConfigurationExtractor lockConfigurationExtractor;
  5.         private final LockingTaskExecutor lockingTaskExecutor;
  6.         LockingInterceptor(ExtendedLockConfigurationExtractor lockConfigurationExtractor, LockingTaskExecutor lockingTaskExecutor) {
  7.             this.lockConfigurationExtractor = lockConfigurationExtractor;
  8.             this.lockingTaskExecutor = lockingTaskExecutor;
  9.         }
  10.         @Override
  11.         public Object invoke(MethodInvocation invocation) throws Throwable {
  12.             Class<?> returnType = invocation.getMethod().getReturnType();
  13.             if (returnType.isPrimitive() && !void.class.equals(returnType)) {
  14.                 throw new LockingNotSupportedException("Can not lock method returning primitive value");
  15.             }
  16.             // 查找@SchedulerLock 注解
  17.             LockConfiguration lockConfiguration = lockConfigurationExtractor.getLockConfiguration(invocation.getThis(), invocation.getMethod()).get();
  18.             // 执行加锁方法
  19.             TaskResult<Object> result = lockingTaskExecutor.executeWithLock(invocation::proceed, lockConfiguration);
  20.             if (Optional.class.equals(returnType)) {
  21.                 return toOptional(result);
  22.             } else {
  23.                 return result.getResult();
  24.             }
  25.         }
  26.         private static Object toOptional(TaskResult<Object> result) {
  27.             if (result.wasExecuted()) {
  28.                 return result.getResult();
  29.             } else {
  30.                 return Optional.empty();
  31.             }
  32.         }
  33.     }
  34. }
复制代码

  • SpringLockConfigurationExtractor 会查找方法上是否存在@SchedulerLock 注解
  1. class SpringLockConfigurationExtractor implements ExtendedLockConfigurationExtractor {
  2.     // ...
  3.    
  4.     @Override
  5.     public Optional<LockConfiguration> getLockConfiguration(Object target, Method method) {
  6.         AnnotationData annotation = findAnnotation(target, method);
  7.         if (shouldLock(annotation)) {
  8.             return Optional.of(getLockConfiguration(annotation));
  9.         } else {
  10.             return Optional.empty();
  11.         }
  12.     }
  13.     AnnotationData findAnnotation(Object target, Method method) {
  14.         AnnotationData annotation = findAnnotation(method);
  15.         if (annotation != null) {
  16.             return annotation;
  17.         } else {
  18.             Class<?> targetClass = AopUtils.getTargetClass(target);
  19.             try {
  20.                 Method methodOnTarget = targetClass
  21.                     .getMethod(method.getName(), method.getParameterTypes());
  22.                 return findAnnotation(methodOnTarget);
  23.             } catch (NoSuchMethodException e) {
  24.                 return null;
  25.             }
  26.         }
  27.     }
  28.     private AnnotationData findAnnotation(Method method) {
  29.         net.javacrumbs.shedlock.core.SchedulerLock annotation = AnnotatedElementUtils.getMergedAnnotation(method, net.javacrumbs.shedlock.core.SchedulerLock.class);
  30.         if (annotation != null) {
  31.             return new AnnotationData(annotation.name(), annotation.lockAtMostFor(), annotation.lockAtMostForString(), annotation.lockAtLeastFor(), annotation.lockAtLeastForString());
  32.         }
  33.         SchedulerLock annotation2 = AnnotatedElementUtils.getMergedAnnotation(method, SchedulerLock.class);
  34.         if (annotation2 != null) {
  35.             return new AnnotationData(annotation2.name(), -1, annotation2.lockAtMostFor(), -1, annotation2.lockAtLeastFor());
  36.         }
  37.         return null;
  38.     }
  39.     // ...
  40. }
复制代码

  • DefaultLockingTaskExecutor 类对方法进行加解锁,执行 LockProvider 提供的加锁方法
  1. public class DefaultLockingTaskExecutor implements LockingTaskExecutor {
  2.     // ...
  3.    
  4.     @Override
  5.     @NonNull
  6.     public <T> TaskResult<T> executeWithLock(@NonNull TaskWithResult<T> task, @NonNull LockConfiguration lockConfig) throws Throwable {
  7.         Optional<SimpleLock> lock = lockProvider.lock(lockConfig);
  8.         String lockName = lockConfig.getName();
  9.         if (alreadyLockedBy(lockName)) {
  10.             logger.debug("Already locked '{}'", lockName);
  11.             return TaskResult.result(task.call());
  12.         } else if (lock.isPresent()) {
  13.             try {
  14.                 LockAssert.startLock(lockName);
  15.                 LockExtender.startLock(lock.get());
  16.                 logger.debug("Locked '{}', lock will be held at most until {}", lockName, lockConfig.getLockAtMostUntil());
  17.                 return TaskResult.result(task.call());
  18.             } finally {
  19.                 LockAssert.endLock();
  20.                 SimpleLock activeLock = LockExtender.endLock();
  21.                 if (activeLock != null) {
  22.                     activeLock.unlock();
  23.                 } else {
  24.                     // This should never happen, but I do not know any better way to handle the null case.
  25.                     logger.warn("No active lock, please report this as a bug.");
  26.                     lock.get().unlock();
  27.                 }
  28.                 if (logger.isDebugEnabled()) {
  29.                     Instant lockAtLeastUntil = lockConfig.getLockAtLeastUntil();
  30.                     Instant now = ClockProvider.now();
  31.                     if (lockAtLeastUntil.isAfter(now)) {
  32.                         logger.debug("Task finished, lock '{}' will be released at {}", lockName, lockAtLeastUntil);
  33.                     } else {
  34.                         logger.debug("Task finished, lock '{}' released", lockName);
  35.                     }
  36.                 }
  37.             }
  38.         } else {
  39.             logger.debug("Not executing '{}'. It's locked.", lockName);
  40.             return TaskResult.notExecuted();
  41.         }
  42.     }
  43. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

拉不拉稀肚拉稀

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表