ToB企服应用市场:ToB评测及商务社交产业平台

标题: 彻底搞懂ScheduledThreadPoolExecutor [打印本页]

作者: 北冰洋以北    时间: 2024-10-24 07:31
标题: 彻底搞懂ScheduledThreadPoolExecutor
前言

项目中常常会遇到一些非分布式的调度使命,需要在将来的某个时候周期性执行。实现如许的功能,我们有多种方式可以选择:
它全部使命都是串行执行的,同一时间只能有一个使命在执行,而且前一个使命的延迟或非常都将会影响到之后的使命。大概会出现使命执行时间过长而导致使命相互阻塞的情况
这种方式底层虽然是用线程池实现,但是有个最大的题目,全部的使命都使用的同一个线程池,大概会导致长周期的使命运行影响短周期使命运行,造成线程池"饥饿",更加保举的做法是同种类型的使命使用同一个线程池。
这也是本文重点讲解的方式,通过自定义ScheduledThreadPoolExecutor调度线程池,提交调度使命才是最优解。
基本先容

ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,为使命提供延迟或周期执行,属于线程池的一种。和 ThreadPoolExecutor 相比,它还具有以下几种特性:
基本使用

ScheduledThreadPoolExecutor 最常见的应用场景就是实现调度使命,
创建方式

创建ScheduledThreadPoolExecutor方式一共有两种,第一种是通过自定义参数,第二种通过Executors工厂方法创建。 根据阿里巴巴代码规范中的建议,更加保举使用第一种方式创建。
  1. ScheduledThreadPoolExecutor(int corePoolSize,
  2.                                        ThreadFactory threadFactory,                                       RejectedExecutionHandler handler)
复制代码

焦点API



  1. @Test
  2. public void testScheduleWithFixedDelay() throws InterruptedException {
  3.     // 创建调度任务线程池
  4.     ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
  5.     // 按照上次执行完成后固定延迟时间调度
  6.     scheduledExecutorService.scheduleWithFixedDelay(() -> {
  7.         try {
  8.             log.info("scheduleWithFixedDelay ...");
  9.             Thread.sleep(1000);
  10.         } catch (InterruptedException e) {
  11.             e.printStackTrace();
  12.         }
  13.     }, 1, 2, TimeUnit.SECONDS);
  14.     Thread.sleep(10000);
  15. }
复制代码

  1. @Test
  2. public void testScheduleAtFixedRate() throws InterruptedException {
  3.     // 创建调度任务线程池
  4.     ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
  5.     // 按照固定2秒时间执行
  6.     scheduledExecutorService.scheduleAtFixedRate(() -> {
  7.         try {
  8.             log.info("scheduleWithFixedDelay ...");
  9.             Thread.sleep(1000);
  10.         } catch (InterruptedException e) {
  11.             e.printStackTrace();
  12.         }
  13.     }, 1, 2, TimeUnit.SECONDS);
  14.     Thread.sleep(10000);
  15. }
复制代码
综合案例

通过ScheduledThreadPoolExecutor实现每周四 18:00:00 定时执利用命。
  1. // 通过ScheduledThreadPoolExecutor实现每周四 18:00:00 定时执行任务
  2. @Test
  3. public void test() {
  4.     //  获取当前时间
  5.     LocalDateTime now = LocalDateTime.now();
  6.     System.out.println(now);
  7.     // 获取周四时间
  8.     LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
  9.     // 如果 当前时间 > 本周周四,必须找到下周周四
  10.     if(now.compareTo(time) > 0) {
  11.         time = time.plusWeeks(1);
  12.     }
  13.     System.out.println(time);
  14.     // initailDelay 代表当前时间和周四的时间差
  15.     // period 一周的间隔时间
  16.     long initailDelay = Duration.between(now, time).toMillis();
  17.     long period = 1000 * 60 * 60 * 24 * 7;
  18.     ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  19.     pool.scheduleAtFixedRate(() -> {
  20.         System.out.println("running...");
  21.     }, initailDelay, period, TimeUnit.MILLISECONDS);
  22. }
复制代码
底层源码剖析

接下来一起看看 ScheduledThreadPool 的底层源码
数据布局


ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor:
ScheduledThreadPoolExecutor 内部构造了两个内部类 ScheduledFutureTask 和 DelayedWorkQueue:
内部类ScheduledFutureTask

属性
  1. //为相同延时任务提供的顺序编号
  2. private final long sequenceNumber;
  3. //任务可以执行的时间,纳秒级
  4. private long time;
  5. //重复任务的执行周期时间,纳秒级。
  6. private final long period;
  7. //重新入队的任务
  8. RunnableScheduledFuture<V> outerTask = this;
  9. //延迟队列的索引,以支持更快的取消操作
  10. int heapIndex;
复制代码
焦点方法run()
  1. public void run() {
  2.     boolean periodic = isPeriodic();//是否为周期任务
  3.     if (!canRunInCurrentRunState(periodic))//当前状态是否可以执行
  4.         cancel(false);
  5.     else if (!periodic)
  6.         //不是周期任务,直接执行
  7.         ScheduledFutureTask.super.run();
  8.     else if (ScheduledFutureTask.super.runAndReset()) {
  9.         setNextRunTime();//设置下一次运行时间
  10.         reExecutePeriodic(outerTask);//重排序一个周期任务
  11.     }
  12. }
复制代码
说明: ScheduledFutureTask 的run方法重写了 FutureTask 的版本,以便执行周期使命时重置/重排序使命。使命的执行通过父类 FutureTask 的run实现。内部有两个针对周期使命的方法:
  1. //设置下一次执行任务的时间
  2. private void setNextRunTime() {
  3.     long p = period;
  4.     if (p > 0)  //固定速率执行,scheduleAtFixedRate
  5.         time += p;
  6.     else
  7.         time = triggerTime(-p);  //固定延迟执行,scheduleWithFixedDelay
  8. }
  9. //计算固定延迟任务的执行时间
  10. long triggerTime(long delay) {
  11.     return now() +
  12.         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  13. }
复制代码
  1. //重排序一个周期任务
  2. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  3.     if (canRunInCurrentRunState(true)) {//池关闭后可继续执行
  4.         super.getQueue().add(task);//任务入列
  5.         //重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行
  6.         if (!canRunInCurrentRunState(true) && remove(task))
  7.             task.cancel(false);
  8.         else
  9.             ensurePrestart();//启动一个新的线程等待任务
  10.     }
  11. }
复制代码
reExecutePeriodic与delayedExecute的执行策略一致,只不过reExecutePeriodic不会执行拒绝策略而是直接丢掉使命。
cancel方法
  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2.     boolean cancelled = super.cancel(mayInterruptIfRunning);
  3.     if (cancelled && removeOnCancel && heapIndex >= 0)
  4.         remove(this);
  5.     return cancelled;
  6. }
复制代码
ScheduledFutureTask.cancel本质上由其父类 FutureTask.cancel 实现。取消使命成功后会根据removeOnCancel参数决定是否从队列中移除此使命。
焦点属性
  1. //关闭后继续执行已经存在的周期任务
  2. private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  3. //关闭后继续执行已经存在的延时任务
  4. private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  5. //取消任务后移除
  6. private volatile boolean removeOnCancel = false;
  7. //为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序
  8. private static final AtomicLong sequencer = new AtomicLong();
复制代码
构造函数

首先看下构造函数,ScheduledThreadPoolExecutor 内部有四个构造函数,这里我们只看这个最大构造灵活度的:
  1. public ScheduledThreadPoolExecutor(int corePoolSize,
  2.                                    ThreadFactory threadFactory,
  3.                                    RejectedExecutionHandler handler) {
  4.     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  5.           new DelayedWorkQueue(), threadFactory, handler);
  6. }
复制代码
构造函数都是通过super调用了ThreadPoolExecutor的构造,并且使用特定等待队列DelayedWorkQueue。
Schedule
  1. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  2.                                        long delay,
  3.                                        TimeUnit unit) {
  4.     if (callable == null || unit == null)
  5.         throw new NullPointerException();
  6.     RunnableScheduledFuture<V> t = decorateTask(callable,
  7.         new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//构造ScheduledFutureTask任务
  8.     delayedExecute(t);//任务执行主方法
  9.     return t;
  10. }
复制代码
说明: schedule重要用于执行一次性(延迟)使命。函数执行逻辑分两步:
  1. protected <V> RunnableScheduledFuture<V> decorateTask(
  2.     Runnable runnable, RunnableScheduledFuture<V> task) {
  3.     return task;
  4. }
复制代码
  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2.     if (isShutdown())
  3.         reject(task);//池已关闭,执行拒绝策略
  4.     else {
  5.         super.getQueue().add(task);//任务入队
  6.         if (isShutdown() &&
  7.             !canRunInCurrentRunState(task.isPeriodic()) &&//判断run-after-shutdown参数
  8.             remove(task))//移除任务
  9.             task.cancel(false);
  10.         else
  11.             ensurePrestart();//启动一个新的线程等待任务
  12.     }
  13. }
复制代码
说明: delayedExecute是执利用命的主方法,方法执行逻辑如下:
A: 如果池正在运行,或者 run-after-shutdown 参数值为true,则调用父类方法ensurePrestart启动一个新的线程等待执利用命。ensurePrestart源码如下:
  1. void ensurePrestart() {
  2.     int wc = workerCountOf(ctl.get());
  3.     if (wc < corePoolSize)
  4.         addWorker(null, true);
  5.     else if (wc == 0)
  6.         addWorker(null, false);
  7. }
复制代码
ensurePrestart是父类 ThreadPoolExecutor 的方法,用于启动一个新的工作线程等待执利用命,即使corePoolSize为0也会安排一个新线程。
B: 如果池已经关闭,并且 run-after-shutdown 参数值为false,则执行父类(ThreadPoolExecutor)方法remove移除队列中的指定使命,成功移除后调用ScheduledFutureTask.cancel取消使命
scheduleAtFixedRate 和 scheduleWithFixedDelay
  1. /**
  2. * 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
  3. * 之后每隔period执行一次,不等待第一次执行完成就开始计时
  4. */
  5. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  6.                                               long initialDelay,
  7.                                               long period,
  8.                                               TimeUnit unit) {
  9.     if (command == null || unit == null)
  10.         throw new NullPointerException();
  11.     if (period <= 0)
  12.         throw new IllegalArgumentException();
  13.     //构建RunnableScheduledFuture任务类型
  14.     ScheduledFutureTask<Void> sft =
  15.         new ScheduledFutureTask<Void>(command,
  16.                                       null,
  17.                                       triggerTime(initialDelay, unit),//计算任务的延迟时间
  18.                                       unit.toNanos(period));//计算任务的执行周期
  19.     RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
  20.     sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
  21.     delayedExecute(t);//执行任务
  22.     return t;
  23. }
  24. /**
  25. * 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
  26. * 在第一次执行完之后延迟delay后开始下一次执行
  27. */
  28. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  29.                                                  long initialDelay,
  30.                                                  long delay,
  31.                                                  TimeUnit unit) {
  32.     if (command == null || unit == null)
  33.         throw new NullPointerException();
  34.     if (delay <= 0)
  35.         throw new IllegalArgumentException();
  36.     //构建RunnableScheduledFuture任务类型
  37.     ScheduledFutureTask<Void> sft =
  38.         new ScheduledFutureTask<Void>(command,
  39.                                       null,
  40.                                       triggerTime(initialDelay, unit),//计算任务的延迟时间
  41.                                       unit.toNanos(-delay));//计算任务的执行周期
  42.     RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
  43.     sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
  44.     delayedExecute(t);//执行任务
  45.     return t;
  46. }
复制代码
原理探究

那大家有没有想过为什么使命出错会导致非常无法打印,以致调度都取消了呢?从源码出发,一探究竟。
从上面delayedExecute方法可以看到,延迟或周期性使命的重要执行方法, 重要是将使命丢到队列中,后续再由工作线程获取执行。
  1. public void shutdown() {
  2.     super.shutdown();
  3. }
  4. //取消并清除由于关闭策略不应该运行的所有任务
  5. @Override void onShutdown() {
  6.     BlockingQueue<Runnable> q = super.getQueue();
  7.     //获取run-after-shutdown参数
  8.     boolean keepDelayed =
  9.         getExecuteExistingDelayedTasksAfterShutdownPolicy();
  10.     boolean keepPeriodic =
  11.         getContinueExistingPeriodicTasksAfterShutdownPolicy();
  12.     if (!keepDelayed && !keepPeriodic) {//池关闭后不保留任务
  13.         //依次取消任务
  14.         for (Object e : q.toArray())
  15.             if (e instanceof RunnableScheduledFuture<?>)
  16.                 ((RunnableScheduledFuture<?>) e).cancel(false);
  17.         q.clear();//清除等待队列
  18.     }
  19.     else {//池关闭后保留任务
  20.         // Traverse snapshot to avoid iterator exceptions
  21.         //遍历快照以避免迭代器异常
  22.         for (Object e : q.toArray()) {
  23.             if (e instanceof RunnableScheduledFuture) {
  24.                 RunnableScheduledFuture<?> t =
  25.                     (RunnableScheduledFuture<?>)e;
  26.                 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
  27.                     t.isCancelled()) { // also remove if already cancelled
  28.                     //如果任务已经取消,移除队列中的任务
  29.                     if (q.remove(t))
  30.                         t.cancel(false);
  31.                 }
  32.             }
  33.         }
  34.     }
  35.     tryTerminate(); //终止线程池
  36. }
复制代码
  1. @Test
  2. public void testException() throws InterruptedException {
  3.     // 创建1个线程的调度任务线程池
  4.     ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
  5.     // 创建一个任务
  6.     Runnable runnable = new Runnable() {
  7.         volatile int num = 0;
  8.         @Override
  9.         public void run() {
  10.             num ++;
  11.             // 模拟执行报错
  12.             if(num > 5) {
  13.                 throw new RuntimeException("执行错误");
  14.             }
  15.             log.info("exec num: [{}].....", num);
  16.         }
  17.     };
  18.     // 每隔1秒钟执行一次任务
  19.     scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
  20.     Thread.sleep(10000);
  21. }
复制代码
小结

Java的ScheduledThreadPoolExecutor定时使命线程池所调度的使命中如果抛出了非常,并且非常没有捕获直接抛到框架中,会导致ScheduledThreadPoolExecutor定时使命不调度了。
封装包装类,统一调度

在现实项目使用中,可以在本身的项目中封装一个包装类,要求全部的调度都提交通过统一的包装类,从而规范代码,如下代码:
  1. public void run() {
  2.     try {
  3.         num++;
  4.         // 模拟执行报错
  5.         if (num > 5) {
  6.             throw new RuntimeException("执行错误");
  7.         }
  8.         log.info("exec num: [{}].....", num);
  9.     } catch (Exception e) {
  10.             e.printStackTrace();
  11.     }
  12. }
复制代码
当然,也还可以在包装类内里封装各种监控活动,如本例打印日志执行时间等。
其它使用注意点

由于 ScheduledThreadPoolExecutor 是一个固定焦点线程数大小的线程池,并且使用了一个无界队列,以是调整maximumPoolSize对其没有任何影响(以是 ScheduledThreadPoolExecutor 没有提供可以调整最大线程数的构造函数,默认最大线程数固定为Integer.MAX_VALUE)。此外,设置corePoolSize为0或者设置焦点线程空闲后清除(allowCoreThreadTimeOut)同样也不是一个好的策略,因为一旦周期使命到达某一次运行周期时,大概导致线程池内没有线程行止理这些使命。

注意: newScheduledThreadPool(1, threadFactory) 不等价于newSingleThreadScheduledExecutor。newSingleThreadScheduledExecutor创建的线程池包管内部只有一个线程执利用命,并且线程数不可扩展;而通过newScheduledThreadPool(1, threadFactory)创建的线程池可以通过setCorePoolSize方法来修改焦点线程数。
面试题专栏

Java面试题专栏已上线,接待访问。
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4