Java中的线程池使用及原理

打印 上一主题 下一主题

主题 895|帖子 895|积分 2685

开篇-为什么要使用线程池?

​                Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。
​                第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
​                第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
​                第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
1. 线程池的任务执行步骤

​                当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?处理流程图如图 1-1所示。

      图1-1 线程池的主要处理流程 ​        从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。
​                1.线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
​                2.线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
​                3.线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
​                ThreadPoolExecutor 执行 execute()方法的示意图,如图 1-2 所示。

      图1-2 ThreadPoolExecutor执行示意图 ​        `ThreadPoolExecutor `执行 `execute` 方法分下面 4 种情况。

  • 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  • 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
  • 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  • 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
    ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。
2. 线程池的使用

2.1 线程池的创建

通过ThreadPoolExecutor可以创建一个线程池。
  1. ThreadPoolExecutor(int corePoolSize,
  2.                    int maximumPoolSize,
  3.                    long keepAliveTime,
  4.                    TimeUnit unit,
  5.                    BlockingQueue<Runnable> workQueue,
  6.                    ThreadFactory threadFactory,
  7.                    RejectedExecutionHandler handler)
复制代码
创建一个线程池时需要输入几个参数,如下。

  • corePoolSize(线程池的基本大小,也可以称之为核心线程数大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
  • maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列,这个参数就没有什么效果了。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。默认情况下该参数针对的是非核心线程,如果将参数allowCoreThreadTimeOut设置为true,那么核心线程也会受这个参数影响
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。具体可以参考Java阻塞队列。
  • threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在 JDK 1.5 中 Java 线程池框架提供了以下 4 种策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
    当然,也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化存储不能处理的任务。
2.2 向线程池中提交任务

​                可以使用两个方法向线程池提交任务,分别为 execute()和 submit()方法。
​                execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知 execute()方法输入的任务是一个 Runnable 类的实例。
  1. executorService.execute(()-> Thread.currentThread().getName());
复制代码
​                submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
  1. Future<String> submit = executorService.submit(() -> Thread.currentThread().getName());
  2. System.out.println(submit.get());
复制代码
2.3 关闭线程池

​                可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
​                只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
2.4  合理地配置线程池

​                要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
​                        • 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
​                        • 任务的优先级:高、中和低。
​                        • 任务的执行时间:长、中和短。
​                        • 任务的依赖性:是否依赖其他系统资源,如数据库连接。
​                性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务应配置尽可能小的线程,如配置 N+1(其中N是CPU合适)个线程的线程池。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*N(其中N是CPU合适)。混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。
​                优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。
​                执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
​                依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
​                建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。
2.5 线程池的监控

​                如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。
​                • taskCount:线程池需要执行的任务数量。
​                • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
​                • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
​                • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
​                • getActiveCount:获取活动的线程数。
​                通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。
3.线程池的生命周期


      图1-3 线程池的生命周期 线程池的生命周期包括以下几个状态:

  • 初始状态(NEW):线程池被创建后处于初始状态。此时线程池没有包含任何线程,也没有开始执行任务。
  • 运行状态(RUNNING):通过调用线程池的 execute() 或 submit() 方法,线程池开始接受任务并创建线程执行。线程池可以动态地调整线程数量来适应任务的需求。
  • 关闭状态(SHUTDOWN):当调用线程池的 shutdown() 方法后,线程池进入关闭状态。此时线程池不会再接受新的任务提交,但会继续处理已经提交的任务直到完成。处于关闭状态的线程池仍然可以调用 execute() 方法来提交任务,但会抛出 RejectedExecutionException。
  • 停止状态(STOP):通过调用线程池的 shutdownNow() 方法可以使线程池进入停止状态。此时线程池会立即停止,取消所有正在执行的任务,并且丢弃所有等待执行的任务。
  • 整理状态(TIDYING):当线程池处于STOP状态或者SHUTDOWN后,并且所有任务都已经完成,线程池会进入整理状态。在整理状态中,线程池会清理已终止的工作线程。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
  • 终止状态(TERMINATED):当线程池完成整理操作后,最终进入终止状态。此时线程池彻底终止,不再接受任务和执行任务。
注意,线程池的状态可以通过isShutdown() 和 isTerminated() 方法进行查询,以确定线程池当前所处的状态。
4.代码分析线程池的运行原理

4.1 线程池控制状态ctl

​                ctl 是线程池源码中常常用到的一个变量,它的主要作用是记录线程池的生命周期状态和当前工作的线程数。它是一个原子整型变量。ctl是一个32位的整数,高3位用于表示线程池的运行状态,低29位用于表示线程池中的线程数量。具体的结构如下所示:
  1. 31-29   |   线程池运行状态(用来保存线程池的状态 RUNNING,SHUTDOWN,STOP,STOP,STOP)
  2. 28-0    |   线程池中线程数量
复制代码
ctl在ThreadPoolExecutor中的声明和初始化
    源码:java.util.concurrent.ThreadPoolExecutor#ctl
  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
复制代码

  • ctl (线程池控制状态)是原子整型的,这意味这对它进行的操作具有原子性。
  • 如此一来,作为ctl组成部分的 runState (线程池生命周期状态)和 workerCount (工作线程数) 也将同时具有原子性。
  • ThreadPoolExecutor 使用  ctlOf 方法来将  runState 和  workerCount 两个变量(都是整型)打包成一个 ctl 变量。
4.1.1 工具人常量COUNT_BITS 和 CAPACITY

    源码:java.util.concurrent.ThreadPoolExecutor
  1. private static final int COUNT_BITS = Integer.SIZE - 3;
  2. private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
复制代码
4.2.6 tryTerminate方法

根据当前线程池的状态判断是否需要进行线程池terminate处理,在工作线程退出时,调用shutdown方法,shutdownNow方法方法时都会调用这个方法。
    代码清单4-6 java.util.concurrent.ThreadPoolExecutor#tryTerminate
  1. int 类型的1用二进制表示为:00000000 00000000 00000000 00000001
  2. 1 << COUNT_BITS,其中COUNT_BITS为29,1 << 29,代表将数字1向左移动29位。结果为:00100000 00000000 00000000 00000000
  3. 将上一步计算出的结果减1,最终CAPACITY用二进制表示为:00011111 11111111 11111111 11111111
复制代码
到这里,浅析了execute方法的执行流程,从工作线程的创建,工作线程的结束,阻塞队列在线程池中的作用等等,下面将介绍线程池的关闭方法shutdown和shutdownNow
4.3 线程池的关闭

关于shutdown和shutdownNow方法的使用可参考本篇 2.3 关闭线程池
4.3.1 shutdown方法
  1. // runState is stored in the high-order bits
  2. private static final int RUNNING    = -1 << COUNT_BITS;
  3. private static final int SHUTDOWN   =  0 << COUNT_BITS;
  4. private static final int STOP       =  1 << COUNT_BITS;
  5. private static final int TIDYING    =  2 << COUNT_BITS;
  6. private static final int TERMINATED =  3 << COUNT_BITS;
复制代码
4.3.2 shutdownNow方法
  1. RUNNING    二进制为 11100000 00000000 00000000 00000000
  2. SHUTDOWN   二进制为 00000000 00000000 00000000 00000000
  3. STOP       二进制为 00100000 00000000 00000000 00000000
  4. TIDYING    二进制为 01000000 00000000 00000000 00000000
  5. TERMINATED 二进制为 01100000 00000000 00000000 00000000
复制代码
参考

书籍《Java并发编程的艺术》--Java中的线程池
详解Java线程池的ctl(线程池控制状态)【源码分析】
线程池与线程的几种状态

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表