前言
上篇文章 13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件 聊到并发包中常用的同步组件,并且还手把手实现了自定义的同步组件
本篇文章来聊聊并发包下的另一个核心-线程池
阅读本文大概12分钟
通读本篇文章前先来看看几个问题,看看你是否以及理解线程池
- 什么是池化技术?它有什么特点,哪些场景使用?
- Executor是什么?它的设计思想是什么样的?
- 工作任务有几种?有什么特点?如何适配然后交给Executor的?
- 线程池是如何实现的?有哪些核心参数,该如何配置?工作流程是怎样的?
- 线程池如何优雅的处理异常?如何关闭线程池?
- 处理定时的线程池是如何实现的?
池化技术
线程的创建、销毁都会带来一定的开销
如果当我们需要使用到多线程时再去创建,使用完又去销毁,这样去使用不仅会拉长业务流程,还会增加创建、销毁线程的开销
于是有了池化技术的思想,将线程提前创建出来,放在一个池子(容器)中进行管理
当需要使用时,从池子里拿取一个线程来执行任务,执行完毕后再放回池子
不仅是线程有池化的思想,连接也有池化的思想,也就是连接池
池化技术不仅能复用资源、提高响应,还方便管理
Executor框架
Executor框架是什么?
可以暂时把Executor看成线程池的抽象,它定义如何去执行任务- public interface Executor {
- void execute(Runnable command);
- }
复制代码 Executor将工作任务与线程池进行分离解耦

工作任务被分为两种:无返回结果的Runnable和有返回结果的Callable
在线程池中允许执行这两种任务,其中它们都是函数式接口,可以使用lambda表达式来实现
有的同学可能会有疑问,上文Executor框架定义的执行方法不是只允许传入Runnable任务吗?
那Callable任务调用哪个方法来执行呢?
Future接口用来定义获取异步任务的结果,它的实现类常是FutureTask
FutureTask实现Runnable的同时,还用字段存储Callable,在其实现Runnable时实际上会去执行Callable任务
线程池在执行Callable任务时,会将使用FutureTask将其封装成Runnable执行(具体源码我们后面再聊),因此Executor的执行方法入参只有Runnable
FutureTask相当于适配器,将Callable转换为Runnable再进行执行

Executor 定义线程池,而它的重要实现是ThreadPoolExecutor
在ThreadPoolExecutor的基础上,还有个做定时的线程池ScheduledThreadPoolExecutor

ThreadPoolExecutor
核心参数
ThreadPoolExecutor主要有七个重要的参数- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
复制代码
- corePoolSize 线程池核心线程数量
- maximumPoolSize 线程池允许创建的最大线程数
- keepAliveTime 超时时间,TimeUnit时间单位:非核心线程空闲后存活的时间
- workQueue 存放等待执行任务的阻塞队列
- threadFactory线程工厂:规定如何创建线程,可以根据业务不同规定 不同的线程组名称
- RejectedExecutionHandler 拒绝策略:当线程不够用,并且阻塞队列爆满时如何拒绝任务的策略
拒绝策略作用AbortPolicy 默认抛出异常CallerRunsPolicy调用线程来执行任务DiscardPolicy不处理,丢弃DiscardOldestPolicy丢弃队列中最近一个任务,并立即执行当前任务线程池中除了构造时的核心参数外,还使用内部类Worker来封装线程和任务,并使用HashSet容器workes工作队列存储工作线程worker
实现原理
流程图
为了清晰的理解线程池实现原理,我们先用流程图和总结概述原理,最后来看源码实现

- 如果工作线程数量小于核心线程数量,创建线程、加入工作队列、执行任务
- 如果工作线程数量大于等于核心线程数量并且线程池还在运行则尝试将任务加入阻塞队列
- 如果任务加入阻塞队列失败(说明阻塞队列已满),并且工作线程小于最大线程数,则创建线程执行
- 如果阻塞队列已满、并且工作线程数量达到最大线程数量则执行拒绝策略
execute
线程池有两种提交方式execute和submit,其中submit会封装成RunnableFuture最终都来执行execute- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
复制代码 execute中实现线程池的整个运行流程- public void execute(Runnable command) {
- //任务为空直接抛出空指针异常
- if (command == null)
- throw new NullPointerException();
- //ctl是一个整型原子状态,包含workerCount工作线程数量 和 runState是否运行两个状态
- int c = ctl.get();
- //1.如果工作线程数 小于 核心线程数 addWorker创建工作线程
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
-
- // 2.工作线程数 大于等于 核心线程数时
- // 如果 正在运行 尝试将 任务加入队列
- if (isRunning(c) && workQueue.offer(command)) {
- //任务加入队列成功 检查是否运行
- int recheck = ctl.get();
- //不在运行 并且 删除任务成功 执行拒绝策略 否则查看工作线程为0就创建线程
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 3.任务加入队列失败,尝试去创建非核心线程,成功则结束
- else if (!addWorker(command, false))
- // 4.失败则执行拒绝策略
- reject(command);
- }
复制代码 addWorker
addWorker用于创建线程加入工作队列并执行任务
第二个参数用来判断是不是创建核心线程,当创建核心线程时为true,创建非核心线程时为false- private boolean addWorker(Runnable firstTask, boolean core) {
- //方便跳出双层循环
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- // 检查状态
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
-
- for (;;) {
- int wc = workerCountOf(c);
- //工作线程数已满 返回false
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- //CAS自增工作线程数量 成功跳出双重循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
- //CAS失败 重新读取状态 内循环
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
-
- //来到这里说明已经自增工作线程数量 准备创建线程
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- //创建worker 通过线程工厂创建线程
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- //全局锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
-
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- //添加线程
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- //标记线程添加完
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- //执行线程
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
复制代码 addWorker中会CAS自增工作线程数量,创建线程再加锁,将线程加入工作队列workes(hashset),解锁后开启该线程去执行任务
runWorker
worker中实现Runnable的是runWorker方法,在启动线程后会不停的执行任务,任务执行完就去获取任务执行- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- //循环执行任务 getTask获取任务
- while (task != null || (task = getTask获取任务()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- //执行前 钩子方法
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- //执行
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- //执行后钩子方法
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
复制代码 在执行前后预留两个钩子空方法,留给子类来扩展,后文处理线程池异常也会用到
配置参数
线程池中是不是越多线程就越好呢?
首先,我们要明白创建线程是有开销的,程序计数器、虚拟机栈、本地方法栈都是线程私有的空间
并且线程在申请空间时,是通过CAS申请年轻代的Eden区中一块内存(因为可能存在多线程同时申请所以要CAS)
线程太多可能导致Eden空间被使用太多导致young gc,并且线程上下文切换也需要开销
因此,线程池中线程不是越多越好,行业内分为两种大概方案
针对CPU密集型,线程池设置最大线程数量为CPU核心数量+1,避免上下文切换,提高吞吐量,多留一个线程兜底
针对IO密集型,线程池设置最大线程数量为2倍CPU核心数量,由于IO需要等待,为了避免CPU空闲就多一些线程
具体业务场景需要具体分析,然后加上大量测试才能得到最合理的配置
Executor框架通过静态工厂方法提供几种线程池,比如:Executors.newSingleThreadExecutor()、Executors.newFixedThreadPool()、Executors.newCachedThreadPool()
但由于业务场景的不同,最好还是自定义线程池;当理解线程池参数和实现原理后,查看它们的源码并不难,我们不过多叙述
处理异常
线程池中如果出现异常会怎么样?
Runnable
当我们使用Runnable任务时,出现异常会直接抛出- threadPool.execute(() -> {
- int i = 1;
- int j = 0;
- System.out.println(i / j);
- });
复制代码 面对这种情况,我们可以在Runnable任务中使用try-catch进行捕获- threadPool.execute(() -> {
- try {
- int i = 1;
- int j = 0;
- System.out.println(i / j);
- } catch (Exception e) {
- System.out.println(e);
- }
- });
复制代码 实际操作的话用日志记录哈,不要打印到控制台
Callable
当我们使用Callable任务时,使用submit方法会获取Future- Future<Integer> future = threadPool.submit(() -> {
- int i = 1;
- int j = 0;
- return i / j;
- });
复制代码 如果不使用Future.get()去获取返回值,那么异常就不会抛出,这是比较危险的
为什么会出现这样的情况呢?
前文说过执行submit时会将Callable封装成FutureTask执行
在其实现Runnable中,在执行Callable任务时,如果出现异常会封装在FutureTask中- public void run() {
- //...其他略
- try {
- //执行call任务
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- //出现异常 封装到FutureTask
- result = null;
- ran = false;
- setException(ex);
- }
- //..
- }
复制代码 等到执行get时,先阻塞、直到完成任务再来判断状态,如果状态不正常则抛出封装的异常- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL)
- return (V)x;
- if (s >= CANCELLED)
- throw new CancellationException();
- throw new ExecutionException((Throwable)x);
- }
复制代码 因此在处理Callable任务时,可以对任务进行捕获也可以对get进行捕获- //捕获任务 Future f = threadPool.submit(() -> { try { int i = 1; int j = 0; return i / j; } catch (Exception e) { System.out.println(e); } finally { return null; } }); //捕获get Future<Integer> future = threadPool.submit(() -> {
- int i = 1;
- int j = 0;
- return i / j;
- }); try { Integer integer = future.get(); } catch (Exception e) { System.out.println(e); }
复制代码 afterExecutor
还记得线程池的runWorker吗?
它在循环中不停的获取阻塞队列中的任务执行,在执行前后预留钩子方法
继承ThreadPoolExecutor来重写执行后的钩子方法,记录执行完是否发生异常,如果有异常则进行日志记录,作一层兜底方案- public class MyThreadPool extends ThreadPoolExecutor {
- //...
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- //Throwable为空 可能是submit提交 如果runnable为future 则捕获get
- if (Objects.isNull(t) && r instanceof Future<?>) {
- try {
- Object res = ((Future<?>) r).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- t = e;
- }
- }
-
- if (Objects.nonNull(t)) {
- System.out.println(Thread.currentThread().getName() + ": " + t.toString());
- }
- }
- }
复制代码 这样即使使用submit,忘记使用get时,异常也不会“消失”
setUncaughtException
创建线程时,可以设置未捕获异常uncaughtException方法,当线程出现异常未捕获时调用,也可以打印日志作兜底
我们定义我们自己的线程工厂,以业务组group为单位,创建线程(方便出错排查)并设置uncaughtException方法- public class MyThreadPoolFactory implements ThreadFactory {
-
- private AtomicInteger threadNumber = new AtomicInteger(1);
-
- private ThreadGroup group;
-
- private String namePrefix = "";
-
- public MyThreadPoolFactory(String group) {
- this.group = new ThreadGroup(group);
- namePrefix = group + "-thread-pool-";
- }
-
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- System.out.println(t.getName() + ":" + e);
- }
- });
-
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-
- }
复制代码 关闭线程池
关闭线程池的2种方法: shutdown(),shutdownNow()
它们的原理都是: 遍历工作队列wokers中的线程,逐个中断(调用线程的interrupt方法) 无法响应中断的任务可能永远无法终止
shutdown 任务会被执行完
- 将线程池状态设置为SHUTDOWN
- 中断所有未正在执行任务的线程
shutdownNow 任务不一定会执行完
- 将线程池状态设置为STOP
- 尝试停止所有正在执行或暂停任务的线程
- 返回等待执行任务列表
通常使用shutdown,如果任务不一定要执行完可以使用shutdownNow
SecheduledThreadPoolExecutor
ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上提供定时执行的功能
它有两个定时的方法
scheduleAtFixedRate 以任务开始为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过0.5s又开始执行任务
scheduledWithFixedDelay 以任务结束为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过1s才开始执行任务- ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
- //scheduleAtFixedRate 固定频率执行任务 周期起点为任务开始
- scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("scheduleAtFixedRate 周期起点为任务开始");
- //初始延迟:1s 周期:1s
- },1,1, TimeUnit.SECONDS);
-
- //scheduledWithFixedDelay 固定延迟执行任务,周期起点为任务结束
- scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("scheduledWithFixedDelay 周期起点为任务结束 ");
- //初始延迟:1s 周期:1s
- },1,1, TimeUnit.SECONDS);
复制代码 定时线程池使用延迟队列充当阻塞队列实现的
延迟队列是一个优先级队列,它排序存储定时任务,时间越小越先执行
线程获取任务时,会从延迟队列中获取定时任务,如果时间已到就执行
[code] public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture first = queue[0]; //没有定时任务 等待 if (first == null) available.await(); else { //获取延迟时间 long delay = first.getDelay(NANOSECONDS); //小于等于0 说明超时,拿出来执行 if (delay |