Java - ThreadPoolExecutor线程池分析

打印 上一主题 下一主题

主题 510|帖子 510|积分 1530

Java - ThreadPoolExecutor源码分析

 1. 为什么要自定义线程池

首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己创建自定义线程池。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
ThreadPoolExecutor 七大核心参数:
  1. public ThreadPoolExecutor(int corePoolSize,                         //  核心工作线程(当前任务执行结束后,不会销毁)
  2.                           int maximumPoolSize,                      //  最大工作线程(代表当前线程池中一共可以有多少工作线程)
  3.                           long keepAliveTime,                       //  非核心工作线程在阻塞队列位置等待时间
  4.                           TimeUnit unit,                            //  非核心工作线程在阻塞队列位置等待时间的单位
  5.                           BlockingQueue<Runnable> workQueue,        //  任务在没有核心工作线程处理时,任务先到阻塞队列中
  6.                           ThreadFactory threadFactory,              //  构建线程的线程工厂,可以自定义thread信息
  7.                           RejectedExecutionHandler handler)         //  当线程池无法处理处理任务时,执行拒绝策略
复制代码
2.ThreadPoolExecutor应用

JDK提供的几种拒绝策略:

  • AbortPolicy: 当前拒绝策略会在无法执行任务时,直接抛出一个异常
  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2.     /**
  3.      * Creates an {@code AbortPolicy}.
  4.      */
  5.     public AbortPolicy() { }
  6.     /**
  7.      * Always throws RejectedExecutionException.
  8.      *
  9.      * @param r the runnable task requested to be executed
  10.      * @param e the executor attempting to execute this task
  11.      * @throws RejectedExecutionException always
  12.      */
  13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14.         throw new RejectedExecutionException("Task " + r.toString() +
  15.                                              " rejected from " +
  16.                                              e.toString());
  17.     }
  18. }
复制代码

  • CallerRunsPolicy: 当前拒绝策略会在无法执行任务时,将任务交给调用者处理
  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2.     /**
  3.      * Creates a {@code CallerRunsPolicy}.
  4.      */
  5.     public CallerRunsPolicy() { }
  6.     /**
  7.      * Executes task r in the caller's thread, unless the executor
  8.      * has been shut down, in which case the task is discarded.
  9.      *
  10.      * @param r the runnable task requested to be executed
  11.      * @param e the executor attempting to execute this task
  12.      */
  13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14.         if (!e.isShutdown()) {
  15.             r.run();
  16.         }
  17.     }
  18. }
复制代码

  • DiscardPolicy:当前拒绝策略会在无法执行任务时,直接将任务丢弃
  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2.     /**
  3.      * Creates a {@code DiscardPolicy}.
  4.      */
  5.     public DiscardPolicy() { }
  6.     /**
  7.      * Does nothing, which has the effect of discarding task r.
  8.      *
  9.      * @param r the runnable task requested to be executed
  10.      * @param e the executor attempting to execute this task
  11.      */
  12.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  13.     }
  14. }
复制代码

  • DiscardOldestPolicy: 当前拒绝策略会在无法执行任务时,将阻塞队列中最早的任务丢弃,将当前任务再次交接线程池处理
  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2.     /**
  3.      * Creates a {@code DiscardOldestPolicy} for the given executor.
  4.      */
  5.     public DiscardOldestPolicy() { }
  6.     /**
  7.      * Obtains and ignores the next task that the executor
  8.      * would otherwise execute, if one is immediately available,
  9.      * and then retries execution of task r, unless the executor
  10.      * is shut down, in which case task r is instead discarded.
  11.      *
  12.      * @param r the runnable task requested to be executed
  13.      * @param e the executor attempting to execute this task
  14.      */
  15.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  16.         if (!e.isShutdown()) {
  17.             e.getQueue().poll();
  18.             e.execute(r);
  19.         }
  20.     }
  21. }
复制代码

  • 当然也可以自定义拒绝策略,根据自己业务修改实现逻辑, 只需实现   RejectedExecutionHandler 类中的 rejectedExecution 方法。
 3. ThreadPoolExecutor的核心属性

线程池的核心属性就是ctl,它会基于ctl拿到线程池的状态以及工作线程个数。
[code]//  当前线程的核心属性//  当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子操作//  ctl表示线程池的两个核心属性//  线程池的状态: ctl的高3位,表示线程池状态//  工作线程的数量: ctl的低29位,表示工作线程的个数private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//  Integer.SIZE: 获取Integer的bit位个数//  声明一个常量: COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;//  CAPACITY就是当前工作线程能记录的工作线程的最大个数private static final int CAPACITY   = (1
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表