北冰洋以北 发表于 2024-10-24 07:31:02

彻底搞懂ScheduledThreadPoolExecutor

前言

项目中常常会遇到一些非分布式的调度使命,需要在将来的某个时候周期性执行。实现如许的功能,我们有多种方式可以选择:

[*]Timer类, jdk1.3引入,不保举。
它全部使命都是串行执行的,同一时间只能有一个使命在执行,而且前一个使命的延迟或非常都将会影响到之后的使命。大概会出现使命执行时间过长而导致使命相互阻塞的情况

[*]Spring的@Scheduled注解,不是很保举
这种方式底层虽然是用线程池实现,但是有个最大的题目,全部的使命都使用的同一个线程池,大概会导致长周期的使命运行影响短周期使命运行,造成线程池"饥饿",更加保举的做法是同种类型的使命使用同一个线程池。

[*]自定义ScheduledThreadPoolExecutor实现调度使命
这也是本文重点讲解的方式,通过自定义ScheduledThreadPoolExecutor调度线程池,提交调度使命才是最优解。
基本先容

ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,为使命提供延迟或周期执行,属于线程池的一种。和 ThreadPoolExecutor 相比,它还具有以下几种特性:

[*]使用专门的使命类型—ScheduledFutureTask 来执行周期使命,也可以接收不需要时间调度的使命(这些使命通过 ExecutorService 来执行)。
[*]使用专门的存储队列—DelayedWorkQueue 来存储使命,DelayedWorkQueue 是无界延迟队列DelayQueue 的一种。相比ThreadPoolExecutor也简化了执行机制(delayedExecute方法,背面单独分析)。
[*]支持可选的run-after-shutdown参数,在池被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟使命。并且当使命(重新)提交操纵与 shutdown 操纵重叠时,复查逻辑也不雷同。
基本使用

ScheduledThreadPoolExecutor 最常见的应用场景就是实现调度使命,
创建方式

创建ScheduledThreadPoolExecutor方式一共有两种,第一种是通过自定义参数,第二种通过Executors工厂方法创建。 根据阿里巴巴代码规范中的建议,更加保举使用第一种方式创建。

[*]自定义参数创建
ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler)

[*]corePoolSize:焦点工作的线程数量
[*]threadFactory:线程工厂,用来创建线程
[*]handler: 拒绝策略,饱和策略

[*]Executors工厂方法创建


[*]static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):根据焦点线程数创建调度线程池。
[*]static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory):根据焦点线程数和线程工厂创建调度线程池。
焦点API


[*]schedule(Runnable command, long delay, TimeUnit unit):创建并执行在给定延迟后启用的一次性操纵


[*]command: 执行的使命
[*]delay:延迟的时间
[*]unit: 单位

[*]scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):定时执行周期使命,使命执行完成后,延迟delay时间执行


[*]command: 执行的使命
[*]initialDelay: 初始延迟的时间
[*]delay: 上次执行结束,延迟多久执行
[*]unit:单位
@Test
public void testScheduleWithFixedDelay() throws InterruptedException {
    // 创建调度任务线程池
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    // 按照上次执行完成后固定延迟时间调度
    scheduledExecutorService.scheduleWithFixedDelay(() -> {
      try {
            log.info("scheduleWithFixedDelay ...");
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }, 1, 2, TimeUnit.SECONDS);
    Thread.sleep(10000);
}
[*]scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):按照固定的评率定时执行周期使命,不受使命运行时间影响。


[*]command: 执行的使命
[*]initialDelay: 初始延迟的时间
[*]period: 周期
[*]unit:单位
@Test
public void testScheduleAtFixedRate() throws InterruptedException {
    // 创建调度任务线程池
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    // 按照固定2秒时间执行
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      try {
            log.info("scheduleWithFixedDelay ...");
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }, 1, 2, TimeUnit.SECONDS);
    Thread.sleep(10000);
}综合案例

通过ScheduledThreadPoolExecutor实现每周四 18:00:00 定时执利用命。
// 通过ScheduledThreadPoolExecutor实现每周四 18:00:00 定时执行任务
@Test
public void test() {
    //获取当前时间
    LocalDateTime now = LocalDateTime.now();
    System.out.println(now);
    // 获取周四时间
    LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
    // 如果 当前时间 > 本周周四,必须找到下周周四
    if(now.compareTo(time) > 0) {
      time = time.plusWeeks(1);
    }
    System.out.println(time);
    // initailDelay 代表当前时间和周四的时间差
    // period 一周的间隔时间
    long initailDelay = Duration.between(now, time).toMillis();
    long period = 1000 * 60 * 60 * 24 * 7;
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.scheduleAtFixedRate(() -> {
      System.out.println("running...");
    }, initailDelay, period, TimeUnit.MILLISECONDS);
}底层源码剖析

接下来一起看看 ScheduledThreadPool 的底层源码
数据布局

https://seven97-blog.oss-cn-hangzhou.aliyuncs.com/imgs/202404251124087.jpg
ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor:
ScheduledThreadPoolExecutor 内部构造了两个内部类 ScheduledFutureTask 和 DelayedWorkQueue:

[*]ScheduledFutureTask: 继承了FutureTask,说明是一个异步运算使命;最上层分别实现了Runnable、Future、Delayed接口,说明它是一个可以延迟执行的异步运算使命。
[*]DelayedWorkQueue: 这是 ScheduledThreadPoolExecutor 为存储周期或延迟使命专门定义的一个延迟队列,继承了 AbstractQueue,为了契合 ThreadPoolExecutor 也实现了 BlockingQueue 接口。它内部只允许存储 RunnableScheduledFuture 类型的使命。与 DelayQueue 的差别之处就是它只允许存放 RunnableScheduledFuture 对象,并且本身实现了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆布局)。
内部类ScheduledFutureTask

属性

//为相同延时任务提供的顺序编号
private final long sequenceNumber;

//任务可以执行的时间,纳秒级
private long time;

//重复任务的执行周期时间,纳秒级。
private final long period;

//重新入队的任务
RunnableScheduledFuture<V> outerTask = this;

//延迟队列的索引,以支持更快的取消操作
int heapIndex;

[*]sequenceNumber: 当两个使命有雷同的延迟时间时,按照 FIFO 的顺序入队。sequenceNumber 就是为雷同延时使命提供的顺序编号。
[*]time: 使命可以执行时的时间,纳秒级,通过triggerTime方法计算得出。
[*]period: 使命的执行周期时间,纳秒级。正数表示固定速率执行(为scheduleAtFixedRate提供服务),负数表示固定延迟执行(为scheduleWithFixedDelay提供服务),0表示不重复使命。
[*]outerTask: 重新入队的使命,通过reExecutePeriodic方法入队重新排序。
焦点方法run()

public void run() {
    boolean periodic = isPeriodic();//是否为周期任务
    if (!canRunInCurrentRunState(periodic))//当前状态是否可以执行
      cancel(false);
    else if (!periodic)
      //不是周期任务,直接执行
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
      setNextRunTime();//设置下一次运行时间
      reExecutePeriodic(outerTask);//重排序一个周期任务
    }
}说明: ScheduledFutureTask 的run方法重写了 FutureTask 的版本,以便执行周期使命时重置/重排序使命。使命的执行通过父类 FutureTask 的run实现。内部有两个针对周期使命的方法:

[*]setNextRunTime(): 用来设置下一次运行的时间,源码如下:
//设置下一次执行任务的时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)//固定速率执行,scheduleAtFixedRate
      time += p;
    else
      time = triggerTime(-p);//固定延迟执行,scheduleWithFixedDelay
}
//计算固定延迟任务的执行时间
long triggerTime(long delay) {
    return now() +
      ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

[*]reExecutePeriodic(): 周期使命重新入队等待下一次执行,源码如下:
//重排序一个周期任务
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {//池关闭后可继续执行
      super.getQueue().add(task);//任务入列
      //重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行
      if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
      else
            ensurePrestart();//启动一个新的线程等待任务
    }
}reExecutePeriodic与delayedExecute的执行策略一致,只不过reExecutePeriodic不会执行拒绝策略而是直接丢掉使命。
cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    if (cancelled && removeOnCancel && heapIndex >= 0)
      remove(this);
    return cancelled;
}ScheduledFutureTask.cancel本质上由其父类 FutureTask.cancel 实现。取消使命成功后会根据removeOnCancel参数决定是否从队列中移除此使命。
焦点属性

//关闭后继续执行已经存在的周期任务
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

//关闭后继续执行已经存在的延时任务
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

//取消任务后移除
private volatile boolean removeOnCancel = false;

//为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();

[*]continueExistingPeriodicTasksAfterShutdown:和executeExistingDelayedTasksAfterShutdown是 ScheduledThreadPoolExecutor 定义的 run-after-shutdown 参数,用来控制池关闭之后的使命执行逻辑。
[*]removeOnCancel:用来控制使命取消后是否从队列中移除。当一个已经提交的周期或延迟使命在运行之前被取消,那么它之后将不会运行。默认设置下,这种已经取消的使命在届期之前不会被移除。 通过这种机制,可以方便检查和监控线程池状态,但也大概导致已经取消的使命无限滞留。为了制止这种情况的发生,我们可以通过setRemoveOnCancelPolicy方法设置移除策略,把参数removeOnCancel设为true可以在使命取消后立刻从队列中移除。
[*]sequencer:是为雷同延时的使命提供的顺序编号,包管使命之间的 FIFO 顺序。与 ScheduledFutureTask 内部的sequenceNumber参数作用一致。
构造函数

首先看下构造函数,ScheduledThreadPoolExecutor 内部有四个构造函数,这里我们只看这个最大构造灵活度的:
public ScheduledThreadPoolExecutor(int corePoolSize,
                                 ThreadFactory threadFactory,
                                 RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}构造函数都是通过super调用了ThreadPoolExecutor的构造,并且使用特定等待队列DelayedWorkQueue。
Schedule

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
      throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
      new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//构造ScheduledFutureTask任务
    delayedExecute(t);//任务执行主方法
    return t;
}说明: schedule重要用于执行一次性(延迟)使命。函数执行逻辑分两步:

[*]封装 Callable/Runnable
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

[*]执利用命
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
      reject(task);//池已关闭,执行拒绝策略
    else {
      super.getQueue().add(task);//任务入队
      if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&//判断run-after-shutdown参数
            remove(task))//移除任务
            task.cancel(false);
      else
            ensurePrestart();//启动一个新的线程等待任务
    }
}说明: delayedExecute是执利用命的主方法,方法执行逻辑如下:

[*]如果池已关闭(ctl >= SHUTDOWN),执利用命拒绝策略;
[*]池正在运行,首先把使命入队排序;然后重新检查池的关闭状态,执行如下逻辑:
A: 如果池正在运行,或者 run-after-shutdown 参数值为true,则调用父类方法ensurePrestart启动一个新的线程等待执利用命。ensurePrestart源码如下:
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
      addWorker(null, true);
    else if (wc == 0)
      addWorker(null, false);
}ensurePrestart是父类 ThreadPoolExecutor 的方法,用于启动一个新的工作线程等待执利用命,即使corePoolSize为0也会安排一个新线程。
B: 如果池已经关闭,并且 run-after-shutdown 参数值为false,则执行父类(ThreadPoolExecutor)方法remove移除队列中的指定使命,成功移除后调用ScheduledFutureTask.cancel取消使命
scheduleAtFixedRate 和 scheduleWithFixedDelay

/**
* 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
* 之后每隔period执行一次,不等待第一次执行完成就开始计时
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (period <= 0)
      throw new IllegalArgumentException();
    //构建RunnableScheduledFuture任务类型
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),//计算任务的延迟时间
                                    unit.toNanos(period));//计算任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
    sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
    delayedExecute(t);//执行任务
    return t;
}

/**
* 创建一个周期执行的任务,第一次执行延期时间为initialDelay,
* 在第一次执行完之后延迟delay后开始下一次执行
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (delay <= 0)
      throw new IllegalArgumentException();
    //构建RunnableScheduledFuture任务类型
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),//计算任务的延迟时间
                                    unit.toNanos(-delay));//计算任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//执行用户自定义逻辑
    sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行
    delayedExecute(t);//执行任务
    return t;
}原理探究

那大家有没有想过为什么使命出错会导致非常无法打印,以致调度都取消了呢?从源码出发,一探究竟。
从上面delayedExecute方法可以看到,延迟或周期性使命的重要执行方法, 重要是将使命丢到队列中,后续再由工作线程获取执行。

[*]在使命入队列后,就是执利用命内容了,使命内容其实就是在继承了Runnable类的run方法中。
public void shutdown() {
    super.shutdown();
}
//取消并清除由于关闭策略不应该运行的所有任务
@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    //获取run-after-shutdown参数
    boolean keepDelayed =
      getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic =
      getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {//池关闭后不保留任务
      //依次取消任务
      for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
      q.clear();//清除等待队列
    }
    else {//池关闭后保留任务
      // Traverse snapshot to avoid iterator exceptions
      //遍历快照以避免迭代器异常
      for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                  (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                  t.isCancelled()) { // also remove if already cancelled
                  //如果任务已经取消,移除队列中的任务
                  if (q.remove(t))
                        t.cancel(false);
                }
            }
      }
    }
    tryTerminate(); //终止线程池
}

[*]这里的关键就是看ScheduledFutureTask.super.runAndReset()方法是否返回true,如果是true的话继续调度。

[*]runAndReset方法也很简单,关键就是看报非常如何处置惩罚。
@Test
public void testException() throws InterruptedException {
    // 创建1个线程的调度任务线程池
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 创建一个任务
    Runnable runnable = new Runnable() {

      volatile int num = 0;

      @Override
      public void run() {
            num ++;
            // 模拟执行报错
            if(num > 5) {
                throw new RuntimeException("执行错误");
            }
            log.info("exec num: [{}].....", num);
      }
    };

    // 每隔1秒钟执行一次任务
    scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

[*]关键点ran变量,终极返回是不是下次继续调度执行
[*]如果抛出非常的话,可以看到不会修改ran为true。
小结

Java的ScheduledThreadPoolExecutor定时使命线程池所调度的使命中如果抛出了非常,并且非常没有捕获直接抛到框架中,会导致ScheduledThreadPoolExecutor定时使命不调度了。
封装包装类,统一调度

在现实项目使用中,可以在本身的项目中封装一个包装类,要求全部的调度都提交通过统一的包装类,从而规范代码,如下代码:
public void run() {
    try {
      num++;
      // 模拟执行报错
      if (num > 5) {
            throw new RuntimeException("执行错误");
      }
      log.info("exec num: [{}].....", num);
    } catch (Exception e) {
          e.printStackTrace();
    }
}当然,也还可以在包装类内里封装各种监控活动,如本例打印日志执行时间等。
其它使用注意点


[*]为什么ThreadPoolExecutor 的调整策略却不适用于 ScheduledThreadPoolExecutor?
由于 ScheduledThreadPoolExecutor 是一个固定焦点线程数大小的线程池,并且使用了一个无界队列,以是调整maximumPoolSize对其没有任何影响(以是 ScheduledThreadPoolExecutor 没有提供可以调整最大线程数的构造函数,默认最大线程数固定为Integer.MAX_VALUE)。此外,设置corePoolSize为0或者设置焦点线程空闲后清除(allowCoreThreadTimeOut)同样也不是一个好的策略,因为一旦周期使命到达某一次运行周期时,大概导致线程池内没有线程行止理这些使命。

[*]Executors 提供了哪几种方法来构造 ScheduledThreadPoolExecutor?


[*]newScheduledThreadPool: 可指定焦点线程数的线程池。
[*]newSingleThreadScheduledExecutor: 只有一个工作线程的线程池。如果内部工作线程由于执行周期使命非常而被终止,则会新建一个线程替换它的位置。
注意: newScheduledThreadPool(1, threadFactory) 不等价于newSingleThreadScheduledExecutor。newSingleThreadScheduledExecutor创建的线程池包管内部只有一个线程执利用命,并且线程数不可扩展;而通过newScheduledThreadPool(1, threadFactory)创建的线程池可以通过setCorePoolSize方法来修改焦点线程数。
面试题专栏

Java面试题专栏已上线,接待访问。

[*]如果你不知道简历怎么写,简历项目不知道怎么包装;
[*]如果简历中有些内容你不知道该不应写上去;
[*]如果有些综合性题目你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 彻底搞懂ScheduledThreadPoolExecutor