线程池底层原理详解与源码分析(补充部分---ScheduledThreadPoolExecutor类 ...

打印 上一主题 下一主题

主题 774|帖子 774|积分 2322

【1】前言
  本篇幅是对 线程池底层原理详解与源码分析  的补充,默认你已经看完了上一篇对ThreadPoolExecutor类有了足够的了解。
 
【2】ScheduledThreadPoolExecutor的介绍
  1.ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

  2.构造函数展示
  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2.     super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
  5.     super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory);
  6. }
  7. public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
  8.     super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), handler);
  9. }
  10. public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
  11.     super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory, handler);
  12. }
复制代码
 
  3.通过构造函数我们可以看到,它的线程池本身就是调用ThreadPoolExecutor类的构造方法,因此也继承了ThreadPoolExecutor类所存在的隐患:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。(且CPU会变成100%)
 
  4.PS:既然隐患这么严重,使用原生的不太合适。正所谓,人无横财不富,马无夜草不肥,打不过就加入。ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,那就写个类继承它然后调用ThreadPoolExecutor的构造方法区解决掉创建线程数被写死为最大值的情况,然后了解一下DelayedWorkQueue(这个本质上也是优先级队列),继承一下也改写吧。毕竟自己的最合适不是吗。【毕竟我觉得这些都是大佬们留给菜鸡的底版,如拒绝策略不也是四个默认都没人用吗,都是要你根据自己的场景改】(毕竟我这猜测的原因是因为有了无尽队列,其实线程数设置为Integer.MAX_VALUE已经没有意义了)
 
【3】ScheduledThreadPoolExecutor的使用
 
  1)schedule(Runnable command, long delay, TimeUnit unit) 
    方法说明:无返回值的延迟任务,有个严重的问题,就是没有办法获知task的执行结果
  2)schedule(Callable callable, long delay, TimeUnit unit)
    方法说明:有返回值的延迟任务 :接收的是Callable实例,会返回一个ScheduleFuture对象,通过ScheduleFuture可以取消一个未执行的task,也可以获得这个task的执行结果
  3)scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 
    方法说明: 固定频率周期任务:第一次执行的延迟根据initialDelay参数确定,以后每一次执行都间隔period时长
  4)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
    方法说明: 固定延迟周期任务 :scheduleWithFixedDelay的参数和scheduleAtFixedRate参数完全一致,它们的不同之处在于对period调度周期的解释。在scheduleAtFixedRate中,period指的两个任务开始执行的时间间隔,也就是当前任务的开始执行时间和下个任务的开始执行时间之间的间隔。而在scheduleWithFixedDelay中,period指的当前任务的结束执行时间到下个任务的开始执行时间。
 
 
【4】任务ScheduledFutureTask类源码分析
  1.构造方法展示
    代码展示
  1. private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
  2. ...
  3.     ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) {
  4.         super(r, result);
  5.         this.time = triggerTime; //表示这个任务将要被执行的具体时间
  6.         this.period = 0;  //表示任务执行的间隔周期
  7.         this.sequenceNumber = sequenceNumber;  //表示这个任务被添加到ScheduledThreadPoolExecutor中的序号(采用AtomicLong原子类累加当做序号)
  8.     }
  9.     ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber) {
  10.         super(r, result);
  11.         this.time = triggerTime;
  12.         this.period = period;
  13.         this.sequenceNumber = sequenceNumber;
  14.     }
  15.     ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber) {
  16.         super(callable);
  17.         this.time = triggerTime;
  18.         this.period = 0;
  19.         this.sequenceNumber = sequenceNumber;
  20.     }
  21. ...
  22. }
复制代码
 
    代码说明
      1.三个标注的参数是任务中主要的成员变量。
      2.其次,我们会发现callable的任务是没有间隔周期的:因为callable本身就是阻塞等待,而且周期性的也不合适。
      3.实现了RunnableScheduledFuture接口,其主要方法isPeriodic()用于判断是不是周期任务,又继承了RunnableFuture接口.
      4.ScheduledFutureTask又继承了FutureTask类,而FutureTask类实现了RunnableFuture接口。(故感觉RunnableFuture接口的那些方法挺重要的)
      5.RunnableFuture接口主要是由Runnable和Future两大接口组成(自己去看继承关系),主要有run()方法。
 
  2.ScheduledFutureTask类#run方法
    代码展示
  1. // 重写FutureTask,如果是周期性任务需要重新放入队列
  2. public void run() {
  3.     // 检查当前状态 不能执行任务,则取消任务
  4.     if (!canRunInCurrentRunState(this))
  5.         cancel(false);
  6.     //如果不是周期任务,调用FutureTask.run()执行任务(非周期任务直接执行)
  7.     else if (!isPeriodic())
  8.         super.run();
  9.     // 周期性任务
  10.     else if (super.runAndReset()) {
  11.         //与run方法的不同就是正常完成后任务的状态不会变化,依旧是NEW,且返回值为成功或失败,不会设置result属性
  12.         setNextRunTime(); //设置任务下次执行时间
  13.         reExecutePeriodic(outerTask);
  14.     }
  15. }
复制代码
 
 
 
    代码说明
      1.这里面很明显存在一个隐患,那就是没有捕捉异常,所以如果我们自定义的run()方法中如果没有捕捉异常的话,那么出现异常的时候我们容易两眼摸瞎。
      2.故使用定时任务的时候,自定义的run方法需要自行捕捉异常进行处理。
 
  3.ScheduledFutureTask类#setNextRunTime方法
    代码展示
  1. //判断指定的任务是否为定期任务
  2. private void setNextRunTime() {
  3.     long p = period; //取出周期时间
  4.     if (p > 0)
  5.         time += p; //time是周期任务的下一次执行时间
  6.     else
  7.         time = triggerTime(-p);
  8. }
  9. // ScheduledThreadPoolExecutor中的方法
  10. long triggerTime(long delay) {
  11.    //delay 的值是否小于 Long.MAX_VALUE 的一半,是的话,当前时间+延迟时间
  12.     return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  13. }
  14. // ScheduledThreadPoolExecutor中的方法
  15. private long overflowFree(long delay) {
  16.     //获取队列中的首节点
  17.     Delayed head = (Delayed) super.getQueue().peek();
  18.     //获取的节点不为空,则进行后续处理
  19.     if (head != null) {
  20.         //从队列节点中获取延迟时间
  21.         long headDelay = head.getDelay(NANOSECONDS);
  22.         //如果从队列中获取的延迟时间小于0,并且传递的delay值减去从队列节点中获取延迟时间小于0
  23.         if (headDelay < 0 && (delay - headDelay < 0))
  24.             //将delay的值设置为Long.MAX_VALUE + headDelay(该数字为负数)
  25.             delay = Long.MAX_VALUE + headDelay;
  26.     }
  27.     //返回延迟时间
  28.     return delay;
  29. }
复制代码
 
 
 
 
    代码说明
 
      1.周期时间period有正有负,这是ScheduledThreadPoolExecutor的ScheduledAtFixedRate和ScheduledWithFixedDelay的方法区别,前者为正数,后者为负数。
      2.正数时,下一次执行时间为原来的执行时间+周期,即以执行开始时间为基准。
      3.负数时,不考虑溢出情况,下一次执行时间为当前时间+周期,即以执行结束时间为基准。如果溢出,下一次执行时间为Long.MAX_VALUE + headDelay。
 
 
    疑问说明(这一步有兴趣的需要自己去调试然后在核心方法处断点查看就可以了)
      其实只要当做作System.nanoTime() + delay就可以了,没必要关注overflowFree这一步,原因:
        1.如果执行了  Long.MAX_VALUE + headDelay ,triggerTime方法会获得负数,示例代码
  1. executor.scheduleAtFixedRate(task, 20, //其实把这个队列看作树结构会更容易理解(要理解数组与完全二叉树的关联)
  2. private void siftUp(int k, RunnableScheduledFuture<?> key) {
  3.     while (k > 0) {
  4.         int parent = (k - 1) >>> 1; //父节点坐标
  5.         RunnableScheduledFuture<?> e = queue[parent]; //获取父节点的值
  6.         // 如果 节点>= 父节点,确定最终位置
  7.         if (key.compareTo(e) >= 0)
  8.             break;
  9.         // 节点<父节点,将节点向上移动(就是将父节点放在k处)
  10.         queue[k] = e;
  11.         setIndex(e, k);
  12.         k = parent;
  13.     }
  14.     //确定key的最后落脚处
  15.     queue[k] = key;
  16.     setIndex(key, k);
  17. }, TimeUnit.NANOSECONDS);//任延迟取最大值 稳定定时器
  18. executor.scheduleWithFixedDelay(task, 1, 9223272036854775807L, TimeUnit.NANOSECONDS); //任务+延迟
复制代码
 
        2.如果不执行  Long.MAX_VALUE + headDelay ,triggerTime方法也有可能获得负数,示例代码:
  1. executor.scheduleAtFixedRate(task, 20, 4611686018427387900L, TimeUnit.NANOSECONDS);
  2. executor.scheduleWithFixedDelay(task, 1, 9223272036854775807L, TimeUnit.NANOSECONDS);
复制代码
 
        3.而且获得负数在compareTo这一步不影响排序。【可能是由于科技发展的缘故吧,现在Long.MAX_VALUE【9223372036854775807L】溢出了,就会变为-9223372036854775808L,对排序不影响】
 
 
 
 
【5】ScheduledThreadPoolExecutor类源码分析
  1.ScheduledThreadPoolExecutor的四种使用方法
  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  2.     if (command == null || unit == null)
  3.         throw new NullPointerException();
  4.     RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));
  5.     delayedExecute(t);
  6.     return t;
  7. }
  8. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,  TimeUnit unit) {
  9.     if (callable == null || unit == null)
  10.         throw new NullPointerException();
  11.     RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement()));
  12.     delayedExecute(t);
  13.     return t;
  14. }
  15. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  16.     if (command == null || unit == null)
  17.         throw new NullPointerException();
  18.     if (delay <= 0L)
  19.         throw new IllegalArgumentException();
  20.     //这里设置的-unit.toNanos(delay)是负数
  21.     ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement());
  22.     //这个方法是用于以后做扩展的
  23.     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  24.     sft.outerTask = t;
  25.     delayedExecute(t);
  26.     return t;
  27. }
  28. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  long initialDelay, long period,  TimeUnit unit) {
  29.     if (command == null || unit == null)
  30.         throw new NullPointerException();
  31.     if (period <= 0L)
  32.         throw new IllegalArgumentException();
  33.     //这里设置unit.toNanos(period)是正数
  34.     ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,  triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement());
  35.     //这个方法是用于以后做扩展的
  36.     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  37.     sft.outerTask = t;
  38.     delayedExecute(t);
  39.     return t;
  40. }
复制代码
 
 
 
  1.DelayedWorkQueue类#add方法
  1. //获取初始的延迟执行时间(以纳秒的形式,相当于我在哪个时间点要执行)
  2. private long triggerTime(long delay, TimeUnit unit) {
  3.     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
  4. }
  5. long triggerTime(long delay) {
  6.     return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  7. }
复制代码
 
  2.DelayedWorkQueue类#siftUp方法
  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2.     //如果处于非运行状态则拒绝任务(这个方法里面比较的是不是比关闭状态大)
  3.     if (isShutdown())
  4.         reject(task);
  5.     else {
  6.         //加入队列
  7.         super.getQueue().add(task);
  8.         //如果加入队列后canRunInCurrentRunState检测线程池,返回false则移除任务
  9.         if (!canRunInCurrentRunState(task) && remove(task))
  10.             task.cancel(false); //以不可中断方式执行完成执行中的调度任务
  11.         else
  12.             ensurePrestart();
  13.     }
  14. }
  15. boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
  16.     //如果处于运行状态返回true
  17.     if (!isShutdown())
  18.         return true;
  19.     //处于停止状态,整理状态,销毁状态,三者之一返回false
  20.     if (isStopped())
  21.         return false;
  22.     //处于关闭状态,返回run-after-shutdown参数
  23.     return task.isPeriodic()  
  24.         ? continueExistingPeriodicTasksAfterShutdown //默认false
  25.         : (executeExistingDelayedTasksAfterShutdown
  26.            || task.getDelay(NANOSECONDS) <= 0);
  27. }
  28. void ensurePrestart() {
  29.     int wc = workerCountOf(ctl.get());
  30.     if (wc < corePoolSize) //保持工作者与核心线程数持平
  31.         addWorker(null, true);
  32.     else if (wc == 0) //即时核心线程是0,也至少会启动一个
  33.         addWorker(null, false);
  34. }
复制代码
 
 
 
  6.DelayedWorkQueue类#remove方法
  1. private static final int INITIAL_CAPACITY = 16;  // 初始容量
  2. private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  3. // 控制并发和阻塞等待
  4. private final ReentrantLock lock = new ReentrantLock();
  5. private final Condition available = lock.newCondition(); //这个可以参考take方法与offer方法,个人觉得是采用中断方式唤醒持有锁的线程
  6. private int size; // 节点数量
  7. private Thread leader;//记录持有锁的线程(当等待的时候)
复制代码
 
 
 
  7.DelayedWorkQueue类#siftDown方法
  1. public boolean add(Runnable e) {
  2.     return offer(e);
  3. }
  4. public boolean offer(Runnable x) {
  5.     //空值校验
  6.     if (x == null)
  7.         throw new NullPointerException();
  8.     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  9.     final ReentrantLock lock = this.lock;
  10.     //加锁
  11.     lock.lock();
  12.     try {
  13.         int i = size;
  14.         // 超过容量,扩容
  15.         if (i >= queue.length)
  16.             grow();
  17.         size = i + 1; //更新当前节点数
  18.         if (i == 0) {  //插入的是第一个节点(阻塞队列原本为空)
  19.             queue[0] = e;
  20.             setIndex(e, 0); //setIndex(e, 0)用于修改ScheduledFutureTask的heapIndex属性,表示该对象在队列里的下标
  21.         } else {//阻塞队列非空
  22.             siftUp(i, e); //在插入新节点后对堆进行调整,进行节点上移,保持其特性(节点的值小于子节点的值)不变
  23.         }
  24.         /**
  25.          * 这里最好结合take方法理解一下
  26.          * 队列头等于当前任务,说明了当前任务的等待时间是最小的。此时为什么要去清空leader?
  27.          * leader代表的是某一个正在等待获取元素的线程句柄,
  28.          * 在take的时候因为之前的头结点时间未到,不能拿,被休眠了一定时间(而这个时间就是距离之前那个队列头结点的可以出队列的时间差)。
  29.          * 此时头结点换了,理应清空句柄,唤醒它,让它再次尝试去获取最新的头结点(就算是再次休眠,时间也会比之前的少)。
  30.          */
  31.         if (queue[0] == e) {
  32.             leader = null;
  33.             available.signal();
  34.         }
  35.     } finally {
  36.         lock.unlock(); //解锁
  37.     }
  38.     return true;
  39. }
复制代码
 
 
 
 
  1. //其实把这个队列看作树结构会更容易理解(要理解数组与完全二叉树的关联)
  2. private void siftUp(int k, RunnableScheduledFuture<?> key) {
  3.     while (k > 0) {
  4.         int parent = (k - 1) >>> 1; //父节点坐标
  5.         RunnableScheduledFuture<?> e = queue[parent]; //获取父节点的值
  6.         // 如果 节点>= 父节点,确定最终位置
  7.         if (key.compareTo(e) >= 0)
  8.             break;
  9.         // 节点<父节点,将节点向上移动(就是将父节点放在k处)
  10.         queue[k] = e;
  11.         setIndex(e, k);
  12.         k = parent;
  13.     }
  14.     //确定key的最后落脚处
  15.     queue[k] = key;
  16.     setIndex(key, k);
  17. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

欢乐狗

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表