SpringBoot Task定时使命

打印 上一主题 下一主题

主题 1492|帖子 1492|积分 4476

参数详解
  1. @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Repeatable(Schedules.class)
  5. public @interface Scheduled {
  6.     String CRON_DISABLED = "-";
  7.    
  8.     String cron() default "";
  9.    
  10.     String zone() default "";
  11.    
  12.     long fixedDelay() default -1;
  13.    
  14.     String fixedDelayString() default "";
  15.    
  16.     long fixedRate() default -1;
  17.    
  18.     String fixedRateString() default "";
  19.    
  20.     long initialDelay() default -1;
  21.    
  22.     String initialDelayString() default "";
  23. }
复制代码
fixedDelay

它的隔断时间是根据前次使命竣事的时间开始计时的,只要盯紧上一次使命执行竣事的时间即可,跟使命逻辑的执行时间无关,两个使命的隔断时间是固定的

fixedDelayString

与 fixedDalay 一样,不同的是使用的是 String 字符串,支持占位符方式
  1. @Scheduled(fixedDelayString = "${time.fixedDelay}")
  2. public void test() {
  3.     System.out.println("Execute at " + System.currentTimeMillis());
  4. }
复制代码
fixedRate

在理想情况下,下一次开始和上一次开始之间的时间隔断是一定的,但是默认情况下 SpringBoot 定时使命是单线程执行的。当下一轮的使命满足时间策略后使命就会加入队列,即当本次使命开始执行时下一次使命的时间就已经确定了,由于本次使命的“超时”执行,下一次使命的等待时间就会被压缩乃至阻塞

fixedRateString

与 fixedRate 一样,不同的是使用的是 String 字符串,支持占位符方式
initialDelay

这个参数只能配合 fixedDelay 或 fixedRate 使用。如:@Scheduled(initialDelay = 10000, fixedRate = 15000),意思是在容器启动后,耽误 10 秒再执行一次定时器,以后每 15 秒再执行一次该定时器
initialDelayString

与 initialDelay 一样,不同的是使用的是 String 字符串,支持占位符方式
cron 表达式

语法格式:

  • 秒 分 小时 月份中的日期 月份 星期中的日期 年份
  • 秒 分 小时 月份中的日期 月份 星期中的日期
字段值特别字符秒(Seconds)0~59 的整数, - * /分(Minutes)0~59 的整数, - * /小时(Hours)0~23 的整数, - * /日期(DayofMonth)1~31 的整数(需要看月的天数), - * ? / L W C月份(Month)1~12 的整数, - * /星期(DayOfWeek)1~7 的整数, - * ? / L W C年(Year)(可选)1970~2099, - * /

  • *:表示匹配该域的任意值。
    例如:在 Minutes 域使用*,即表示每分钟都会触发事件
  • ?:只能用在 DayofMonth 和 DayofWeek 两个域,它也匹配域的任意值,但实际不会,因为 DayofMonth 和 DayofWeek 会相互影响。
    例如:在每月的 20 日触发使命,不管 20 日是星期几,只能使用如下写法:13 13 15 20 * ?,此中最后一位只能用?,而不能使用,假如使用表示不管星期几都会触发
  • -:表示范围。
    例如:在 Minutes 域使用 5-20,表示从 5 到 20 分钟每分钟触发一次
  • /:表示起始时间开始触发,然后每隔固定时间触发一次。
    例如:在 Minutes 域使用 5/20,则意味着从第 5 分钟开始,每隔 20 分钟触发一次
  • ,:表示列出枚举值。
    例如:在 Minutes 域使用 5,20,则意味着在 5 和 20 分都会触发一次
  • L:表示最后,只能出如今 DayofWeek 和 DayofMonth 域。
    例如:在 DayofWeek 域使用 5L,意味着在最后的一个星期四触发
  • W:表示有效工作日(周一到周五),只能出如今 DayofMonth 域,体系将在离指定日期的最近的有效工作日触发事件。
    例如:在 DayofMonth 使用 5W,假如 5 日是星期六,则将在最近的工作日(星期五,即 4 日触发);假如 5 日是星期天,则在 6 日(星期一)触发;假如 5 日在星期一到星期五中的一天,则就在 5 日触发。注意:W 的最近寻找不会跨过月份
  • LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五
  • #:用于确定每个月第 n 个星期 x(x#n),只能出如今 DayofMonth 域。
    例如:4#2 表示第 2 个星期三
常用表达式参考
  1. "*/5 * * * * ?"       # 每隔5秒执行一次
  2. "0 */1 * * * ?"       # 每隔1分钟执行一次
  3. "0 0 23 * * ?"        # 每天23点执行一次
  4. "0 0 1 * * ?"         # 每天凌晨1点执行一次
  5. "0 0 1 1 * ?"         # 每月1号凌晨1点执行一次
  6. "0 0 23 L * ?"        # 每月最后一天23点执行一次
  7. "0 0 1 ? * L"         # 每周星期天凌晨1点实行一次:
  8. "0 26,29,33 * * * ?"  # 在26分、29分、33分执行一次
  9. "0 0 0,3,8,21 * * ?"  # 每天的0点、3点、8点、21点执行一次
  10. "0 0 10,14,16 * * ?"  # 每天上午10点,下午2点,4点
  11. "0 0/30 9-17 * * ?"   # 朝九晚五工作时间内每半小时
  12. "0 0 12 ? * WED"      # 表示每个星期三中午12点
  13. "0 0 12 * * ?"        # 每天中午12点触发
  14. "0 15 10 ? * *"       # 每天上午10:15触发
  15. "0 15 10 * * ?"       # 每天上午10:15触发
  16. "0 15 10 * * ? *"     # 每天上午10:15触发
  17. "0 15 10 * * ?"       # 2005" 2005年的每天上午10:15触发
  18. "0 * 14 * * ?"        # 在每天下午2点到下午2:59期间的每1分钟触发
  19. "0 0/5 14 * * ?"      # 在每天下午2点到下午2:55期间的每5分钟触发
  20. "0 0/5 14,18 * * ?"   # 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
  21. "0 0-5 14 * * ?"      # 在每天下午2点到下午2:05期间的每1分钟触发
  22. "0 10,44 14 ? 3 WED"  # 每年三月的星期三的下午2:10和2:44触发
  23. "0 15 10 ? * MON-FRI" # 周一至周五的上午10:15触发
  24. "0 15 10 15 * ?"      # 每月15日上午10:15触发
  25. "0 15 10 L * ?"       # 每月最后一日的上午10:15触发
  26. "0 15 10 ? * 6L"      # 每月的最后一个星期五上午10:15触发
  27. "0 15 10 ? * 6#3"     # 每月的第三个星期五上午10:15触发
  28. "0 15 10 ? * 6L 2002-2005" # 2002年至2005年的每月的最后一个星期五上午10:15触发
复制代码
基本使用

基本方法
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-web</artifactId>
  4.         <version>2.3.12.RELEASE</version>
  5. </dependency>
复制代码
  1. @SpringBootApplication
  2. // 开启定时任务开关
  3. @EnableScheduling
  4. public class SpringtaskApplication {
  5.     public static void main(String[] args) {
  6.         SpringApplication.run(SpringtaskApplication.class, args);
  7.     }
  8. }
复制代码
  1. @Component
  2. public class TaskService01 {
  3.    
  4.     @Scheduled(fixedDelay = 1000)
  5.     public void task01(){
  6.         System.out.println("fixedDelay....");
  7.     }
  8.    
  9.     @Scheduled(fixedRate = 1000)
  10.     public void task02(){
  11.         System.out.println("fixedRate....");
  12.     }
  13.    
  14.     @Scheduled(initialDelay = 10000,fixedDelay = 1000)
  15.     public void task03(){
  16.         System.out.println("initialDelay");
  17.     }
  18.    
  19.     @Scheduled(cron = "1 * * * * *")
  20.     public void task04(){
  21.         System.out.println("cron");
  22.     }
  23. }
复制代码
定时使命开/关

通过设置文件控制Bean的实例化,根据需要进行开启/关闭定时使命
  1. @Component
  2. @Slf4j
  3. @RefreshScope
  4. @ConditionalOnProperty(prefix = "test.job", name = "enable", havingValue = "true", matchIfMissing = true)
  5. public class TestJob {
  6.    
  7.     @Scheduled(cron = "1 * * * * *")
  8.     public void task04(){
  9.         System.out.println("cron");
  10.     }
  11. }
复制代码
定时使命设置

@EnableScheduling 注解引入了 ScheduledAnnotationBeanPostProcessor 其 setScheduler(Object scheduler) 有以下的注释:
假如 TaskScheduler 或者 ScheduledExecutorService 没有定义为该方法的参数,该方法将在 Spring IoC 中寻找唯一的 TaskScheduler 或者名称为 taskScheduler 的 Bean 作为参数,当然你按照查找 TaskScheduler 的方法找一个 ScheduledExecutorService 也可以。要是都找不到那么只能使用当地单线程调度器了


执行器

SpringBoot 内默认自动设置 TaskExecutor 使命执行器线程池,主要用于执行单次使命
自动设置条件


  • 当类路径下存在 ThreadPoolTaskExecutor 类
  • 当 Spring 容器中不存在 Executor 的 bean
  1. // 仅在类 ThreadPoolTaskExecutor 存在于 classpath 时才应用
  2. @ConditionalOnClass(ThreadPoolTaskExecutor.class)
  3. @Configuration(proxyBeanMethods = false)
  4. @EnableConfigurationProperties(TaskExecutionProperties.class)
  5. public class TaskExecutionAutoConfiguration {
  6.     public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
  7.     @Bean
  8.     @ConditionalOnMissingBean
  9.     public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
  10.             ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
  11.             ObjectProvider<TaskDecorator> taskDecorator) {
  12.         TaskExecutionProperties.Pool pool = properties.getPool();
  13.         TaskExecutorBuilder builder = new TaskExecutorBuilder();
  14.         builder = builder.queueCapacity(pool.getQueueCapacity());
  15.         builder = builder.corePoolSize(pool.getCoreSize());
  16.         builder = builder.maxPoolSize(pool.getMaxSize());
  17.         builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
  18.         builder = builder.keepAlive(pool.getKeepAlive());
  19.         Shutdown shutdown = properties.getShutdown();
  20.         builder = builder.awaitTermination(shutdown.isAwaitTermination());
  21.         builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
  22.         builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
  23.         builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
  24.         builder = builder.taskDecorator(taskDecorator.getIfUnique());
  25.         return builder;
  26.     }
  27.     @Lazy
  28.     @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
  29.             AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
  30.     @ConditionalOnMissingBean(Executor.class)
  31.     public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
  32.         return builder.build();
  33.     }
  34. }
复制代码
线程池设置

TaskExecutionProperties 默认值:

  • 线程名称前缀:threadNamePrefix = “task-”
  • 核心线程数:coreSize = 8
  • 最大线程数:maxSize = Integer.MAX_VALUE
  • 非核心线程存活时长:keepAlive = Duration.ofSeconds(60)
调度器

SpringBoot 内默认自动设置 TaskScheduler 使命调度器线程池,主要用于执行周期性使命
自动设置条件


  • 当类路径下存在 ThreadPoolTaskScheduler 类
  • 当 Spring 容器中不存在 SchedulingConfigurer 、 TaskScheduler 、ScheduledExecutorService 的 bean
  1. @ConditionalOnClass(ThreadPoolTaskScheduler.class)
  2. @Configuration(proxyBeanMethods = false)
  3. @EnableConfigurationProperties(TaskSchedulingProperties.class)
  4. @AutoConfigureAfter(TaskExecutionAutoConfiguration.class)
  5. public class TaskSchedulingAutoConfiguration {
  6.     @Bean
  7.     @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  8.     @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
  9.     public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
  10.         return builder.build();
  11.     }
  12.     @Bean
  13.     @ConditionalOnMissingBean
  14.     public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
  15.             ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
  16.         TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
  17.         builder = builder.poolSize(properties.getPool().getSize());
  18.         Shutdown shutdown = properties.getShutdown();
  19.         builder = builder.awaitTermination(shutdown.isAwaitTermination());
  20.         builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
  21.         builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
  22.         builder = builder.customizers(taskSchedulerCustomizers);
  23.         return builder;
  24.     }
  25. }
复制代码

  • 当 Spring 容器中存在名字叫 org.springframework.context.annotation.internalScheduledAnnotationProcessor (需要设置 @EnableScheduling 注解将会注入这个名字的 bean)
  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulingConfiguration.class)
  4. @Documented
  5. public @interface EnableScheduling {
  6. }
复制代码
  1. @Configuration
  2. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  3. public class SchedulingConfiguration {
  4.     @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  5.     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  6.     public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
  7.         return new ScheduledAnnotationBeanPostProcessor();
  8.     }
  9. }
复制代码
线程池设置

TaskSchedulingProperties 默认设置值:

  • 线程名称前缀:threadNamePrefix = “scheduling-”
  • 线程数:size = 1
该设置的自定义设置以 spring.task.scheduling 开头。同时它需要在使命执行器设置 TaskExecutionAutoConfiguration 设置后才生效。我们只需要在中对其设置属性 spring.task.execution 相关属性设置即可。
注意:定义使命默认用的是 TaskSchedulingAutoConfiguration 实例化的 Bean(applicationTaskExecutor、taskScheduler)
Properties 设置
  1. ######任务调度线程池######
  2. # 任务调度线程池大小 默认 1 建议根据任务加大
  3. spring.task.scheduling.pool.size=1
  4. # 调度线程名称前缀 默认 scheduling-
  5. spring.task.scheduling.thread-name-prefix=scheduling-
  6. # 线程池关闭时等待所有任务完成
  7. spring.task.scheduling.shutdown.await-termination=true
  8. # 调度线程关闭前最大等待时间,确保最后一定关闭
  9. spring.task.scheduling.shutdown.await-termination-period=60
  10. ######任务执行线程池配置######
  11. # 是否允许核心线程超时。这样可以动态增加和缩小线程池
  12. spring.task.execution.pool.allow-core-thread-timeout=true
  13. #  核心线程池大小 默认 8
  14. spring.task.execution.pool.core-size=8
  15. # 线程空闲等待时间 默认 60s
  16. spring.task.execution.pool.keep-alive=60s
  17. # 线程池最大数  根据任务定制
  18. spring.task.execution.pool.max-size=16
  19. #  线程池 队列容量大小
  20. spring.task.execution.pool.queue-capacity=10
  21. # 线程池关闭时等待所有任务完成
  22. spring.task.execution.shutdown.await-termination=true
  23. # 执行线程关闭前最大等待时间,确保最后一定关闭
  24. spring.task.execution.shutdown.await-termination-period=60
  25. # 线程名称前缀
  26. spring.task.execution.thread-name-prefix=task-
复制代码
TaskSchedulingAutoConfiguration 源码

当 Spring Boot 应用程序中没有定义自定义的线程池 bean 时,Spring Boot 应用程序会根据自动设置类注入一个名为 applicationTaskExecutor 或 taskExecutor 的线程池对象,它的设置是在 TaskExecutionProperties 类中完成的,这个类使用 spring.task.execution 前缀进行设置,包含了很多线程池相关细节的设置选项,当我们容器中存在自定义线程池时,applicationTaskExecutor 或 taskExecutor 的线程池对象是不会被创建的。
@Async 注解相关设置

使用@Async 注解没有指定 value 属性时,项目启动的时间会有这样的提示:“在上下文中找到多个 TaskExecutor bean,并且没有一个名为' taskExecutor'。将此中一个标记为 primary 或将其命名为'taskExecutor'(可能作为别名),以便将其用于异步处理”
  1. // 标记为 Primary,即主要的线程
  2. @Bean
  3. @Primary
  4. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  5.     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6.     executor.setThreadNamePrefix("my-free-style-");
  7.     executor.setMaxPoolSize(maxPoolSize);
  8.     executor.setCorePoolSize(corePoolSize);
  9.     executor.setQueueCapacity(queueCapacity);
  10.     executor.setKeepAliveSeconds(keepAliveSeconds);
  11.     // 线程池对拒绝任务(无线程可用)的处理策略
  12.     executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  13.     return executor;
  14. }
  15. // 直接起别名为 taskExecutor
  16. @Bean(name = "taskExecutor")
  17. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  18.     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  19.     executor.setThreadNamePrefix("my-free-style-");
  20.     executor.setMaxPoolSize(maxPoolSize);
  21.     executor.setCorePoolSize(corePoolSize);
  22.     executor.setQueueCapacity(queueCapacity);
  23.     executor.setKeepAliveSeconds(keepAliveSeconds);
  24.     // 线程池对拒绝任务(无线程可用)的处理策略
  25.     executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  26.     return executor;
  27. }
复制代码
使命阻塞

出现缘故因由

Spring 中@EnableScheduling 和@Scheduled 标注的定时使命默认单线程同步执行,多个使命时,一个使命执行完毕以后才能执行下一个使命,可能会有阻塞征象发生(假如希望并发运行,需要设置线程池)
  1. @SpringBootApplication
  2. @EnableScheduling
  3. public class SpringbootTaskApplication {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(SpringbootTaskApplication.class, args);
  6.     }
  7. }
复制代码
  1. @Component
  2. @Slf4j
  3. public class ScheduleTask1 {
  4.     @Scheduled(cron = "*/2 * * * * ?")
  5.     public void task1() throws InterruptedException {
  6.         log.info("我是task1,我需要执行 10s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  7.         Thread.sleep(10000);
  8.         log.info("我是task1 ending ,我的线程的 id == > {} , 时间 == > {}", Thread.currentThread().getId(), new Date());
  9.     }
  10.     @Scheduled(cron = "*/4 * * * * ?")
  11.     public void task2() throws InterruptedException {
  12.         log.info("我是task2,我需要执行 2s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  13.         Thread.sleep(2000);
  14.         log.info("我是task2 ending ,我的线程的 id == > {} , 时间 == > {}",Thread.currentThread().getId(), new Date());
  15.     }
  16. }
复制代码
  1. // 运行结果
  2. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 15:16:52 CST 2019
  3. 我是task1 ending ,我的线程的 id == > 95 , 时间 == > Fri Feb 01 15:17:02 CST 2019
  4. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 15:17:02 CST 2019
  5. task2 ending ,我的线程的 id == > 95 , 时间 == > Fri Feb 01 15:17:04 CST 2019
  6. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 15:17:04 CST 2019
  7. task1 ending ,我的线程的 id == > 95 , 时间 == > Fri Feb 01 15:17:14 CST 2019
复制代码
可以看出,从 task1 使命运行时,等到 4s 时,task2 使命没有执行,而是等到 task1 使命执行竣事后才执行
解决方法

使用@Async 异步执利用命
  1. @SpringBootApplication
  2. @EnableScheduling
  3. @EnableAsync
  4. public class SpringbootTaskApplication {
  5.     public static void main(String[] args) {
  6.         SpringApplication.run(SpringbootTaskApplication.class, args);
  7.     }
  8. }
复制代码

  • 使用默认线程池设置
@Async默认的线程池设置是Bean名称为taskExecutor的类
  1. @Component
  2. @Slf4j
  3. public class ScheduleTask2 {
  4.     @Async
  5.     @Scheduled(cron = "*/2 * * * * ?")
  6.     public void task1() throws InterruptedException {
  7.         log.info("我是task1,我需要执行 10s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  8.         Thread.sleep(10000);
  9.         log.info("我是task1 ending ,我的线程的 id == > {} , 时间 == > {}", Thread.currentThread().getId(), new Date());
  10.     }
  11.     @Async
  12.     @Scheduled(cron = "*/4 * * * * ?")
  13.     public void task2() throws InterruptedException {
  14.         log.info("我是task2,我需要执行 2s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  15.         Thread.sleep(2000);
  16.         log.info("我是task2 ending ,我的线程的 id == > {} , 时间 == > {}",Thread.currentThread().getId(), new Date());
  17.     }
  18. }
复制代码

  • 自定义线程池设置
通过指定Bean名称来决定使用哪个线程池,用户可以自定义线程池设置
  1. @Component
  2. @Slf4j
  3. public class ScheduleTask3 {
  4.     @Async("myPoolTaskExecutor")
  5.     @Scheduled(cron = "*/2 * * * * ?")
  6.     public void task1() throws InterruptedException {
  7.         log.info("我是task1,我需要执行 10s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  8.         Thread.sleep(10000);
  9.         log.info("我是task1 ending ,我的线程的 id == > {} , 时间 == > {}", Thread.currentThread().getId(), new Date());
  10.     }
  11.     @Async("myPoolTaskExecutor")
  12.     @Scheduled(cron = "*/4 * * * * ?")
  13.     public void task2() throws InterruptedException {
  14.         log.info("我是task2,我需要执行 2s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  15.         Thread.sleep(2000);
  16.         log.info("我是task2 ending ,我的线程的 id == > {} , 时间 == > {}",Thread.currentThread().getId(), new Date());
  17.     }
  18.     /**
  19.     * 创建自定义线程池,提供异步调用时使用
  20.     **/
  21.     @Bean(name = "myPoolTaskExecutor")
  22.     public ThreadPoolTaskExecutor getMyPoolTaskExecutor() {
  23.         ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  24.         //核心线程数
  25.         taskExecutor.setCorePoolSize(10);
  26.         //线程池维护线程的最大数量, 只有在缓冲队列满了之后才会申请超过核心线程数的线程
  27.         taskExecutor.setMaxPoolSize(100);
  28.         //缓存队列
  29.         taskExecutor.setQueueCapacity(50);
  30.         //许的空闲时间, 当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  31.         taskExecutor.setKeepAliveSeconds(200);
  32.         //异步方法内部线程名称
  33.         taskExecutor.setThreadNamePrefix("poolTestThread-");
  34.         /**
  35.          * 当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
  36.          * 通常有以下四种策略:
  37.          * ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException 异常。
  38.          * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  39.          * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  40.          * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
  41.          */
  42.         // 拒绝策略
  43.         taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  44.         taskExecutor.initialize();
  45.         System.out.println("@Async 业务处理线程配置成功,核心线程池:[{}],最大线程池:[{}],队列容量:[{}],线程名称前缀:[{}]");
  46.         return taskExecutor;
  47.     }
  48. }
复制代码
执行结果
  1. //运行结果:
  2. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 116,时间 == >Fri Feb 01 16:19:32 CST 2019
  3. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 117,时间 == >Fri Feb 01 16:19:32 CST 2019
  4. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 124,时间 == >Fri Feb 01 16:19:34 CST 2019
  5. task2 ending ,我的线程的 id == > 116 , 时间 == > Fri Feb 01 16:19:34 CST 2019
  6. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 125,时间 == >Fri Feb 01 16:19:36 CST 2019
  7. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 126,时间 == >Fri Feb 01 16:19:36 CST 2019
  8. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 127,时间 == >Fri Feb 01 16:19:38 CST 2019
  9. task2 ending ,我的线程的 id == > 126 , 时间 == > Fri Feb 01 16:19:38 CST 2019
  10. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 128,时间 == >Fri Feb 01 16:19:40 CST 2019
  11. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 129,时间 == >Fri Feb 01 16:19:40 CST 2019
复制代码
从日志可知:task1 和 task2 的确是并行执行的,因为开始的时间节点是一样的。
存在问题:当 task1 第一次使命执行时间过长时,此时 task1 又到了其第二次执利用命的调度时间,这时会并行执行两个使命
实现 SchedulingConfigurer 接口

使用@Async 会导致第一次使命执行时间过长,从而第二次使命和第一次使命并发执行
解决方法:实现 SchedulingConfigurer 接口,这样自动装配中 TaskSchedulingAutoConfiguration 的 taskScheduler 就不会被实例化,替换原来的线程池设置
  1. @Configuration
  2. @Slf4j
  3. public class ScheduleConfig implements SchedulingConfigurer {
  4.     @Override
  5.     public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
  6.         taskRegistrar.setScheduler(taskExecutor());
  7.     }
  8.      @Bean
  9.      public Executor taskExecutor(){
  10.          return Executors.newScheduledThreadPool(10);
  11.      }
  12. }
复制代码
  1. @Component
  2. @Slf4j
  3. public class ScheduleTask4 {
  4.     @Scheduled(cron = "*/2 * * * * ?")
  5.     public void task1() throws InterruptedException {
  6.         log.info("我是task1,我需要执行 10s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  7.         Thread.sleep(10000);
  8.         log.info("我是task1 ending ,我的线程的 id == > {} , 时间 == > {}", Thread.currentThread().getId(), new Date());
  9.     }
  10.     @Scheduled(cron = "*/4 * * * * ?")
  11.     public void task2() throws InterruptedException {
  12.         log.info("我是task2,我需要执行 2s 钟的时间,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  13.         Thread.sleep(2000);
  14.         log.info("我是task2 ending ,我的线程的 id == > {} , 时间 == > {}",Thread.currentThread().getId(), new Date());
  15.     }
  16. }
复制代码
执行结果
  1. //执行结果:
  2. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 16:28:16 CST 2019
  3. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 96,时间 == >Fri Feb 01 16:28:16 CST 2019
  4. task2 ending ,我的线程的 id == > 95 , 时间 == > Fri Feb 01 16:28:18 CST 2019
  5. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 16:28:20 CST 2019
  6. task2 ending ,我的线程的 id == > 95 , 时间 == > Fri Feb 01 16:28:22 CST 2019
  7. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 121,时间 == >Fri Feb 01 16:28:24 CST 2019
  8. task1 ending ,我的线程的 id == > 96 , 时间 == > Fri Feb 01 16:28:26 CST 2019
  9. task2 ending ,我的线程的 id == > 121 , 时间 == > Fri Feb 01 16:28:26 CST 2019
  10. 我是task1,我需要执行 10s 钟的时间,我的线程的 id == > 95,时间 == >Fri Feb 01 16:28:28 CST 2019
  11. 我是task2,我需要执行 2s 钟的时间,我的线程的 id == > 122,时间 == >Fri Feb 01 16:28:28 CST 2019
复制代码
注意:此时每次定时使命执行的 traceId 是一致的,无法很好地追踪每次定时使命的情况,修改如下
  1. @Configuration
  2. @Slf4j
  3. public class ScheduleConfig implements SchedulingConfigurer {
  4.    
  5.     @Override
  6.     public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
  7.         // taskRegistrar.setScheduler(taskExecutor());  
  8.         ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
  9.                 taskScheduler.setPoolSize(10);
  10.                 taskScheduler.initialize();
  11.         taskRegistrar.setScheduler(taskScheduler);
  12.     }
  13.     // 缺点:可能每次定时任务产生的 traceId 是一致的
  14.     // @Bean
  15.     // public Executor taskExecutor(){
  16.     //     return Executors.newScheduledThreadPool(10);
  17.     // }
  18. }
复制代码
Properties 设置

修改默认的线程池设置,适当将调度线程池的设置修改,支持多使命并发执行
  1. ######任务调度线程池######
  2. # 任务调度线程池大小 默认 1 建议根据任务加大
  3. spring.task.scheduling.pool.size=10
  4. # 调度线程名称前缀 默认 scheduling-
  5. spring.task.scheduling.thread-name-prefix=scheduling-
  6. # 线程池关闭时等待所有任务完成
  7. spring.task.scheduling.shutdown.await-termination=true
  8. # 调度线程关闭前最大等待时间,确保最后一定关闭
  9. spring.task.scheduling.shutdown.await-termination-period=60
  10. ######任务执行线程池配置######
  11. # 是否允许核心线程超时。这样可以动态增加和缩小线程池
  12. spring.task.execution.pool.allow-core-thread-timeout=true
  13. # 核心线程池大小 默认 8
  14. spring.task.execution.pool.core-size=8
  15. # 线程空闲等待时间 默认 60s
  16. spring.task.execution.pool.keep-alive=60s
  17. # 线程池最大数 根据任务定制
  18. spring.task.execution.pool.max-size=16
  19. # 线程池队列容量大小
  20. spring.task.execution.pool.queue-capacity=10
  21. # 线程池关闭时等待所有任务完成
  22. spring.task.execution.shutdown.await-termination=true
  23. # 执行线程关闭前最大等待时间,确保最后一定关闭
  24. spring.task.execution.shutdown.await-termination-period=60
  25. # 线程名称前缀
  26. spring.task.execution.thread-name-prefix=task-
复制代码
缺点


  • 不支持集群设置,在分布式环境下会出现多个使命并发执行的情况
解决方法:通过分布式锁的方式预防使命并发执行的情况

  • 不支持指定的时间范围执利用命(例如在9点到11点间执利用命,其他时间段不执行)
  • 不支持分片执利用命
动态定时使命实现

出现问题

用实现 SpringBoot + @Scheduled 实现了定时使命。但是也存在很多问题:
通常,@Scheduled 注解的所有属性只在 Spring Context 启动时剖析和初始化一次。因此,当在 Spring 中使用 @Scheduled 注解时,无法在运行时更改 fixedDelay 或 fixedRate 值。

  • 在一个线程内执行,那么使命多了就可能被阻塞,导致使命耽误执行。
  • 每次修改执行频率都要改代码,重启服务。
  • 无法提供定时使命的启用、暂停、修改接口。
实现方法:参考 ScheduledTaskRegistrar 源码提供的方法
简单案例
  1. CREATE TABLE `sys_task` (
  2.   `id` bigint(21) NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.   `task_uuid` varchar(50) DEFAULT NULL COMMENT '任务UUID',
  4.   `task_name` varchar(50) DEFAULT NULL COMMENT '任务名称',
  5.   `task_cron` varchar(50) DEFAULT NULL COMMENT '任务定时表达式',
  6.   `class_name` varchar(100) DEFAULT NULL COMMENT '任务类',
  7.   `method_name` varchar(100) DEFAULT NULL COMMENT '任务方法',
  8.   `task_type` int(1) DEFAULT NULL COMMENT '任务类型',
  9.   `remark` varchar(250) DEFAULT NULL,
  10.   `del_flag` int(1) DEFAULT '1',
  11.   `create_user` varchar(50) DEFAULT NULL,
  12.   `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  13.   `update_user` varchar(50) DEFAULT NULL,
  14.   `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  15.   PRIMARY KEY (`id`)
  16. ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8
复制代码
  1. @Configuration
  2. public class ScheduledConfig {
  3.     @Bean
  4.     public ScheduledTaskRegistrar taskRegistrar() {
  5.         return new ScheduledTaskRegistrar();
  6.     }
  7. }
复制代码
  1. @Slf4j
  2. public class ScheduleTask5 {
  3.     public void task1() throws InterruptedException {
  4.         log.info("我是task1,我的线程的 id == > {},时间 == >{}", Thread.currentThread().getId(), new Date());
  5.         Thread.sleep(4000);
  6.         log.info("我是task1 ending ,我的线程的 id == > {} , 时间 == > {}", Thread.currentThread().getId(), new Date());
  7.     }
  8. }
复制代码
  1. @Data
  2. public class SysTask {
  3.     /**
  4.     * 主键
  5.     */
  6.     private Long id;
  7.     /**
  8.     * 任务 UUID
  9.     */
  10.     private String taskUuid;
  11.     /**
  12.     * 任务名称
  13.     */
  14.     private String taskName;
  15.     /**
  16.     * 任务定时表达式
  17.     */
  18.     private String taskCron;
  19.     /**
  20.     * 任务类
  21.     */
  22.     private String className;
  23.     /**
  24.     * 任务方法
  25.     */
  26.     private String methodName;
  27.     /**
  28.     * 任务类型
  29.     */
  30.     private Integer taskType;
  31.     /**
  32.      * 备注
  33.      */
  34.     private String remark;
  35.     /**
  36.      * 删除标识
  37.      */
  38.     private Integer delFlag;
  39.     /**
  40.      * 创建人
  41.      */
  42.     private String createUser;
  43.     /**
  44.      * 创建时间
  45.      */
  46.     private Date createTime;
  47.     /**
  48.      * 修改人
  49.      */
  50.     private String updateUser;
  51.     /**
  52.      * 修改时间
  53.      */
  54.     private Date updateTime;
  55. }
复制代码
  1. @Service
  2. @Slf4j
  3. public class CronServiceImpl implements CronService {
  4.     private static Map<String, ScheduledTask> scheduledTaskMap = new HashMap<>();
  5.     @Resource
  6.     private ScheduledTaskRegistrar taskRegistrar;
  7.     @Override
  8.     public void add(SysTask sysTask) {
  9.         CronTask cronTask = new CronTask(getRunnable(sysTask), sysTask.getTaskCron());
  10.         ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask);
  11.         String uuid = UUID.randomUUID().toString();
  12.         scheduledTaskMap.put(uuid, scheduledTask);
  13.         log.info("添加任务成功, uuid == > {}, 任务名称 == > {}, 任务表达式 == > {}", uuid, sysTask.getTaskName(), sysTask.getTaskCron());
  14.     }
  15.     private Runnable getRunnable(SysTask sysTask) {
  16.         return () -> {
  17.             try {
  18.                 Class<?> aClass = Class.forName(sysTask.getClassName());
  19.                 Constructor<?> constructor = aClass.getConstructor();
  20.                 Object o = constructor.newInstance();
  21.                 Method method = aClass.getMethod(sysTask.getMethodName());
  22.                 method.invoke(o);
  23.             } catch (Exception e) {
  24.                 e.printStackTrace();
  25.             }
  26.         };
  27.     }
  28.     @Override
  29.     public void delete(String uuid) {
  30.         try {
  31.             ScheduledTask scheduledTask = scheduledTaskMap.get(uuid);
  32.             scheduledTask.cancel();
  33.             scheduledTaskMap.remove(uuid);
  34.         } catch (Exception e) {
  35.             e.printStackTrace();
  36.         }
  37.     }
  38.     @Override
  39.     public void update(SysTask sysTask) {
  40.         this.delete(sysTask.getTaskUuid());
  41.         this.add(sysTask);
  42.     }
  43. }
复制代码
  1. @RestController
  2. @RequestMapping("/cron")
  3. public class CronController {
  4.     @Resource
  5.     private CronService cronService;
  6.     @PostMapping("/add")
  7.     public String add(@RequestBody SysTask sysTask) {
  8.         cronService.add(sysTask);
  9.         return "success";
  10.     }
  11.     @PostMapping("/delete")
  12.     public String delete(String uuid) {
  13.         cronService.delete(uuid);
  14.         return "success";
  15.     }
  16.     @PostMapping("/update")
  17.     public String update(@RequestBody SysTask sysTask) {
  18.         cronService.update(sysTask);
  19.         return "success";
  20.     }
  21. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表