线程池:ThreadPoolExecutor源码解读

打印 上一主题 下一主题

主题 805|帖子 805|积分 2415

目录

1 带着问题去阅读

1.1 线程池的线程复用原理

用户每次调用execute()来提交一个任务,然后任务包装成Worker对象,并且启动一个worker线程来执行任务(任务可能会被先加入队列),只要任务队列不为空且worker线程没有被中断,线程的run()方法通过一个while循环,不断去队列获取任务并执行,而不会进入到run()方法底部。while循环是线程复用的关键
1.2 线程池如何管理线程

首先定义两个说明:

  • 关于获取任务超时,会依赖以下条件:
    --1、开启核心线程超时设置 或 线程池线程数大于核心线程数
    --2、符合1,且从workqueue获取任务超时。(如果不符合1,则以阻塞方式获取任务,不会超时)
  • 线程池最小保留线程数:
    --1、如果没有开启核心线程超时配置,则至少保留corePoolSize个线程
    --2、如果开启核心线程超时并且当前队列里面还有任务,只需保留1个线程
将线程池的生命周期分为三个阶段:创建阶段、运行期间、终止阶段
一、创建阶段
<ul>当线程池线程数(ctl低位)少于核心线程数(corePoolSize),创建新线程执行任务
当线程池线程数大于等于核心线程数,且任务队列未满时,将新任务放入到任务队列中,不创建线程
当线程池线程数大于等于核心线程数(maximumPoolSize),且任务队列已满
--如果工作线程数少于最大线程数,则创建新线程执行任务
--如果工作线程数等于最大线程数,则抛出异常,拒绝新任务进入
二、运行期间
1、线程启动后,将一直循环获取任务并执行,只有当获取任务超时,或者线程池被终止,才会结束。
2、如果获取任务超时,那么Worker线程自然结束。此时线程池减少了1个线程。
3、在线程结束后,线程池会检查:1、线程池线程数1成功),线程池状态变为SHUTDOWN
2、shutdownNow()方式终止线程池:
--关闭线程池,不再接受新的任务,中断已经启动的Worker线程(Work.state>0),线程池状态改为STOP</p>线程池创建线程及处理任务过程

梳理一下大概流程:

  • 用户线程调用execute()提交Runnable任务
  • execute()调用addWork()将任务提交给线程池处理:如果有可用的核心线程,则提交给核心线程处理。反则,将任务先添加到任务队列(workQueen)中。
  • addWorker()方法将启动一个worker线程,调用runWorker()来处理任务。
  • runWorker()方法将循环获取任务,并运行任务的run()方法来执行真正的业务。如果是以核心线程提交任务,则优先处理该任务,否则,循环调用getTask()来获取任务
  • getTask()方法,从任务队列(workQueen)取出任务,并返回。
  • getTask()没有拿到任务,则执行线程结束processWorkerExit()
线程池创建阶段

1.3 线程池配置的重要参数


  • ctl:存储线程池状态以及线程数
  • corePoolSize、maximumPoolSize、keepAliveTime、workQueue  参照下面的源码分析说明
  • allowCoreThreadTimeOut:是否开启核心线程超时。默认false,不在构造函数设置,需要调用方法设置
  • HashSet workers:线程池终止时会从该集合找线程来中断,源码分析有说明
1.4 shutdown()和shutdownNow()区别


  • shutdown() :关闭线程池,不再接受新的任务,已提交执行的任务继续执行;中断所有空闲线程;将线程池状态改为SHUTDOWN
  • ShutDownNow():关闭线程池,不再接受新的任务,中断已经启动的Worker线程;将线程池状态改为STOP;返回未完成的任务队列
1.5 线程池中的两个锁


  • mainLock主锁是可重入的锁,用来同步更新的成员变量
  • Worker内部实现了一个锁,它是不可重入的,在shutdown()场景中,通过tryLock确保不会中断还没有开始执行或者还在执行中的worker线程。
2 源码分析过程中的困惑及解惑

---什么情况任务会提交失败?
同时符合以下条件,任务才会被提交:

  • 线程池状态等于RUNNING状态
  • 如果任务队列已经满了,并且线程池线程数 少于 配置的线程池最大线程数(maximumPoolSize) 且小于线程池的最大支持线程数(CAPACITY)时。(如果队列没满,任务将会先加入到队列中)
特别说明:特殊情况会创建任务为空的Worker线程来帮助队列中的任务跑完
---核心线程数的意义?从测试结果看,他决定了工作线程最大并发数,但未代码验证

  • 核心线程数决定提交任务什么时候会被放入到队列中:即线程池线程数>=核心线程数时。
  • 核心线程数大小跟并发执行线程(任务)无关。也就是,它不决定工作线程最大并发数
  • 核心线程数可以动态修改。(如果增大了,可能会马上创建新的Worker线程)
---线程池状态不是RUNNING,或者往workQueue添加worker失败,这是为什么还要提交任务
以下情况会创建任务为空的Worker线程来执行队列中的任务

  • 当前线程池状态为shutdown,但是任务队列不为空,这时创建Worker线程来帮助执行队列的任务
  • 当前线程池状态为running, 任务添加到队列后,接着线程池被关闭,并且从队列移除该任务失败,并且线程池线程数为0,这时创建Worker线程来确保刚提交的任务有机会执行。
---为什么runWorker()方法在执行任务前后加锁,但是线程依然能够并发?

  • worker线程是通过创建Worker对象来创建的,在addWorke()的while循环创建了多个Worker对象,每个Worker对象都有自己的锁,Worker线程通过runWorker()访问的是当前对象的锁,因此Worker线程能够并发;
  • 锁的意义是限制不能中断执行中的任务,因为主线程调用shutdown()和shutdownNow()方法时,会遍历WorkerSet的Worker对象,调用tryLock(),这时主线程和Worker线程竞争同一个锁。
3 源码分析

3.1 类继承关系



  • Executo接口:专门提交任务,只有一个execute()方法。Executor 提供了一种将任务的提交和任务的执行两个操作进行解耦的思路:客户端无需关注执行任务的线程是如何创建、运行和回收的,只需要将任务的执行逻辑包装为一个 Runnable 对象传递进来即可,由 Executor 的实现类自己来完成最复杂的执行逻辑
  • ExecutorService接口:继承了Executor,扩展执行任务的能力。例如:获取任务的执行结果、取消任务等功能;提供了关闭线程池、停止线程池,以及阻塞等待线程池完全终止的方法,需要ThreadPoolExecutor实现
  • AbstractExecutorServic类:实现了 ExecutorService ,是上层的抽象类,负责将任务的执行流程串联起来,从而使得下层的实现类 ThreadPoolExecutor只需要实现一个执行任务的方法即可
  • ThreadPoolExecutor:可以看做是基于生产者-消费者模式的一种服务,内部维护的多个线程相当于消费者,提交的任务相当于产品,提交任务的外部就相当于生产者
3.2 类的常量/成员变量
  1.    //--------------------------常量部分------------------------
  2.    
  3. // 常量29。用在移位计算Integer.SIZE=32)
  4. private static final int COUNT_BITS = Integer.SIZE - 3; //29
  5. // 最大支持线程数 2^29-1:000 11111111111111111...
  6. private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  7. // 以下为线程池的四个状态,用32位中的前三位表示
  8. // 011 terminated() 方法执行完成后,线程池的状态会转为TERMINATED.
  9. private static final int TERMINATED =  3 << COUNT_BITS;
  10. // 010 所有任务都销毁了,workCount=0的时候,线程池的装填在转换为TIDYING是,会执行钩子方法terminated()
  11. private static final int TIDYING    =  2 << COUNT_BITS; //翻译为整理
  12. // 001 拒绝新的任务提交,清空在队列中的任务
  13. private static final int STOP       =  1 << COUNT_BITS;
  14. // 000 拒绝新的任务提交,会将队列中的任务执行完,正在执行的任务继续执行.
  15. private static final int SHUTDOWN   =  0 << COUNT_BITS;
  16. // 111 00000 00000000 00000000 00000000  线程运行中 【running状态值为负数最小】
  17. private static final int RUNNING    = -1 << COUNT_BITS; //线程池的默认状态
  18. //------------------------变量部分------------------------
  19. // ctl存储线程池状态和线程池大小,那么用前3位表示线程池状态,后29位表示:线程池大小,即线程池线程数
  20. //线程池状态初始值为RUNNING
  21. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  22. //任务队列
  23. //保存不能马上执行的Runnable任务。
  24. //执行shutdownNow()时,会返回还在队列的任务
  25. private final BlockingQueue<Runnable> workQueue;
  26. // 主锁,对workers、largestPoolSize、completedTaskCount的访问都必须先获取该锁
  27. private final ReentrantLock mainLock = new ReentrantLock();
  28. // 包含池中的所有工作线程的集合。持有mainLock访问  
  29. // 创建Worker时,添加到集合
  30. // 线程结束时,从集合移除
  31. // 调用shutdown()时,从该集合中找到空闲线程并中断
  32. // 调用shutdownNow()时,从该集合中找到已启动的线程并中断
  33. private final HashSet<Worker> workers = new HashSet<Worker>();
  34. // 线程通信手段, 用于支持awaitTermination方法:等待所有任务完成,并支持设置超时时间,返回值代表是不是超时.
  35. private final Condition termination = mainLock.newCondition();
  36. // 记录workers历史以来的最大值。持有mainLock访问
  37. // 每次增加worker的时候,都会判断当前workers.size()是否大于最大值,大于则更新
  38. // 用于线程池监控的,作为重要指标
  39. private int largestPoolSize;
  40. // 计数所有已完成任务,持有mainLock访问
  41. // 每个worker都有一个自己的成员变量 completedTasks 来记录当前 worker 执行的任务次数, 当前线worker工作线程终止的时候, 才会将worker中的completedTasks的数量加入到 completedTaskCount 指标中.
  42. private long completedTaskCount;
  43. // 线程工厂
  44. private volatile ThreadFactory threadFactory;
  45. // 拒绝策略,默认四种AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,建议自己实现,增加监控指标
  46. private volatile RejectedExecutionHandler handler;
  47. // keepAliveTime和allowCoreThreadTimeOut 是关于线程空闲是否会被销毁的配置
  48. // 关于空闲的说明:
  49. // 1、线程池在没有关闭之前,会一直向任务队列(workqueue)获取任务执行,如果任务队列是空的,在新任务提交上来之前,就会产生一个等待时间,期间,线程处于空闲状态
  50. // 2、向任务队列获取任务用:workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),表示阻塞式获取元素,等待超时,则终止等待并返回false。通过判断poll()方法是true/falle来判定线程是否超时
  51. // 获取任务的等待时间 ,以下两种情况会使用到该值
  52. //1、如果启用allowCoreThreadTimeOut,那表示核心线程的空闲时间  
  53. // 2、当线程池内线程数超过corePoolSize,表示线程获取任务的等待时间
  54. private volatile long keepAliveTime;
  55. // 核心线程是否开启超时
  56. // false:表示核心线程一旦启动,会一直运行,直至关闭线程池。默认该值
  57. // true:表示核心线程处于空闲且时间超过keepAliveTime,核心线程结束后,将不再创建新线程
  58. // (默认的构造函数没有设置这个属性,需要手工调用allowCoreThreadTimeOut()方法来设置)
  59. private volatile boolean allowCoreThreadTimeOut;
  60. //核心线程数量
  61. //核心线程是指:线程会一直存活在线程池中,不会被主动销毁【如果核心线程开启超时,有可能被被销毁】。
  62. private volatile int corePoolSize;
  63. // 配置的线程池最大线程数
  64. private volatile int maximumPoolSize;
  65. // 默认拒绝策略 AbortPolicy
  66. private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  67. //  安全控制访问(主要用于shutdown和 shutdownNow方法
  68. private static final RuntimePermission shutdownPerm =  new RuntimePermission("modifyThread");
  69. // 在threadPoolExecutor初始化的时候赋值,acc对象是指当前调用上下文的快照,其中包括当前线程继承的AccessControlContext和任何有限的特权范围,使得可以在稍后的某个时间点(可能在另一个线程中)检查此上下文。
  70. private final AccessControlContext acc;
复制代码
runWorker()总结

  • 执行任务前先判断线程池是否是STOPING状态,是则中断worker线程。
  • 执行任务:先执行firstTask,再从任务队列获取执行
  • 如果没有任务,调用processWorkerExit()来执行线程退出的工作。
  • 只要还有任务,worker线程就一直执行任务,并刷新completedTasks
4.4 getTask()方法
  1. // 获取当前线程池的状态(前3位)
  2. private static int runStateOf(int c) { return c & ~CAPACITY; }
  3. // 获取当前线程池中线程数(后29位)
  4. private static int workerCountOf(int c){ return c & CAPACITY; }
  5. // 更新状态和数量
  6. private static int ctlOf(int rs, int wc) { return rs | wc; }
  7. // 小于判断C是不是小于S,比如runStateLessThan(var,STOP),那var就只有可能是(RUNNING,SHUTDOWN)
  8. private static boolean runStateLessThan(int c, int s) {
  9.     return c < s;
  10. }
  11. // 是不是C >= S
  12. private static boolean runStateAtLeast(int c, int s) {
  13.     return c >= s;
  14. }
  15. // 判断状态是不是RUNNING
  16. private static boolean isRunning(int c) {
  17.     return c < SHUTDOWN;
  18. }
复制代码
getTask()总结:

  • workQueue中获取一个任务并返回
  • 没有获取到任务就扣减线程池线程数。获取不到任务的四种情况:

    • 线程池的状态是>=STOP
    • 线程池的状态是SHUTDOWN并且任务队列为空
    • 获取任务超时
    • 线程池线程数大于maximumPoolSize并且队列为空

4.5 processWorkerExit()方法
  1. -1 << COUNT_BITS
  2. 这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来
  3. -1的原码
  4. 10000000 00000000 00000000 00000001
  5. -1的反码,负数的反码是将原码除符号位以外全部取反
  6. 11111111 11111111 11111111 11111110
  7. -1的补码,负数的补码就是将反码+1
  8. 11111111 11111111 11111111 11111111
  9. 关键了,往左移29位,所以高3位全是1就是RUNNING状态
  10. 111 00000 00000000 00000000 00000000
复制代码
ShutDownNow()方法总结

  • 关闭线程池,不再接受新的任务,中断已经启动的Worker线程
  • 将线程池状态改为STOP
  • 返回未完成的任务队列
4.10 isShutdown()方法

确认线程池是否关闭。判断状态是不是RUNNING.
  1. //corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue 这五个参数必须指定
  2. //最多参构造函数
  3. public ThreadPoolExecutor(int corePoolSize,
  4.                               int maximumPoolSize,
  5.                               long keepAliveTime,
  6.                               TimeUnit unit,
  7.                               BlockingQueue<Runnable> workQueue,
  8.                               ThreadFactory threadFactory,
  9.                               RejectedExecutionHandler handler) {
  10.       
  11.        //初始值的合法性校验
  12.        if (corePoolSize < 0 ||
  13.             maximumPoolSize <= 0 ||
  14.             maximumPoolSize < corePoolSize ||  //最大线程数必须大于核心线程数
  15.             keepAliveTime < 0)
  16.             throw new IllegalArgumentException();  
  17.         if (workQueue == null || threadFactory == null || handler == null)
  18.             throw new NullPointerException();
  19.             //成员变量赋初值
  20.         this.acc = System.getSecurityManager() == null ?
  21.                 null :
  22.                 AccessController.getContext();
  23.         this.corePoolSize = corePoolSize;
  24.         this.maximumPoolSize = maximumPoolSize;
  25.         this.workQueue = workQueue;//默认使用SynchronousQueue<Runnable>
  26.         this.keepAliveTime = unit.toNanos(keepAliveTime); //默认60S
  27.         this.threadFactory = threadFactory; //默认使用DefaultThreadFactory
  28.         this.handler = handler;
  29.     }
复制代码
4.11 prestartCoreThread()方法
  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable  {
  2. }
复制代码

  • 启动一个空闲的线程作为核心线程
  • 如果核心线程数已到阈值, 会加入失败, 返回false, 如果线程池处于SHUTDOWN以上的状态也返回false
  • 只有真正这个线程调用start方法跑起来, 才会返回true
4.12 prestartAllCoreThreads()方法

启动所有核心线程,使他们等待获取任务
  1. public interface ThreadFactory {
  2.     Thread newThread(Runnable r);
  3. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表