频繁创建新线程的缺点?
不受控风险
系统资源有限,每个人针对不同业务都可以手动创建线程,而且创建标准不一样(比如线程没著名字)。当系统运行起来,全部线程都在疯狂抢占资源,毫无规则,欠好管控。
另外,过多的线程自然也会引起上下文切换的开销。
频繁创建开销大
new Thread() 在操纵系统层面并没有创建新的线程;
真正转换为操纵系统层面创建一个线程,还要调用操纵系统内核的API,然后操纵系统要为该线程分配一系列的资源。
new Object() 过程
Object obj = new Object();
- 分配一块内存 M
- 在内存 M 上初始化该对象
- 将内存 M 的地址赋值给引用变量 obj
创建线程过程
- JVM为一个线程栈分配内存,该栈为每个线程方法调用保存一个栈帧
- 每一栈帧由一个局部变量数组、返回值、操纵数堆栈和常量池组成
- 一些支持本机方法的 jvm 也会分配一个本机堆栈
- 每个线程获得一个程序计数器,告诉它当前处理器实行的指令是什么
- 系统创建一个与Java线程对应的本机线程
- 将与线程相关的描述符添加到JVM内部数据结构中
- 线程共享堆和方法区域
使用线程池的长处
- 降低资源斲丧。通过重复利用已创建的线程降低线程创建和销毁造成的斲丧。
- 提高响应速度。当使命到达时,可以不必要比及线程创建就能立即实行。提高线程的可管理性。
- 统一管理线程,避免系统创建大量同类线程而导致斲丧完内存。
而且在线程池在提交使命前,可以提前创建线程,ThreadPoolExecutor 提供了两个方法帮助我们在提交使命之前,完成核心线程的创建,从而实现线程池预热的效果,以便在应用程序开始处理请求时立纵然用这些线程:
- prestartCoreThread():启动一个线程,等候使命,假如已达到核心线程数,这个方法返回 false,否则返回 true;
- prestartAllCoreThreads():启动全部的核心线程,并返回启动乐成的核心线程数。
线程池原理
JDK线程池参数
ThreadPoolExecutor 的通用构造函数:- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
复制代码
- corePoolSize:当有新使命时,假如线程池中线程数没有达到核心线程池的大小corePoolSize,则会创建新的线程实行使命,否则将使命放入阻塞队列。当线程池中存活的线程数总是大于 corePoolSize 时,应该考虑调大 corePoolSize。
- maximumPoolSize:当阻塞队列填满时,假如线程池中线程数没有凌驾最大线程数maximumPoolSize,则会创建新的线程运行使命。假如线程池中线程数已经达到最大线程数maximumPoolSiz,则会根据拒绝计谋处理新使命。非核心线程类似于临时借来的资源,这些线程在空闲时间凌驾 keepAliveTime 之后,就应该退出,避免资源浪费。
- BlockingQueue:阻塞队列,存储等候运行的使命。
- keepAliveTime:非核心线程空闲后,保持存活的时间,此参数只对非核心线程有效。设置为0,表示多余的空闲线程会被立即终止。
- TimeUnit:keepAliveTime的时间单元TimeUnit.DAYS
- TimeUnit.HOURS
- TimeUnit.MINUTES
- TimeUnit.SECONDS
- TimeUnit.MILLISECONDS
- TimeUnit.MICROSECONDS
- TimeUnit.NANOSECONDS
- ThreadFactory:每当线程池创建一个新的线程时,都是通过线程工厂方法来完成的。在 ThreadFactory 中只界说了一个方法 newThread,每当线程池必要创建新线程就会调用它。
- public class MyThreadFactory implements ThreadFactory {
- private final String poolName;
-
- public MyThreadFactory(String poolName) {
- this.poolName = poolName;
- }
-
- public Thread newThread(Runnable runnable) {
- return new MyAppThread(runnable, poolName);//将线程池名字传递给构造函数,用于区分不同线程池的线程
- }
- }
复制代码
- RejectedExecutionHandler:当队列和线程池都满了的时间,根据拒绝计谋处理新使命。
- AbortPolicy:默认的计谋,直接抛出RejectedExecutionException。假如是比较关键的业务,推荐使用此拒绝计谋,这样子在系统不能承载更大的并发量的时间,能够实时的通过非常发现。
- DiscardPolicy:不处理,直接丢弃。建议是一些无关紧要的业务接纳此计谋。
- DiscardOldestPolicy:将等候队列队首的使命丢弃,并实行当前使命。得根据实际业务是否允许丢弃老使命来认真衡量。
- CallerRunsPolicy:由调用线程处理该使命CallerRunsPolicy:由调用线程处理该使命
线程池的核心组成
一个完备的线程池,应该包含以下几个核心部分:
- 使命提交:提供接口吸收使命的提交;
- 使命管理:选择合适的队列对提交的使命进行管理,包括对拒绝计谋的设置;
- 使命实行:由工作线程来实行提交的使命;
- 线程池管理:包括基本参数设置、使命监控、工作线程管理等
线程池实行具体过程
- 当线程池里存活的线程数小于核心线程数corePoolSize时,这时对于一个新提交的使命,线程池会创建一个线程去处理使命。当线程池内里存活的线程数小于等于核心线程数corePoolSize时,线程池内里的线程会一直存活着,就算空闲时间凌驾了keepAliveTime,线程也不会被销毁,而是一直阻塞在那里一直等候使命队列的使命来实行。因为keepAliveTime只对非核心线程有效。
- 当线程池内里存活的线程数已经等于corePoolSize了,这是对于一个新提交的使命,会被放进使命队列workQueue排队等候实行。
- 当线程池内里存活的线程数已经等于corePoolSize了,而且使命队列也满了,假设maximumPoolSize>corePoolSize,这时假如再来新的使命,线程池就会继承创建新的线程来处理新的使命,直到线程数达到maximumPoolSize,就不会再创建了。
- 假如当前的线程数达到了maximumPoolSize,而且使命队列也满了,假如还有新的使命过来,那就直接接纳拒绝计谋进行处理。默认的拒绝计谋是抛出一个RejectedExecutionException非常。
线程池生命周期
线程池状态状态释义RUNNING线程池被创建后的初始状态,能接受新提交的使命,而且也能处理阻塞队列中的使命SHUTDOWN关闭状态,不再接受新提交的使命,但仍可以继承处理已进入阻塞队列中的使命STOP会中断正在处理使命的线程,不能再接受新使命,也不继承处理队列中的使命TIDYING全部的使命都已终止,workerCount(有效工作线程数)为0TERMINATED线程池彻底终止运行Tips:千万不要把线程池的状态和线程的状态弄混了。补一张网上的线程状态图
Tips:当线程调用start(),线程在JVM中不肯定立即实行,有可能要等候操纵系统分配资源,此时为READY状态,当线程获得资源时进入RUNNING状态,才会真正开始实行。
线程池的预初始化机制
线程池的预初始化机制是指在线程池创建后,立即创建并启动肯定数目标线程,纵然这些线程临时还没有使命要实行。这样做的目标是减少在实际吸收到使命时创建线程所需的时间,从而提高响应速度。ThreadPoolExecutor提供了预初始化线程的功能。
预初始化方法(prestartCoreThread / prestartAllCoreThreads): ThreadPoolExecutor提供了两个方法来预初始化线程:
- prestartCoreThread():预初始化一个核心线程。假如核心线程数已经达到了设定的数目,则此方法不会有任何效果。
- public boolean prestartCoreThread() {
- return workerCountOf(ctl.get()) < corePoolSize &&
- addWorker(null, true);
- }
复制代码 - prestartAllCoreThreads():预初始化全部核心线程,即创建并启动等于核心线程数的线程
- public int prestartAllCoreThreads() {
- int n = 0;
- while (addWorker(null, true))
- ++n;
- return n;
- }
复制代码 拒绝计谋
- CallerRunsPolicy:调用实行自己的线程运行使命,也就是直接在调用execute方法的线程中运行(run)被拒绝的使命,假如实行程序已关闭,则会丢弃该使命。因此这种计谋会降低对于新使命提交速度,影响程序的整体性能。假如你的应用程序可以承受此耽误而且你要求任何一个使命请求都要被实行的话,你可以选择这个计谋。
- AbortPolicy:抛出 RejectedExecutionException来拒绝新使命的处理。
- DiscardPolicy:不处理新使命,直接丢弃掉。
- DiscardOldestPolicy:此计谋将丢弃最早的未处理的使命请求。
当没有显示指明拒绝计谋时,默认使用AbortPolicy
CallerRunsPolicy
假如不允许丢弃使命,就应该选择CallerRunsPolicy。CallerRunsPolicy 和其他的几个计谋不同,它既不会抛弃使命,也不会抛出非常,而是将使命回退给调用者,使用调用者的线程来实行使命。- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- public CallerRunsPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- // 直接主线程执行,而不是线程池中的线程执行
- r.run();
- }
- }
- }
复制代码 存在的问题:假如走到CallerRunsPolicy的使命是个非常耗时的使命,且处理提交使命的线程是主线程,可能会导致主线程阻塞,进而导致后续使命无法实时实行,严峻的情况下很可能导致 OOM。
当然,接纳CallerRunsPolicy实在就是希望全部的使命都能够被实行,临时无法处理的使命又被保存在阻塞队列BlockingQueue中。这样的话,在内存允许的情况下,就可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的使命,确保使命能够被准确实行。为了充分利用 CPU,还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高使命处理速度,避免累计在 BlockingQueue的使命过多导致内存用完。
但是,假如服务器资源达到可利用的极限了呢?导致主线程卡死的本质就是因为不希望任何一个使命被丢弃。换个思路,有没有办法既能包管使命不被丢弃且在服务器有余力时实时处理呢?
可以考虑使命持久化的思路,这里所谓的使命持久化,包括但不限于:
- 设计一张使命表间使命存储到 MySQL 数据库中。
- Redis缓存使命。
- 将使命提交到消息队列中。
这里以方案一为例,简单先容一下实现逻辑:
- 实现RejectedExecutionHandler接口自界说拒绝计谋,自界说拒绝计谋负责将线程池临时无法处理(此时阻塞队列已满)的使命入库(保存到 MySQL 中)。留意:线程池临时无法处理的使命会先被放在阻塞队列中,阻塞队列满了才会触发拒绝计谋。
- 继承BlockingQueue实现一个混合式阻塞队列,该队列包含JDK自带的ArrayBlockingQueue。另外,该混合式阻塞队列必要修改取使命处理的逻辑,也就是重写take()方法,取使命时优先从数据库中读取最早的使命,数据库中无使命时再从 ArrayBlockingQueue中去取使命。
也就是说,一旦线程池中线程达到满载时,就可以通过拒绝计谋将最新使命持久化到 MySQL 数据库中,比及线程池有了有余力处理全部使命时,让其优先处理数据库中的使命以避免"饥饿"问题。
当然,对于这个问题,也可以参考其他主流框架的做法:
- 以 Netty 为例,它的拒绝计谋则是直接创建一个线程池以外的线程处理这些使命,为了包管使命的实时处理,这种做法可能必要良好的硬件设备且临时创建的线程无法做到准确的监控:
- private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
- NewThreadRunsPolicy() {
- super();
- }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- try {
- //创建一个临时线程处理任务
- final Thread t = new Thread(r, "Temporary task executor");
- t.start();
- } catch (Throwable e) {
- throw new RejectedExecutionException(
- "Failed to start a new thread", e);
- }
- }
- }
复制代码
- ActiveMQ 则是尝试在指定的时效内尽可能的争取将使命入队,以包管最大交付:
- new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
- try {
- //限时阻塞等待,实现尽可能交付
- executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
- }
- throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
- }
- });
复制代码 使命实行机制
- 通过实行execute方法 该方法无返回值,为ThreadPoolExecutor自带方法,传入Runnable类型对象
- 通过实行submit方法 该方法返回值为Future对象,为抽象类AbstractExecutorService的方法,被ThreadPoolExecutor继承,其内部实现也是调用了接口类Executor的execute方法,通过上面的类图可以看到,该方法的实现依然是ThreadPoolExecutor的execute方法
execute()实行流程图
execute()源码
- // 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 高三位用来存储线程池运行状态,其余位数表示线程池的容量
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- // 线程池状态以常量值被存储在高三位中
- private static final int RUNNING = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
- private static final int STOP = 1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
- private static final int TIDYING = 2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
- private static final int TERMINATED = 3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
- // ctl变量的封箱拆箱相关的方法
- private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池运行状态
- private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程池运行线程数
- private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取ctl对象
复制代码- public void execute(Runnable command) {
- if (command == null) // 任务为空,抛出NPE
- throw new NullPointerException();
-
- int c = ctl.get(); // 获取当前工作线程数和线程池运行状态(共32位,前3位为运行状态,后29位为运行线程数)
- if (workerCountOf(c) < corePoolSize) { // 如果当前工作线程数小于核心线程数
- if (addWorker(command, true)) // 在addWorker中创建核心线程并执行任务
- return;
- c = ctl.get();
- }
-
- // 核心线程数已满(工作线程数>核心线程数)才会走下面的逻辑
- if (isRunning(c) && workQueue.offer(command)) { // 如果当前线程池状态为RUNNING,并且任务成功添加到阻塞队列
- int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能已成为SHUTDOWN状态
- if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务
- reject(command); // 执行拒绝策略
- else if (workerCountOf(recheck) == 0) // 当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务
- addWorker(null, false);
- }
- // 阻塞队列已满才会走下面的逻辑
- else if (!addWorker(command, false)) // 尝试增加工作线程执行command
- // 如果当前线程池为SHUTDOWN状态或者线程池已饱和
- reject(command); // 执行拒绝策略
- }
复制代码- private boolean addWorker(Runnable firstTask, boolean core) {
- retry: // 循环退出标志位
- for (;;) { // 无限循环
- int c = ctl.get();
- int rs = runStateOf(c); // 线程池状态
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 换成更直观的条件语句
- // (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
- )
- // 返回false的条件就可以分解为:
- //(1)线程池状态为STOP,TIDYING,TERMINATED
- //(2)线程池状态为SHUTDOWN,且要执行的任务不为空
- //(3)线程池状态为SHUTDOWN,且任务队列为空
- return false;
- // cas自旋增加线程个数
- for (;;) {
- int wc = workerCountOf(c); // 当前工作线程数
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize)) // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
- return false;
- if (compareAndIncrementWorkerCount(c)) // 执行cas操作,添加线程个数
- break retry; // 添加成功,退出外层循环
- // 通过cas添加失败
- c = ctl.get();
- // 线程池状态是否变化,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // 简单总结上面的CAS过程:
- //(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
- //(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
- //(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas
- // 走到这里说明cas成功,线程数+1,但并未被执行
- boolean workerStarted = false; // 工作线程调用start()方法标志
- boolean workerAdded = false; // 工作线程被添加标志
- Worker w = null;
- try {
- w = new Worker(firstTask); // 创建工作线程实例
- final Thread t = w.thread; // 获取工作线程持有的线程实例
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock; // 使用全局可重入锁
- mainLock.lock(); // 加锁,控制并发
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get()); // 获取当前线程池状态
- // 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // 检查线程是否处于活跃状态
- throw new IllegalThreadStateException();
- workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock(); // finally块中释放锁
- }
- if (workerAdded) { // 线程添加成功
- t.start(); // 调用线程的start()方法
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted) // 如果线程启动失败,则执行addWorkerFailed方法
- addWorkerFailed(w);
- }
- return workerStarted;
- }
复制代码 Worker源码
Worker是ThreadPoolExecutor类的内部类,此处只讲最紧张的构造函数和run方法- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (w != null)
- workers.remove(w); // 线程启动失败时,需将前面添加的线程删除
- decrementWorkerCount(); // ctl变量中的工作线程数-1
- tryTerminate(); // 尝试将线程池转变成TERMINATE状态
- } finally {
- mainLock.unlock();
- }
- }
复制代码 Worker实现了Runable接口,在调用start()方法候,实际实行的是run方法Worker实现了Runable接口,在调用start()方法候,实际实行的是run方法- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- // 以下情况不会进入TERMINATED状态:
- //(1)当前线程池为RUNNING状态
- //(2)在TIDYING及以上状态
- //(3)SHUTDOWN状态并且工作队列不为空
- //(4)当前活跃线程数不等于0
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- if (workerCountOf(c) != 0) { // 工作线程数!=0
- interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的线程
- return;
- }
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 通过CAS自旋判断直到当前线程池运行状态为TIDYING并且活跃线程数为0
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated(); // 调用线程terminated()
- } finally {
- ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED,工作线程数为0
- termination.signalAll(); // 通过调用Condition接口的signalAll()唤醒所有等待的线程
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
复制代码 从使命队列中取出一个使命
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
- // 该worker正在运行的线程
- final Thread thread;
-
- // 将要运行的初始任务
- Runnable firstTask;
-
- // 每个线程的任务计数器
- volatile long completedTasks;
- // 构造方法
- Worker(Runnable firstTask) {
- setState(-1); // 调用runWorker()前禁止中断
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建一个线程
- }
- // 实现了Runnable接口的run方法
- public void run() {
- runWorker(this);
- }
-
- ... // 此处省略了其他方法
- }
复制代码 总结一下哪些情况getTask()会返回null:
- 线程池状态为SHUTDOWN且使命队列为空
- 线程池状态为STOP、TIDYING、TERMINATED
- 线程池线程数大于最大线程数
- 线程可以被超时回收的情况下等候新使命超时
工作线程退出
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例
- w.firstTask = null;
- w.unlock(); // status设置为0,允许中断
- boolean completedAbruptly = true; // 线程意外终止标志
- try {
- // 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行
- while (task != null || (task = getTask()) != null) {
- w.lock(); // 加锁,保证下方临界区代码的线程安全
- // 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt(); // 中断当前线程
- try {
- beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义
- Throwable thrown = null;
- try {
- task.run(); // 执行线程的run方法
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义
- }
- } finally {
- task = null; // 将循环变量task设置为null,表示已处理完成
- w.completedTasks++; // 当前已完成的任务数+1
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
复制代码 submit源码
提交使命到线程池有两种方法,一种是execute,另一种是submit。区别是execute没有返回值,submit是有返回值的,假如有非常抛出,submit同样可以获取非常结果。- private Runnable getTask() {
- boolean timedOut = false; // 通过timeOut变量表示线程是否空闲时间超时了
- // 无限循环
- for (;;) {
- int c = ctl.get(); // 线程池信息
- int rs = runStateOf(c); // 线程池当前状态
- // 如果线程池状态>=SHUTDOWN并且工作队列为空 或 线程池状态>=STOP,则返回null,让当前worker被销毁
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount(); // 工作线程数-1
- return null;
- }
- int wc = workerCountOf(c); // 获取当前线程池的工作线程数
- // 当前线程是否允许超时销毁的标志
- // 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- // 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
- // 且(当前线程数大于1 或 阻塞队列为空)
- // 则减少worker计数并返回null
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();//线程池重用逻辑:没有任务了就阻塞在这里,等待新的任务
- if (r != null)
- return r; // 返回从队列中取出的任务
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
复制代码 submit中调用了newTaskFor方法来返回一个ftask对象,然后execute这个ftask对象,newTaskFor代码如下:- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常
- // 所以还没有正确地减少worker计数,这里需要减少一次worker计数
- if (completedAbruptly)
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 把将被销毁的线程已完成的任务数累加到线程池的完成任务总数上
- completedTaskCount += w.completedTasks;
- workers.remove(w); // 从工作线程集合中移除该工作线程
- } finally {
- mainLock.unlock();
- }
- // 尝试结束线程池
- tryTerminate();
- int c = ctl.get();
- // 如果是RUNNING 或 SHUTDOWN状态
- if (runStateLessThan(c, STOP)) {
- // worker是正常执行完
- if (!completedAbruptly) {
- // 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- // 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 如果当前线程数已经满足最小线程数要求,则不需要再创建替代线程
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- // 重新创建一个worker来代替被销毁的线程
- addWorker(null, false);
- }
- }
复制代码 newTaskFor又调用FutureTask的有参构造器来创建一个futureTask实例,代码如下:- // AbstractExecutorService.submit
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
复制代码 这个有参构造器中又调用了Executors的静态方法callable创建一个callable实例来赋值给futureTask的callable属性,代码如下:- // AbstractExecutorService.newTaskFor
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
复制代码 末了还是使用了RunnableAdapter来包装这个task,代码如下:- // FutureTask有参构造器
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
复制代码 梳理一下整个流程,run和call的关系的伪代码如下- // Executors.callable
- public static <T> Callable<T> callable(Runnable task, T result) {
- if (task == null)
- throw new NullPointerException();
- return new RunnableAdapter<T>(task, result);
- }
复制代码 为什么要这么麻烦封装一层又一层呢?
可能是为了适配。submit的返回值是futureTask,但是传给submit的是个runnable,然后submit会把这个runnable继承传给futureTask,futureTask的结果值是null,但是又由于futureTask的run方法已经被重写成实行call方法了,所以只能在call方法内里跑真正的run方法了
线程池提交一个使命占多大内存
提交使命到线程池有两种方法,一种是execute,一种是submit
那么这两种提交方式占用的内存是一样大的吗?一个空使命究竟占多少内存?
execute提交
使用execute提交一个使命,这个使命究竟多大呢?
使用得最多的就是使用lambda表达式来提交使命- // Executors.RunnableAdapter类
- static final class RunnableAdapter<T> implements Callable<T> {
- final Runnable task;
- final T result;
- RunnableAdapter(Runnable task, T result) {
- this.task = task;
- this.result = result;
- }
- public T call() {
- task.run();
- return result;
- }
- }
复制代码 那么这个lambda实例占用多少个字节呢?16字节;在开了指针压缩的情况下,对象头占12个字节,4个字节用于填充补齐到8的整数倍,由于这个lambda实例中没有其他成员变量了,所以它就是占据16个字节
除此之外,假如使用的是LinkedBlockingQueue阻塞队列来存放使命,那么还涉及到LinkedBlockingQueue中的Node,LinkedBlockingQueue会使用这个Node来封装使命- // submit
- run(){
- // RunnableAdapter.call
- call(){
- // task.run
- run(){
- // 实际的任务
- }
- }
- }
复制代码 这个Node占多少字节呢?24个字节;同样对象头占12字节,item是一个4字节的引用,next也是一个4字节的引用,一共20字节,4个字节用于填充对齐,所以一个node对象是24字节。
所以在使用execute且阻塞队列是LinkedBlockingQueue时一个使命占用40个字节,假如execute 20 w个使命,会占用 800w 个字节,约7.6MB内存
堆快照如下:
假如是使用ArrayBlockingQueue的话,只有lambda实例这一个开销,所以只会使用320w个字节,约3.05MB内存,比起LinkedBlockingQueue少了一倍不止
submit提交
根据上面源码分析得知,假如是用了submit方法之后,会多出两类对象,一个是FutureTask,一个是RunnableAdapter。
FutureTask的成员变量如下:- threadPoolExecutor.execute(() -> {
- // ...
- });
复制代码 一个FutureTask对象占用的字节数是12+4+4+4+4=28个字节,还必要4个字节做填充,所以一共是32个字节。
RunnableTask的成员变量如下:- static class Node<E> {
- E item;
- /**
- * One of:
- * - the real successor Node
- * - this Node, meaning the successor is head.next
- * - null, meaning there is no successor (this is the last node)
- */
- Node<E> next;
- Node(E x) { item = x; }
- }
复制代码 一个RunnableTask对象占用的字节数是12+4+4=20个字节,同样必要4个字节做填充,所以一共是24个字节。
所以在使用submit且阻塞队列是LinkedBlockingQueue时一个使命占用96个字节
假如submit 20w个使命,会占用1920w个字节,约18.31MB内存
假如使用的是ArrayBlockingQueue会省去Node的占用的内存。
lambda中没有使用上下文中的其他变量
- /** The underlying callable; nulled out after running */
- private Callable<V> callable;
- /** The result to return or exception to throw from get() */
- private Object outcome; // non-volatile, protected by state reads/writes
- /** The thread running the callable; CASed during run() */
- private volatile Thread runner;
- /** Treiber stack of waiting threads */
- private volatile WaitNode waiters;
复制代码 假如在lambda中没有使用到上下文的其他变量时,是不会重复创建lambda实例的,只会创建一个
只会创建一个lambda实例
假如配合上ArrayBlockingQueue以及execute,提交20w个使命的空间复杂度可以降至O(1)
因为20w个使命的实例都是同一个
小结
假如是lambda中没有上下文变量,使用的队列是ArrayBlockingQueue,提交方式是execute,那么空间复杂度可以达到O(1);假如lambda中有上下文变量,每次提交使命都会创建一个新的lambda实例;
假如使用的队列是LinkedBlockingQueue,那么还要算上LinkedBlockingQueue的Node实例的开销;假如提交的方式是submit,那么还要算上FutureTask和RunnableAdapter的开销。
当然这里只是浅层地讨论了一下创建一个空使命所占用的内存大小,假如是更加复杂的使命,使命内的内存开销必要算上。
线程池中线程非常后,销毁还是复用
先说结论,必要分两种情况:
- 使用execute()提交使命:当使命通过execute()提交到线程池并在实行过程中抛出非常时,假如这个非常没有在使命内被捕获,那么该非常会导致当前线程终止,而且非常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。
- 使用submit()提交使命:对于通过submit()提交的使命,假如在使命实行中发生非常,这个非常不会直接打印出来。相反,非常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为非常而终止,它会继承存在于线程池中,准备实行后续的使命。
简单来说:使用execute()时,未捕获非常导致线程终止,线程池创建新线程替代;使用submit()时,非常被封装在Future中,线程继承复用。
这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如那边理非常,而execute()则实用于那些不必要关注实行结果的场景。
execute()提交
查看execute方法的实行逻辑
可以发现,假如抛出非常,execute()提交的方式会移除抛出非常的线程,创建新的线程。
submit()提交
可以发现,submit也是调用了execute方法,但是在调用之前,包装了一层 RunnableFuture,那肯定是在RunnableFuture的实现 FutureTask中有特殊处理了,我们查看源码可以发现。
但是,
也就是说,通过java.util.concurrent.FutureTask#get(),就可以获取对应的非常信息。
线程池是如何保活和回收的
线程池的作用就是提高线程的利用率,必要线程时,可以直接从线程池中获取线程直接使用,而不用创建线程,那线程池中的线程,在没有使命实行时,是如何保活的呢?
在runWorker方法里,线程会循环getTask()获取阻塞队列中的使命。
不停地的从阻塞队列中获取使命,主要调用的是workQueue.poll()方法或take(), 这两个方法都会阻塞式的从队列中获取元素,区别是poll()方法可以设置一个超时时间, take()不能设置超时时间,所以这也间接的使得线程池中的线程阻塞等候从而达到保活的效果。
当然并不是线程池中的全部线程都必要一直保活,比如只有核心线程必要保活,非核心线程就不必要保活,那非核心线程是怎么回收的呢?
底层是这样的,当一个线程处理完当前使命后,就会开始去阻塞队列中获取使命,只不过,在调用poll或take方法之前, 会判定当前线程池中有多少个线程,假如多余核心线程数(也就是wc > corePlloSize),那么timed为true,此时当前线程就会调用poll()并设置超时时间来获取阻塞队列中的使命,这样一旦时间到了还没有获取到使命,那么poll方法获取到的r就是null,返回给上一级,runWorker()里的getTask方法就获取到null了,此时while循环就会退出。那么就会调用processWorkerExit()方法,remove当前线程
这里实在可以看到timed还有一个参数,allowCoreThreadTimeOut,这个主要是用来控制核心线程是否可以回收,默认是false,上面是讨论默认值false的情况,即核心线程不会超时。假如为true,工作线程可以全部销毁
实际上,固然有核心线程数,但线程并没有区分是核心还是非核心,并不是先创建的就是核心,凌驾核心线程数后创建的就是非核心,最终保留哪些线程,完全随机。
线程池的关闭
shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受使命,然后会将剩余的使命全部实行完- final Runnable task;
- final T result;
复制代码 shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝全部提交的使命。末了中断左右正在运行中的worker,然后清空使命队列。- public static void main(String[] args) {
- ThreadPoolExecutor threadPoolExecutor =
- new ThreadPoolExecutor(8, 8, 15, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 0; i < (int) 2e5; i++) {
- // int finalI = i;
- threadPoolExecutor.submit(() -> {
- // 乱写...
- // int p = finalI;
- LockSupport.park();
- });
- }
- LockSupport.park();
- }
复制代码 池化带来的问题
- 线程污染:假如线程池中的线程被用于实行不同类型的使命,而这些使命之间存在状态共享或依靠关系,可能会导致线程状态被污染,进而影响使命的正确实行。
- 内存泄漏:假如池化资源(如对象池中的对象)没有被正确地回收或重置,可能会导致内存泄漏。
当然,解决方法就是确保每次使用池化资源后,资源状态被正确重置,避免污染;正确回收,避免泄露。而且通过监控池化资源的使用情况,实时调优配置,以适应不同的负载和需求。
线程池大小怎么设置?
首先应该明确线程池大小设置的目标是什么?实在就是为了提高 CPU 的利用率
CPU利用率=CPU有效工作时间/CPU总的运行时间
假如线程池线程数目太小,当有大量请求必要处理,必要创建非核心线程池处理,导致系统响应比较慢,会影响用户体验,甚至会出现使命队列大量堆积使命导致OOM。
假如核心线程池数目太大,会导致其一直阻塞在那里等候使命队列的使命来实行,斲丧内存。而且大量线程可能会同时抢占 CPU 资源,这样会导致大量的上下文切换,从而增加线程的实行时间,影响了实行效率。
根据线程数设置依据
最大线程数:原则上就是性能最高线程数,因为此时性能已经是最高,再设置比他大的线程数反而性能变低。极度情况下才会使用到最大线程数,正常情况下不应频繁出现凌驾核心线程数的创建。
核心线程数:基于性能考虑,及其他业务处理的最优效率考虑,估算平时的流量必要的线程数,设置核心线程数
阻塞队列:估算最大流量,设置阻塞队列长度
留意:必要通过压力测试来进行微调,只有经过压测的检验,才气最终包管的配置大小是准确的。
一般情况设置依据
一般用来计算核心线程数
CPU 密集型使命(N+1):
这种使命斲丧的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,多出来的一个线程是为了防止某些原因导致的线程阻塞(如IO操纵,线程sleep,等候锁)而带来的影响。一旦某个线程被阻塞,开释了CPU资源,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型使命(2N):
系统的大部分时间都在处理 IO 操纵,此时线程可能会被阻塞,开释CPU资源,这时就可以将 CPU 交出给其它线程使用。因此在 IO 密集型使命的应用中,可以多配置一些线程,具体的计算方法:最佳线程数 = CPU核心数 * (1/CPU利用率) = CPU核心数 * (1 + (IO耗时/CPU耗时)),一般可设置为2N。
ExecutorService 线程池实例
但是阿里为什么不推荐使用Executors来创建线程池,这是为了让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
FixedThreadPool:
固定线程数的线程池。任何时间点,最多只有 nThreads 个线程处于活动状态实行使命。- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //检查是否可以关闭线程
- checkShutdownAccess();
- //设置线程池状态
- advanceRunState(SHUTDOWN);
- //尝试中断worker
- interruptIdleWorkers();
- //预留方法,留给子类实现
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //遍历所有的worker
- for (Worker w : workers) {
- Thread t = w.thread;
- //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
- //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
- //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
复制代码 为什么不建议使用?
使用无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE),运行中的线程池不会拒绝使命,即不会调用RejectedExecutionHandler.rejectedExecution()方法。maxThreadPoolSize 是无效参数,故将它的值设置为与 coreThreadPoolSize 同等。当添加使命的速度大于线程池处理使命的速度,可能会在队列堆积大量的请求,斲丧很大的内存,甚至导致OOM。
keepAliveTime 也是无效参数,设置为0L,因为此线程池里全部线程都是核心线程,核心线程不会被回收(除非设置了executor.allowCoreThreadTimeOut(true))。
实用场景:实用于处理CPU密集型的使命,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即实用实行长期的使命。必要留意的是,FixedThreadPool 不会拒绝使命,在使命比较多的时间会导致 OOM。
SingleThreadExecutor
只有一个线程的线程池。- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- //检测权限
- advanceRunState(STOP);
- //中断所有的worker
- interruptWorkers();
- //清空任务队列
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //遍历所有worker,然后调用中断方法
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
复制代码 为什么不建议使用?
使用无界队列 LinkedBlockingQueue。线程池只有一个运行的线程,新来的使命放入工作队列,线程处理完使命就循环从队列里获取使命实行。包管顺序的实行各个使命。
实用场景:实用于串行实行使命的场景,一个使命一个使命地实行。在使命比较多的时间也是会导致 OOM。
CachedThreadPool
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- }
复制代码 为什么不建议使用?
core是0,最大线程数是Integer.MAX_VALUE,因此当添加使命的速度大于线程池处理使命的速度,可能会创建大量的线程,极度情况下,这样会导致耗尽 cpu 和内存资源,甚至导致OOM。
使用没有容量的SynchronousQueue作为线程池工作队列,当线程池有空闲线程时,SynchronousQueue.offer(Runnable task)提交的使命会被空闲线程处理,否则会创建新的线程处理使命。
实用场景:用于并发实行大量短期的小使命。CachedThreadPool允许创建的线程数目为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPoolExecutor
- public static ExecutionService newSingleThreadExecutor() {
- return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- }
复制代码 在给定的耽误后运行使命,或者定期实行使命。
为什么不建议使用?
最大线程数是Integer.MAX_VALUE,因此当添加使命的速度大于线程池处理使命的速度,可能会创建大量的线程,极度情况下,这样会导致耗尽 cpu 和内存资源,甚至导致OOM。
关于作者
来自一线程序员Seven的探索与实践,持续学习迭代中~
本文已收录于我的个人博客:https://www.seven97.top
公众号:seven97,欢迎关注~
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |