使用线程池你应该知道的知识点

打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

多线程编程是每一个开发必知必会的技能,在实际项目中,为了克制频仍创建和烧毁线程,我们通常使用池化的思想,用线程池进行多线程开发。
线程池在开发中使用频率非常高,也包含不少知识点,是一个高频口试题,本篇总结线程池的使用经验和需要注意的问题,更好的应对日常开发和口试。
如有更多知识点,欢迎补充~
异常处理

正如我们在异常处理机制所讲,假如你没有对提交给线程池的使命进行异常捕获,那么异常信息将会丢失,倒霉于问题排查。
通常异常处理要么是手动处理掉,要么是往上抛由全局异常处理器统一处理,切勿吃掉异常。
在实际开发中,我们可以使用装饰器模式对TheradPoolExecutor进行封装,重写它的execute和submit方法,进行try-catch处理,打印日志,防止开发同学直接使用ThreadPoolExecutor提交使命而漏了异常处理。
traceid

完备的日志链路对日志分析,问题排查是至关紧张的,否则拿到一堆日志没有关联性,根本无从下手。
一个完备的请求大概颠末许多个方法调用,服务调用,mq消息发送等,要串联起来需要一个全局id,称为traceid。
例如我们使用spring cloud sleuth链路跟踪,它就会在上下文(MDC)塞一个traceid,并不停传递下去。
很遗憾,假如你在请求过程使用线程池(直接new ThreadPoolExecutor),那么traceid将会丢失,例如你会看到如下日志:

很明显,线程池里打印的日志跟外面的关联不起来了,这会影响我们分析排盘问题。
解决方案,可以使用spring提供的ThreadPoolTaskExecutor,它内部也包装了ThreadPoolExecutor,提供更多功能。将其注册到spring容器中,使用spring cloud sleuth时,它会判定假如实现了ExecutorService接口的bean,就会进行动态代理为TraceableExecutorService,它会将当前上下文的traceid传递给线程池的线程,那么就可以关联起来了。如:

固然你也可以像前面说的,封装自己的ThreadPoolExecutor注册到spring容器,也一样会被代理。
关于traceid我们还在xxl这边有写到,可以参考给xxl新增traceId和spanId
ThreadLocal

ThreadLocal是线程内一块数据结构(Thread类内有一个ThreadLocal.ThreadLocalMap),线程间互不干扰,没有并发问题。上面我们提到使用MDC可以在各个位置打印traceid,实际就是利用了ThreadLocal,如使用logback:

ThreadLocal在线程内传递数据是没有问题的,但涉及到子线程怎么办呢?这个时候就无法传递已往了,不外Thread类内还有一个ThreadLocal.ThreadLocalMap inheritableThreadLocals,当创建子线程的时候会把父线程的ThreadLocal“继续”过来。

但使用线程池场景又不太一样了,因为线程池里的线程是只创建一次,后续复用的,而前面说的“继续”是创建时一次性传递过来,后续就不会更改,很明显不符合线程池的场景,使用线程池时盼望线程每次都从父线程获取最新的ThreadLocal。
解决方案,可以使用阿里的transmittable-thread-local,它的原理也不复杂,就是在每次提交使命给线程池的时候,拷贝ThreadLocal。  关于ThreadLocal我们之前有过介绍,有兴趣可以看下口试再也不怕问ThreadLocal了ThreadLocal扩展
核心参数

这是入门级八股文了吧,基本烂到口试官都不问了,但有一些点我仍想发掘一下亮点。
  1.     public ThreadPoolExecutor(int corePoolSize,
  2.                               int maximumPoolSize,
  3.                               long keepAliveTime,
  4.                               TimeUnit unit,
  5.                               BlockingQueue<Runnable> workQueue,
  6.                               ThreadFactory threadFactory,
  7.                               RejectedExecutionHandler handler)
复制代码
如上是参数最全的构造方法,参数解释:

  • corePoolSize
    核心线程数,默认是不初始化创建核心线程,也不回收。可以通过prestartCoreThread/prestartAllCoreThreads方法对线程池进行预热,也可以通过allowCoreThreadTimeOut方法对核心线程进行回收。
  • maximumPoolSize
    最大线程数,当核心线程开到最大,且使命队列也满了,还有使命提交,就会继续开线程到直到最大线程数。
  • keepAliveTime
    当线程数大于核心线程数,线程最大空闲时间,凌驾就会被烧毁回收。
  • unit
    keepAliveTime的时间单位。
  • workQueue
    队列,核心线程满了,使命会暂存到队列,等待实行。
  • threadFactory
    线程工厂,默认是Executors.defaultThreadFactory(),线程名称由:pool-数字-thread 组成。
  • RejectedExecutionHandler
    拒绝策略,当队列满了,最大线程数也满了,还提交使命就会触发拒绝策略,jdk提供了4种拒绝策略,默认是AbortPolicy,也可以自界说拒绝策略。

需要提到的点是:

  • 根据业务界说不同的线程池
    有的人喜欢界说一个所谓“通用”的线程池来处理各种业务,这种做法不好,每种业务的核心参数需求不一样,且会相互竞争资源,正确的做法应该是每种业务界说一个线程池。
  • 有意义的线程名称
    从上面可以看到默认的线程名称不是特殊友爱,我们可以根据业务取一个有意义的名称。这里我用到guava里的ThreadFactoryBuilder,例如:
  1. new ThreadFactoryBuilder()
  2.     .setNameFormat(“taskDispatchPool” + "-%d")
  3.     .build();
复制代码

  • 合适的队列长度
    过长的队列长度大概导致应用OOM,实际要根据具体环境指定,不宜过大,禁止使用无界队列。
    jdk的Executors辅助类创建的线程池队列长度许多都是无界的,稍有不慎就会导致内存溢出,这也是为什么阿里java开发规范明确禁止使用Executors创建线程池的缘故原由。
  • 与tomcat/hystirx线程池的区别
    jdk的线程池是在核心线程满了,使命先进入队列,队列满了再继续创建线程到最大线程数。
    而tomcat/hystrix的线程池策略是,核心线程满了,就继续创建线程到最大线程数,再有使命就进入队列。
  • 设置合适的核心参数
    一样平常使命可以分为cpu密集型或io密集型,对于cpu密集型比较容易设置,一样平常设置为cpu核数即可,不宜过大,因为cpu密集型线程过大会有大量的线程切换,反而降低性能。
    io密集型就不好估算了,而且大部分环境下都是io密集型,没有万能公式,只能根据经验,测试,生产运行观察,调解,才能到达一个比较合适的值,这就要求我们的线程池要支持动态调解参数,下面还会说到。
假如跟口试官提这些点,说明你不是单纯背八股文,是有真的在思索总结~
动态线程池

上面我们说到线程池的核心参数(重要是线程数和队列长度)不太好估算,设置过小大概使命处理不外来导致阻塞,设置过大大概影响整体服务或影响下游服务,所以生产环境的线程池要支持动态调解。幸运的是ThreadPoolExecutor提供了方法可以直接对核心参数进行修改,例如setCorePoolSize,setMaximumPoolSize,我们可以在运行过程进行设置,线程池内部就会根据参数调解线程了。
笔者所在的项目一份代码会部署在各个国家(环境),每个环境的业务,数据量不一样,呆板配置也不一样,所以需要根据环境设置不同的参数。在实际运行过程中,偶然候为了提升处理服从设置比较大的线程数,而忽略对下游服务的影响,导致下游服务被压垮。也偶然候开始估算太小,后面业务增长,导致处理不外来的环境。所以需要动态调解,我们把线程池参数配置接入到apollo,随时可以调解生效,同时我们也把线程池的运行环境上报到grafana,可以进行监控,告警。
关于动态线程池,之前也写过,可以参考:
加强版ThreadPoolExecutor升级
Java线程池实现原理及其在美团业务中的实践
hippo4j
如何设置线程池参数?美团给出了一个让口试官虎躯一震的回答
使命统计

偶然候我们需要对提交给线程池的使命进行统计,例如本次实行成功多少,失败多少,过滤多少。线程池就没有提供这种实现了,因为它是一直运行的状态,区分不了业务上的东西,只能简单获取总体完成成功次数(getCompletedTaskCount),或触发拒绝策略次数(getRejectCount)。
业务上的统计我们就可以结合CountDownLatch来进行计数,主线程提交完要进行await等待线程池所有使命完成,每个线程完成一次使命就countDown一次,使命计数可以使用LongAdder统计,相比AtomicLong更加高效。这也回应了我们上面说要根据业务界说不同的线程池的缘故原由,不同范例的统计不会相互影响。
后来我们有个同学提醒jdk还有个Phaser类,比CountDownLatch好用,也可以用它来完成,具体参见:并发工具类Phaser
默认线程池

你是否在你的团队见过如下代码:
  1.         @Test
  2.         public void test4() throws InterruptedException {
  3.                 List<String> list = Lists.newArrayList();
  4.                 list.parallelStream().forEach(item->{
  5.                         //run async
  6.                 });
  7.                 CompletableFuture.runAsync(()->{
  8.                         //run async
  9.                 });
  10.         }
复制代码
这些写法确实都是异步的,但底层都是用了系统默认的线程池ForkJoinPool.commonPool(),默认线程数是Runtime.getRuntime().availableProcessors() - 1。
假如是cpu密集型使命,这么使用也没啥问题。假如是io密集型,就会相互影响。如下,业务2的使命会卡住,知道业务1实行完成。
  1.                 //业务1
  2.                 for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) {
  3.                         CompletableFuture.runAsync(() -> {
  4.                                 try {
  5.                                         Thread.sleep(5000);
  6.                                 } catch (InterruptedException e) {
  7.                                 }
  8.                         });
  9.                 }
  10.                 //业务2
  11.                 for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) {
  12.                         CompletableFuture.runAsync(() -> {
  13.                                 System.out.println("running...");
  14.                         });
  15.                 }
复制代码
笔者所在的项目,最开始有的使用上面的写法,有的使用Executors创建,有的使用new ThreadPoolTaskExecutor,有的使用spring ThreadPoolTaskExecutor...,五花八门,直到后来我们统一使用自己封装的线程池,这种环境才得以纠正。
父子线程公用一个线程池

这也是一个实际案例,在我们团队有同学这么使用线程池,逻辑很简单,想要查一批数据的时间和还款概率,实际环境还查了更多信息,做了简化,这些查询需要调用外部接口,为了提升接口性能,把这些查询都丢到线程池里去并发实行,通过Future.get获取结果。同时主线程盼望这两个操纵也可以并发实行,所以通过CompletableFuture也提交到这个线程池里,最终通过CompletableFuture.join等待所有使命完成。
实际代码比较复杂,简化后如下:
  1.         //查一个时间
  2.         CompletableFuture<Void> timeFuture = CompletableFuture.runAsync(() -> {
  3.                 initTime(pageResult);
  4.         }, executor);
  5.         //预测还款概率
  6.         CompletableFuture<Void> repayDesireFuture = CompletableFuture.runAsync(() -> {
  7.                 initRepayDesire(pageResult);
  8.         }, executor);
  9.                
  10.         //主线程等待
  11.         CompletableFuture.allOf(timeFuture, repayDesireFuture).join();
  12.         private void initTime(List<JobListVo> data) {
  13.                 List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50);
  14.                 List<Future<Result<List<TimeInfo>>>> futures = new ArrayList<>();
  15.                 lists.forEach(item -> futures.add(executor.submit(() -> client.getTimeList(ids))));
  16.                 futures.forEach(
  17.                         item -> {
  18.                                 try {
  19.                                         Result<List<TimeInfo>> result = item.get();
  20.                                 } catch (Exception e) {
  21.                                         log.error("fetch error", e);
  22.                                 }
  23.                         }
  24.                 );
  25.             }
  26.         private void initRepayDesire(List<JobListVo> data) {
  27.                 List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50);
  28.                 List<Future<Result<List<RepayDesire>>>> futures = new ArrayList<>();
  29.                 lists.forEach(item -> futures.add(executor.submit(() -> client.queryRepayDesire(ids))));
  30.                 futures.forEach(
  31.                         item -> {
  32.                                 try {
  33.                                             Result<List<RepayDesire>> result = item.get();
  34.                                 } catch (Exception e) {
  35.                                             log.error("fetch error", e);
  36.                                 }
  37.                             }
  38.                     );
  39.         }
复制代码
看到这段代码,大概有的人会说主线程那两个操纵并发的意义不大,串行实行即可。但实际分析还是有意义的,假如只有一个请求,查询时间的使命提交到线程池后,线程池资源还有剩余,这个时候并发实行还款概率的使命,是可以加速整个查询速度的。
笔者在review这段代码的时候,感觉总是怪怪的,但逻辑上又好像说得通,直到我看到这边文章线程池碰到父子使命,有大坑,要注意!,这不就跟我们的场景几乎一样吗,确实大概会有问题。
根据文章所述,父子使命提交到同一个线程池大概导致父子使命相互等待,最终卡死。极端一点,假设线程池核心线程数是1,队列长度是1,现在主线程提交了一个使命,使用了这个核心线程,开始实行,并等待子线程实行完成。它的实行逻辑是在子线程内再提交一个使命给线程池,由于只有一个核心线程,所以这个使命进入队列等待。等到什么时候呢,等到主线程谁人使命完成线程才能释放,主线程又什么时候完成呢,等待子线程实行完成,死循环了...
总结:父子使命,不要公用一个线程池。
shutdown/shutdownNow

这两个方法都是关闭线程池的,区别是shutdown是不再接收新使命,但提交的使命还会实行,而shutdownNow除了不再接收新使命,已提交的使命也不会实行,正在实行中的使命会终止。
一样平常在线程池不再使用或应用下线前,就会调用线程关闭方法。假如关闭后再提交使命,就会触发拒绝策略。
最好确保在关闭后不再有使命提交给线程池,否则大概会有问题,笔者之前就碰到线程池关闭后还有请求(服务下线前)进来,导致报TimeoutException错误,具体分析在这篇/线程池shutdown引发TimeoutException,有兴趣的可以看下。
内存泄漏

将线程池变量界说为局部变量时,大概会发生内存泄漏。如下:
  1.         @GetMapping(value = "/test")
  2.         public Result<Void> info() {
  3.                 ExecutorService executorService = Executors.newFixedThreadPool(1);
  4.                 executorService.submit(() -> System.out.println("active thread"));
  5.                 return Result.success();
  6.         }
复制代码
在我们印象中,executorService作为一个局部变量,在方法返回时,生命周期就竣事了,这个时候应该是可以被gc回收的,怎么会发生内存泄漏呢?
使用线程池时有点不同,因为这里有个隐含的条件是,固然方法返回竣事了,但线程仍存活这,而存活的线程是可以作为gc root的。
我们知道线程池里的线程会被包装为Worker对象,这是界说在ThreadPoolExecutor里的非静态内部类。如下代码,Worker也实现了Runnable接口,把它作为参数传递给Thread对象。


活泼的线程作为gc root的,也就是不会被垃圾回收,而这个线程又引用着这个Worker对象,所以它也不会被回收。
谁人线程池对象又有什么关系呢?上面说到Worker是作为ThreadPoolExecutor里的非静态内部类,非静态内部类有一个规则就是持有外部类的引用,例如我们可以在InnerClass里调用外部类的方法。
大概通过编译后查看class文件也可以看到InnerClass持有外部类的this引用。
  1. public class OuterClass {
  2.         public OuterClass(Runnable runnable) {
  3.         }
  4.        
  5.         public void outterFunction() {
  6.         }
  7.         class InnerClass {
  8.                 public InnerClass() {
  9.                         outterFunction();
  10.                 }
  11.         }
  12. }
复制代码
所以,由于Worker持有ThreadPoolExecutor的引用,所以它也不会被回收,用一张图表现就是:

解决方案,1.不要使用局部线程池变量,界说为全局变量。2.调用shutdown/shutdownNow关闭线程池,关闭后线程就会被回收,线程池也可以被回收。
附chatgpt:jdk8中,哪些对象可以作为gc root?

虚拟线程

在jdk21以前,我们在java里使用的线程都称为平台线程,与内核线程是一对一的关系,开篇就说到,平台线程的使用成本比较高,所以才使用线程线程池来缓存复用它。

jdk21虚拟线程成为正式功能,可以投入生产使用。java里的虚拟线程类似于goland中的协程,虚拟线程与内核线程不再是一对一的关系,而是多对一,在jvm层面进行调度,可以大大内核线程的数目。
如图,可以看到多个虚拟线程在底层还是公用一个内核线程,它们之间的实行调度由jvm主动完成,虚拟线程的创建成本非常低,可以创建的虚拟线程数目可以宏大于平台线程。
当我们的程序碰到io的时候,以往的方式是将当前线程挂起,cpu进行线程切换,实行另外的使命。使用虚拟线程则不需要,使命的调度实行是在jvm层面完成的,cpu还是一直在实行同一个线程。

代码示例:
  1. Thread.ofVirtual().start(()->{});
复制代码
springboot3.2 tomcat开启虚拟线程:
  1. spring.threads.virtual.enabled = true
复制代码
那么使用虚拟线程,还需要线程池吗?答案是不需要的,我们之所以池化是因为对象的创建、烧毁成本较高,每次使用都创建,用完就丢弃,太浪费了,但虚拟线程的使用成本很低,所以它不需要池化了。
虚拟线程的设计固然不是为了取代传统线程和编程方式,可以看到jdk是通过扩展支持虚拟线程的,我们依然可以像以前一样编程开发,但在高并发场景虚拟线程是更好的选择,这也是大势所趋,说不定以后哪一天线程池也会被无情的标志上@Deprecated。
更多分享,欢迎关注我的github:https://github.com/jmilktea/jtea

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表