线程池 · 语雀 (yuque.com)
为什么要用线程池
在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程。
Java 在使用线程执行程序时,需要调用操作系统内核的 API,创建一个内核线程,操作系统要为线程分配一系列的资源;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁的成本很高,从而增加系统的性能开销。
除此之外,无限制地创建线同样会给系统带来性能问题。因为 CPU 核数是有限的,大量的线程上下文切换会增加系统的性能开销。同时无限制地创建线程还可能导致 OOM。
为了解决上述两类问题,于是引入了线程池概念。
对于第一类问题,频繁创建与销毁线程:线程池复用线程,提高线程利用率,避免频繁的创建与销毁线程。
对于第二类问题,大量创建线程:线程池限制线程创建的最大数量,防止无限制地创建线程。
线程池提供了一种方式来管理线程和消费,维护基本数据统计等工作,比如统计已完成的任务数;
介绍线程池框架 Executor
Java 提供了一套线程池框架 Executor。
这个框架包括了 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 两个核心线程池。
- ThreadPoolExecutor 是用来执行被提交的任务
- ScheduledThreadPoolExecutor 是用来执行定时任务。
还有一个 ForkJoinPool 则是为 ForkJoinTask 定制的线程池,与通常意义的线程池有所不同。
除此之外,Executors 类为我们提供了各种方便的静态工厂方法来简化线程池的创建。- public class ScheduledThreadPoolExecutor
- extends ThreadPoolExecutor
- implements ScheduledExecutorService { }
复制代码 从类的定义我们可以看到,ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor 类,因此下面我们就重点看看 ThreadPoolExecutor 类是如何实现线程池的。
ThreadPoolExecutor 的「构造参数」和「工作行为」
ThreadPoolExecutor 的构造函数非常复杂,最完备的构造函数有 7 个参数,如下面代码所示。- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
复制代码 下面我们一一介绍这些参数的意义。(参考了 ThreadPoolExecutor 的 javadoc )
线程数
corePoolSize:核心线程数
maximumPoolSize:最大线程数
线程池会根据 corePoolSize 和 maximumPoolSize 这两个参数的值,自动调整线程池大小。
当我们向线程池中提交任务时:
- 如果当前有少于 corePoolSize 个线程正在运行,那么将创建一个新的线程来处理请求,即使其他工作线程处于空闲状态(也就是说,前面说的正在运行的线程是指,所有已经创建的线程,包括处于空闲状态的线程)
- 如果当前有大于等于 corePoolSize 个线程正在运行,则尝试把任务加到任务队列中
- 如果任务队列未满,则加入成功,排队等待线程处理
- 如果任务队列已满,并且当前有不超过 maximumPoolSize 个线程,则创建一个新的线程来处理请求
- 如果当前有 maximumPoolSize 个线程正在运行,并且任务队列已满,那么线程池会拒绝接收任务,并按照指定的拒绝策略处理任务
通过将 corePoolSize 和 maximumPoolSize 设置为相同的值,我们可以创建固定大小的线程池。
通过将 maximumPoolSize 设置为一个本质上无界的值,例如 Integer.MAX VALUE,允许线程池容纳任意数量的线程。
通常,corePoolSize 和 maximumPoolSize 这两个参数的值只在构造 ThreadPoolExecutor 时设置,但这两个参数的值也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 动态修改。通过动态修改参数的值,我们可以做到动态配置自定义线程池,感兴趣的可以了解一下,这个视频里有讲。
在创建完线程池之后,默认情况下,线程池中没有任何线程,只有在新任务到达时线程才会被创建(new)和执行(start),但这可以通过使用 prestartCoreThread() 或 prestartAllCoreThreads() 方法来动态覆盖。如果使用非空队列构造线程池,则可能需要预启动线程。预启动线程在抢购系统中也经常被用到。
非核心线程的存活时间
keepAliveTime:线程存活的实现
TimeUnit:存活时间的单位(小时、分钟、秒、毫秒)
如果线程池当前有超过 corePoolSize 个线程,并且线程空闲的时间超过了 keepAliveTime,那么这些线程将被销毁,这样可以避免线程没有被使用时的资源浪费。
这个参数也可以使用方法 setKeepAliveTime(long,TimeUnit) 动态修改。
默认情况下,只有存在多于 corePoolSize 个线程时,才会应用 keep-alive 策略;但通过 allowCoreThreadTimeOut(boolean) 方法,将参数 allowCoreThreadTimeOut 的值设置为 true,则 keep-alive 策略也可应用于不超过 corePoolSize 个线程时。
任务队列
BlockingQueue:任务队列,用来储存等待被执行的任务
如果线程池当前有大于等于 corePoolSize 个线程正在运行,则尝试把任务加到任务队列中
- 如果任务队列未满,则加入成功,排队等待线程处理
- 如果任务队列已满,并且当前有不超过 maximumPoolSize 个线程,则创建一个新的线程来处理请求
也就是说,当线程数量达到 corePoolSize 个之后,不会立即扩容线程池,而是先把任务堆积到任务队列中,任务队列满了之后,才考虑扩容线程池,一直到线程个数达到 maximumPoolSize 为止。
这个任务队列必须必须是 BlockingQueue 类型的,也就是必须是阻塞队列。
阻塞队列其实就是在队列基础上支持了阻塞操作。
简单来说,阻塞操作就是:
- 如果队列为空,那么从队头取数据的操作会被阻塞,直到队列中有数据才能返回;
- 如果队列已满,那么从队尾插入数据的操作会被阻塞,直到队列中有空闲位置并插入数据后,才能返回。
Java 中 BlockingQueue 类型的队列也有很多,比如:(共 8 个)
- ArrayBlockingQueue:基于数组结构的有界阻塞队列
- LinkedBlockingQueue:基于链表结构的阻塞队列。可以在创建队列时指定容量;如果没有指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。
- SynchronousQueue:不存储元素的阻塞队列。每个移除操作必须等待另一个线程的插入操作,反之每个插入操作也都要等待另一个线程的移除操作。这个队列的容量是 0。
- PriorityBlockingQueue:具有优先级的无界阻塞队列
- DelayQueue:支持延时获取的无界阻塞队列,内部用 PriorityQueue 实现
- LinkedTransferQueue:基于链表结构的无界阻塞队列
- LinkedBlockingDeque:基于双向链表的阻塞队列,可以在创建队列时指定容量
- DelayedWorkQueue:无界阻塞队列
总结来说,BlockingQueue 类型的队列可以从以下两个维度划分:
- 底层结构:数组 or 单向链表 or 双向链表 or 优先级队列(DelayQueue 基于 PriorityQueue)
- 是否有界:有界 or 无界 or 既可以有界又可以无界
理论上两个维度中两两组合,就可以构成一种类型的 BlockingQueue。

线程工厂
ThreadFactory:线程工厂,用来创建线程
新线程是使用 ThreadFactory 创建的。
如果没有指定,则 ThreadPoolExecutor 的构造方法默认使用 Executors.defaultThreadFactory(),它将创建线程,使其全部位于同一个线程组中(ThreadGroup),并具有相同的优先级(默认都为 NORM_PRIORITY )和非守护线程状态。
通过提供不同的 ThreadFactory,我们可以更改线程的名称、线程组、优先级、是否设置为守护线程等。
如果 ThreadFactory#newThread() 方法创建线程失败返回 null ,程序将继续执行,但可能无法执行任何任务。
线程应该拥有“modifyThread”运行时权限。如果工作线程或线程池的其他线程不具备此权限,则服务可能降级:配置更改可能无法及时生效,并且关闭线程池可能处于可以终止但尚未完成的状态。- // ThreadPoolExecutor 类的成员变量
- private static final RuntimePermission shutdownPerm =
- new RuntimePermission("modifyThread");
- // 检查线程是否拥有 modifyThread 运行时权限
- // 该方法在 shutdown()、shutdownNow() 中被调用
- private void checkShutdownAccess() {
- SecurityManager security = System.getSecurityManager();
- if (security != null) {
- security.checkPermission(shutdownPerm);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (ThreadPoolExecutor.Worker w : workers)
- security.checkAccess(w.thread);
- } finally {
- mainLock.unlock();
- }
- }
- }
复制代码 线程组
线程组的 javadoc:https://docs.oracle.com/javase/9/docs/api/java/lang/ThreadGroup.html
线程组表示一组线程,除此之外,线程组还可以包括其他线程组。
线程组形成一个树,其中除初始线程组之外的每个线程组都有一个父线程组。
允许线程访问有关其自己的线程组的信息,但不能访问有关其线程组的父线程组或任何其他线程组的信息。
线程组的作用:
- 用线程组来批量管理、控制一组线程,比如:销毁线程组及其所有子组
- 有效地对线程或线程组对象进行组织。
每个线程(Thread)必然存在于⼀个线程组(ThreadGroup)中,线程不能独立于线程组存在。
如果 new Thread 时没有显式指定所在的线程组,那么默认将父线程 (执行当前 new Thread 的线程)所在的线程组设置为自己所在的线程组。
JVM 创建的 system 线程组是线程组树结构的跟线程组。
system 线程组是用来处理 JVM 的系统任务的线程组,例如对象的销毁等。
system 线程组的直接子线程组是 main 线程组,这个线程组至少包含一个 main 线程,用于执行 main 方法。
main 线程组的子线程组就是应用程序创建的线程组。
拒绝策略
RejectedExecutionHandler:拒绝策略
如果我们把任务提交到线程池时,被线程池拒绝接收了,线程池会按照指定的拒绝策略处理任务。
线程池拒绝接收我们提交的任务的原因(时机)可能有以下两个:
- 线程池中的等待被执行的任务过多,任务队列已满,并且线程数达到 maximumPoolSize
- 线程池已经处于非 RUNNING 状态
Java 线程池框架提供了以下 4 种拒绝策略:
- AbortPolicy:直接抛出 RejectedExecutionException 异常
- CallerRunsPolicy:用调用者所在线程(提交任务的线程)来执行任务
- DiscardOldestPolicy:丢弃任务队列里最早的任务,把提交的任务加入任务队列
- DiscardPolicy:直接丢弃提交的任务。
除了使用以上 Java 线程池框架提供的拒绝策略之外,我们还可以自定义拒绝策略。
实现自定义拒绝策略的步骤:
- 定义一个拒绝策略类(xxxPolicy),实现 RejectedExecutionHandler 接口
- 实现接口里的唯一方法 void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
在实际工作中,自定义的拒绝策略往往和降级策略配合使用。例如将任务信息插入数据库或者消息引擎系统(Kafka、RocketMQ、...)等存储系统,启用一个专门用作补偿的线程池进行补偿。
所谓降级就是在服务无法正常提供功能的情况下,采取的补救措施。
具体采用何种降级手段,这要看具体场景。
线程池的生命周期
对于有生命周期的事物,要学好它,只要能搞懂生命周期中各个节点的状态转换机制就可以了。
线程池的运行状态
- // Integer.SIZE = 32
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // runState 存储在数字的高阶位中
- // runState is stored in the high-order bits
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
复制代码 newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),创建的都是以个 ScheduledExecutorService,可以执行定时或周期性的任务,区别在于单一工作线程还是多个工作线程。
WorkStealingPool
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
复制代码 newWorkStealingPool(int parallelism),这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建 ForkJoinPool,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序。
总结
FixedThreadPool 和 SingleThreadExecutor 的任务队列都是 LinkedBlockingQueue,没有数量限制,默认是 Integer.MAX_VALUE;
CachedThreadPool 和 scheduledThreadPool 中最大线程数默认 Integer.MAX_VALUE,没有限制。
当线程过多的时候,这两类线程池就都容易造成 OutOfMemoryError。所以我们在使用线程池时,最好根据实际情况自定义这些核心参数。
上面提到的几种线程池,只有 CachedThreadPool 的线程存活时间大于 0,为 60 秒,其余线程池的线程存活时间都为 0 秒。
参考资料
18 | 如何设置线程池大小? · 语雀 (yuque.com)
Java中的线程池——如何创建及使用Executors的四种线程池-极客时间 (geekbang.org)
深入浅出 Java Concurrency (30): 线程池 part 3 Executor 生命周期 - xylz,imxylz - BlogJava
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |