Java - ThreadPoolExecutor源码分析
1. 为什么要自定义线程池
首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己创建自定义线程池。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
ThreadPoolExecutor 七大核心参数:- public ThreadPoolExecutor(int corePoolSize, // 核心工作线程(当前任务执行结束后,不会销毁)
- int maximumPoolSize, // 最大工作线程(代表当前线程池中一共可以有多少工作线程)
- long keepAliveTime, // 非核心工作线程在阻塞队列位置等待时间
- TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
- BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先到阻塞队列中
- ThreadFactory threadFactory, // 构建线程的线程工厂,可以自定义thread信息
- RejectedExecutionHandler handler) // 当线程池无法处理处理任务时,执行拒绝策略
复制代码 2.ThreadPoolExecutor应用
JDK提供的几种拒绝策略:
- AbortPolicy: 当前拒绝策略会在无法执行任务时,直接抛出一个异常
- public static class AbortPolicy implements RejectedExecutionHandler {
- /**
- * Creates an {@code AbortPolicy}.
- */
- public AbortPolicy() { }
- /**
- * Always throws RejectedExecutionException.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- * @throws RejectedExecutionException always
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task " + r.toString() +
- " rejected from " +
- e.toString());
- }
- }
复制代码
- CallerRunsPolicy: 当前拒绝策略会在无法执行任务时,将任务交给调用者处理
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code CallerRunsPolicy}.
- */
- public CallerRunsPolicy() { }
- /**
- * Executes task r in the caller's thread, unless the executor
- * has been shut down, in which case the task is discarded.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- r.run();
- }
- }
- }
复制代码
- DiscardPolicy:当前拒绝策略会在无法执行任务时,直接将任务丢弃
- public static class DiscardPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code DiscardPolicy}.
- */
- public DiscardPolicy() { }
- /**
- * Does nothing, which has the effect of discarding task r.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
复制代码
- DiscardOldestPolicy: 当前拒绝策略会在无法执行任务时,将阻塞队列中最早的任务丢弃,将当前任务再次交接线程池处理
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code DiscardOldestPolicy} for the given executor.
- */
- public DiscardOldestPolicy() { }
- /**
- * Obtains and ignores the next task that the executor
- * would otherwise execute, if one is immediately available,
- * and then retries execution of task r, unless the executor
- * is shut down, in which case task r is instead discarded.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- e.getQueue().poll();
- e.execute(r);
- }
- }
- }
复制代码
- 当然也可以自定义拒绝策略,根据自己业务修改实现逻辑, 只需实现 RejectedExecutionHandler 类中的 rejectedExecution 方法。
3. ThreadPoolExecutor的核心属性
线程池的核心属性就是ctl,它会基于ctl拿到线程池的状态以及工作线程个数。
[code]// 当前线程的核心属性// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子操作// ctl表示线程池的两个核心属性// 线程池的状态: ctl的高3位,表示线程池状态// 工作线程的数量: ctl的低29位,表示工作线程的个数private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Integer.SIZE: 获取Integer的bit位个数// 声明一个常量: COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// CAPACITY就是当前工作线程能记录的工作线程的最大个数private static final int CAPACITY = (1 |