ToB企服应用市场:ToB评测及商务社交产业平台

标题: JDK线程池详解(全网最全-原理分析、源码详解) [打印本页]

作者: 涛声依旧在    时间: 2024-10-10 21:18
标题: JDK线程池详解(全网最全-原理分析、源码详解)
频繁创建新线程的缺点?

不受控风险

系统资源有限,每个人针对不同业务都可以手动创建线程,而且创建标准不一样(比如线程没著名字)。当系统运行起来,全部线程都在疯狂抢占资源,毫无规则,欠好管控。
另外,过多的线程自然也会引起上下文切换的开销。
频繁创建开销大

new Thread() 在操纵系统层面并没有创建新的线程;
真正转换为操纵系统层面创建一个线程,还要调用操纵系统内核的API,然后操纵系统要为该线程分配一系列的资源。
new Object() 过程

Object obj = new Object();
创建线程过程

使用线程池的长处

而且在线程池在提交使命前,可以提前创建线程,ThreadPoolExecutor 提供了两个方法帮助我们在提交使命之前,完成核心线程的创建,从而实现线程池预热的效果,以便在应用程序开始处理请求时立纵然用这些线程:
线程池原理

JDK线程池参数

ThreadPoolExecutor 的通用构造函数:
  1. public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
复制代码
  1. public class MyThreadFactory implements ThreadFactory {
  2.     private final String poolName;
  3.    
  4.     public MyThreadFactory(String poolName) {
  5.         this.poolName = poolName;
  6.     }
  7.    
  8.     public Thread newThread(Runnable runnable) {
  9.         return new MyAppThread(runnable, poolName);//将线程池名字传递给构造函数,用于区分不同线程池的线程
  10.     }
  11. }
复制代码
线程池的核心组成


一个完备的线程池,应该包含以下几个核心部分:
线程池实行具体过程


线程池生命周期

线程池状态状态释义RUNNING线程池被创建后的初始状态,能接受新提交的使命,而且也能处理阻塞队列中的使命SHUTDOWN关闭状态,不再接受新提交的使命,但仍可以继承处理已进入阻塞队列中的使命STOP会中断正在处理使命的线程,不能再接受新使命,也不继承处理队列中的使命TIDYING全部的使命都已终止,workerCount(有效工作线程数)为0TERMINATED线程池彻底终止运行Tips:千万不要把线程池的状态和线程的状态弄混了。补一张网上的线程状态图

Tips:当线程调用start(),线程在JVM中不肯定立即实行,有可能要等候操纵系统分配资源,此时为READY状态,当线程获得资源时进入RUNNING状态,才会真正开始实行。
线程池的预初始化机制

线程池的预初始化机制是指在线程池创建后,立即创建并启动肯定数目标线程,纵然这些线程临时还没有使命要实行。这样做的目标是减少在实际吸收到使命时创建线程所需的时间,从而提高响应速度。ThreadPoolExecutor提供了预初始化线程的功能。
预初始化方法(prestartCoreThread / prestartAllCoreThreads): ThreadPoolExecutor提供了两个方法来预初始化线程:
拒绝计谋

当没有显示指明拒绝计谋时,默认使用AbortPolicy


CallerRunsPolicy

假如不允许丢弃使命,就应该选择CallerRunsPolicy。CallerRunsPolicy 和其他的几个计谋不同,它既不会抛弃使命,也不会抛出非常,而是将使命回退给调用者,使用调用者的线程来实行使命。
  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2.         public CallerRunsPolicy() { }
  3.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4.             if (!e.isShutdown()) {
  5.                 // 直接主线程执行,而不是线程池中的线程执行
  6.                 r.run();
  7.             }
  8.         }
  9. }
复制代码
存在的问题:假如走到CallerRunsPolicy的使命是个非常耗时的使命,且处理提交使命的线程是主线程,可能会导致主线程阻塞,进而导致后续使命无法实时实行,严峻的情况下很可能导致 OOM。
当然,接纳CallerRunsPolicy实在就是希望全部的使命都能够被实行,临时无法处理的使命又被保存在阻塞队列BlockingQueue中。这样的话,在内存允许的情况下,就可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的使命,确保使命能够被准确实行。为了充分利用 CPU,还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高使命处理速度,避免累计在 BlockingQueue的使命过多导致内存用完。
但是,假如服务器资源达到可利用的极限了呢?导致主线程卡死的本质就是因为不希望任何一个使命被丢弃。换个思路,有没有办法既能包管使命不被丢弃且在服务器有余力时实时处理呢?
可以考虑使命持久化的思路,这里所谓的使命持久化,包括但不限于:
这里以方案一为例,简单先容一下实现逻辑:

也就是说,一旦线程池中线程达到满载时,就可以通过拒绝计谋将最新使命持久化到 MySQL 数据库中,比及线程池有了有余力处理全部使命时,让其优先处理数据库中的使命以避免"饥饿"问题。
当然,对于这个问题,也可以参考其他主流框架的做法:
  1. private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
  2.     NewThreadRunsPolicy() {
  3.         super();
  4.     }
  5.     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  6.         try {
  7.             //创建一个临时线程处理任务
  8.             final Thread t = new Thread(r, "Temporary task executor");
  9.             t.start();
  10.         } catch (Throwable e) {
  11.             throw new RejectedExecutionException(
  12.                     "Failed to start a new thread", e);
  13.         }
  14.     }
  15. }
复制代码
  1. new RejectedExecutionHandler() {
  2.                 @Override
  3.                 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
  4.                     try {
  5.                         //限时阻塞等待,实现尽可能交付
  6.                         executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
  7.                     } catch (InterruptedException e) {
  8.                         throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
  9.                     }
  10.                     throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
  11.                 }
  12.             });
复制代码
使命实行机制




execute()实行流程图


execute()源码
  1.     // 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
  2.     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3.     // Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
  4.     private static final int COUNT_BITS = Integer.SIZE - 3;
  5.     // 高三位用来存储线程池运行状态,其余位数表示线程池的容量
  6.     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  7.     // 线程池状态以常量值被存储在高三位中
  8.     private static final int RUNNING    = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
  9.     private static final int SHUTDOWN   =  0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
  10.     private static final int STOP       =  1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
  11.     private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
  12.     private static final int TERMINATED =  3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
  13.     // ctl变量的封箱拆箱相关的方法
  14.     private static int runStateOf(int c)     { return c & ~CAPACITY; } // 获取线程池运行状态
  15.     private static int workerCountOf(int c)  { return c & CAPACITY; } // 获取线程池运行线程数
  16.     private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取ctl对象
复制代码
  1. public void execute(Runnable command) {
  2.     if (command == null) // 任务为空,抛出NPE
  3.         throw new NullPointerException();
  4.         
  5.     int c = ctl.get(); // 获取当前工作线程数和线程池运行状态(共32位,前3位为运行状态,后29位为运行线程数)
  6.     if (workerCountOf(c) < corePoolSize) { // 如果当前工作线程数小于核心线程数
  7.         if (addWorker(command, true)) // 在addWorker中创建核心线程并执行任务
  8.             return;
  9.         c = ctl.get();
  10.     }
  11.    
  12.     // 核心线程数已满(工作线程数>核心线程数)才会走下面的逻辑
  13.     if (isRunning(c) && workQueue.offer(command)) { // 如果当前线程池状态为RUNNING,并且任务成功添加到阻塞队列
  14.         int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能已成为SHUTDOWN状态
  15.         if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务
  16.             reject(command); // 执行拒绝策略
  17.         else if (workerCountOf(recheck) == 0) // 当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务
  18.             addWorker(null, false);
  19.     }
  20.     // 阻塞队列已满才会走下面的逻辑
  21.     else if (!addWorker(command, false)) // 尝试增加工作线程执行command
  22.         // 如果当前线程池为SHUTDOWN状态或者线程池已饱和
  23.         reject(command); // 执行拒绝策略
  24. }
复制代码
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2.     retry: // 循环退出标志位
  3.     for (;;) { // 无限循环
  4.         int c = ctl.get();
  5.         int rs = runStateOf(c); // 线程池状态
  6.         // Check if queue empty only if necessary.
  7.         if (rs >= SHUTDOWN &&
  8.             ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 换成更直观的条件语句
  9.             // (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
  10.            )
  11.            // 返回false的条件就可以分解为:
  12.            //(1)线程池状态为STOP,TIDYING,TERMINATED
  13.            //(2)线程池状态为SHUTDOWN,且要执行的任务不为空
  14.            //(3)线程池状态为SHUTDOWN,且任务队列为空
  15.             return false;
  16.         // cas自旋增加线程个数
  17.         for (;;) {
  18.             int wc = workerCountOf(c); // 当前工作线程数
  19.             if (wc >= CAPACITY ||
  20.                 wc >= (core ? corePoolSize : maximumPoolSize)) // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
  21.                 return false;
  22.             if (compareAndIncrementWorkerCount(c)) // 执行cas操作,添加线程个数
  23.                 break retry; // 添加成功,退出外层循环
  24.             // 通过cas添加失败
  25.             c = ctl.get();  
  26.             // 线程池状态是否变化,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas
  27.             if (runStateOf(c) != rs)
  28.                 continue retry;
  29.             // else CAS failed due to workerCount change; retry inner loop
  30.         }
  31.     }
  32.     // 简单总结上面的CAS过程:
  33.     //(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
  34.     //(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
  35.     //(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas
  36.     // 走到这里说明cas成功,线程数+1,但并未被执行
  37.     boolean workerStarted = false; // 工作线程调用start()方法标志
  38.     boolean workerAdded = false; // 工作线程被添加标志
  39.     Worker w = null;
  40.     try {
  41.         w = new Worker(firstTask); // 创建工作线程实例
  42.         final Thread t = w.thread; // 获取工作线程持有的线程实例
  43.         if (t != null) {
  44.             final ReentrantLock mainLock = this.mainLock; // 使用全局可重入锁
  45.             mainLock.lock(); // 加锁,控制并发
  46.             try {
  47.                 // Recheck while holding lock.
  48.                 // Back out on ThreadFactory failure or if
  49.                 // shut down before lock acquired.
  50.                 int rs = runStateOf(ctl.get()); // 获取当前线程池状态
  51.                 // 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
  52.                 if (rs < SHUTDOWN ||
  53.                     (rs == SHUTDOWN && firstTask == null)) {
  54.                     if (t.isAlive()) // 检查线程是否处于活跃状态
  55.                         throw new IllegalThreadStateException();
  56.                     workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有
  57.                     int s = workers.size();
  58.                     if (s > largestPoolSize)
  59.                         largestPoolSize = s;
  60.                     workerAdded = true;
  61.                 }
  62.             } finally {
  63.                 mainLock.unlock(); // finally块中释放锁
  64.             }
  65.             if (workerAdded) { // 线程添加成功
  66.                 t.start(); // 调用线程的start()方法
  67.                 workerStarted = true;
  68.             }
  69.         }
  70.     } finally {
  71.         if (! workerStarted) // 如果线程启动失败,则执行addWorkerFailed方法
  72.             addWorkerFailed(w);
  73.     }
  74.     return workerStarted;
  75. }
复制代码
Worker源码

Worker是ThreadPoolExecutor类的内部类,此处只讲最紧张的构造函数和run方法
  1. private void addWorkerFailed(Worker w) {
  2.     final ReentrantLock mainLock = this.mainLock;
  3.     mainLock.lock();
  4.     try {
  5.         if (w != null)
  6.             workers.remove(w); // 线程启动失败时,需将前面添加的线程删除
  7.         decrementWorkerCount(); // ctl变量中的工作线程数-1
  8.         tryTerminate(); // 尝试将线程池转变成TERMINATE状态
  9.     } finally {
  10.         mainLock.unlock();
  11.     }
  12. }
复制代码
Worker实现了Runable接口,在调用start()方法候,实际实行的是run方法Worker实现了Runable接口,在调用start()方法候,实际实行的是run方法
  1. final void tryTerminate() {
  2.     for (;;) {
  3.         int c = ctl.get();
  4.         // 以下情况不会进入TERMINATED状态:
  5.         //(1)当前线程池为RUNNING状态
  6.         //(2)在TIDYING及以上状态
  7.         //(3)SHUTDOWN状态并且工作队列不为空
  8.         //(4)当前活跃线程数不等于0
  9.         if (isRunning(c) ||
  10.             runStateAtLeast(c, TIDYING) ||
  11.             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  12.             return;
  13.         if (workerCountOf(c) != 0) { // 工作线程数!=0
  14.             interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的线程
  15.             return;
  16.         }
  17.         final ReentrantLock mainLock = this.mainLock;
  18.         mainLock.lock();
  19.         try {
  20.             // 通过CAS自旋判断直到当前线程池运行状态为TIDYING并且活跃线程数为0
  21.             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  22.                 try {
  23.                     terminated(); // 调用线程terminated()
  24.                 } finally {
  25.                     ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED,工作线程数为0
  26.                     termination.signalAll(); // 通过调用Condition接口的signalAll()唤醒所有等待的线程
  27.                 }
  28.                 return;
  29.             }
  30.         } finally {
  31.             mainLock.unlock();
  32.         }
  33.         // else retry on failed CAS
  34.     }
  35. }
复制代码
从使命队列中取出一个使命
  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  2.     // 该worker正在运行的线程
  3.     final Thread thread;
  4.    
  5.     // 将要运行的初始任务
  6.     Runnable firstTask;
  7.    
  8.     // 每个线程的任务计数器
  9.     volatile long completedTasks;
  10.     // 构造方法   
  11.     Worker(Runnable firstTask) {
  12.         setState(-1); // 调用runWorker()前禁止中断
  13.         this.firstTask = firstTask;
  14.         this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建一个线程
  15.     }
  16.     // 实现了Runnable接口的run方法
  17.     public void run() {
  18.         runWorker(this);
  19.     }
  20.    
  21.     ... // 此处省略了其他方法
  22. }
复制代码
总结一下哪些情况getTask()会返回null:
工作线程退出
  1. final void runWorker(Worker w) {
  2.     Thread wt = Thread.currentThread();
  3.     Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例
  4.     w.firstTask = null;
  5.     w.unlock(); // status设置为0,允许中断
  6.     boolean completedAbruptly = true; // 线程意外终止标志
  7.     try {
  8.         // 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行
  9.         while (task != null || (task = getTask()) != null) {
  10.             w.lock(); // 加锁,保证下方临界区代码的线程安全
  11.             // 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程
  12.             if ((runStateAtLeast(ctl.get(), STOP) ||
  13.                  (Thread.interrupted() &&
  14.                   runStateAtLeast(ctl.get(), STOP))) &&
  15.                 !wt.isInterrupted())
  16.                 wt.interrupt(); // 中断当前线程
  17.             try {
  18.                 beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义
  19.                 Throwable thrown = null;
  20.                 try {
  21.                     task.run(); // 执行线程的run方法
  22.                 } catch (RuntimeException x) {
  23.                     thrown = x; throw x;
  24.                 } catch (Error x) {
  25.                     thrown = x; throw x;
  26.                 } catch (Throwable x) {
  27.                     thrown = x; throw new Error(x);
  28.                 } finally {
  29.                     afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义
  30.                 }
  31.             } finally {
  32.                 task = null; // 将循环变量task设置为null,表示已处理完成
  33.                 w.completedTasks++; // 当前已完成的任务数+1
  34.                 w.unlock();
  35.             }
  36.         }
  37.         completedAbruptly = false;
  38.     } finally {
  39.         processWorkerExit(w, completedAbruptly);
  40.     }
  41. }
复制代码
submit源码

提交使命到线程池有两种方法,一种是execute,另一种是submit。区别是execute没有返回值,submit是有返回值的,假如有非常抛出,submit同样可以获取非常结果。
  1. private Runnable getTask() {
  2.     boolean timedOut = false; // 通过timeOut变量表示线程是否空闲时间超时了
  3.     // 无限循环
  4.     for (;;) {
  5.         int c = ctl.get(); // 线程池信息
  6.         int rs = runStateOf(c); // 线程池当前状态
  7.         // 如果线程池状态>=SHUTDOWN并且工作队列为空 或 线程池状态>=STOP,则返回null,让当前worker被销毁
  8.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9.             decrementWorkerCount(); // 工作线程数-1
  10.             return null;
  11.         }
  12.         int wc = workerCountOf(c); // 获取当前线程池的工作线程数
  13.         // 当前线程是否允许超时销毁的标志
  14.         // 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
  15.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  16.         // 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
  17.         // 且(当前线程数大于1 或 阻塞队列为空)
  18.         // 则减少worker计数并返回null
  19.         if ((wc > maximumPoolSize || (timed && timedOut))
  20.             && (wc > 1 || workQueue.isEmpty())) {
  21.             if (compareAndDecrementWorkerCount(c))
  22.                 return null;
  23.             continue;
  24.         }
  25.         try {
  26.             // 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务
  27.             Runnable r = timed ?
  28.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  29.                 workQueue.take();//线程池重用逻辑:没有任务了就阻塞在这里,等待新的任务
  30.             if (r != null)
  31.                 return r; // 返回从队列中取出的任务
  32.             timedOut = true;
  33.         } catch (InterruptedException retry) {
  34.             timedOut = false;
  35.         }
  36.     }
  37. }
复制代码
submit中调用了newTaskFor方法来返回一个ftask对象,然后execute这个ftask对象,newTaskFor代码如下:
  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2.     // 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常
  3.     // 所以还没有正确地减少worker计数,这里需要减少一次worker计数
  4.     if (completedAbruptly)
  5.         decrementWorkerCount();
  6.     final ReentrantLock mainLock = this.mainLock;
  7.     mainLock.lock();
  8.     try {
  9.         // 把将被销毁的线程已完成的任务数累加到线程池的完成任务总数上
  10.         completedTaskCount += w.completedTasks;
  11.         workers.remove(w); // 从工作线程集合中移除该工作线程
  12.     } finally {
  13.         mainLock.unlock();
  14.     }
  15.     // 尝试结束线程池
  16.     tryTerminate();
  17.     int c = ctl.get();
  18.     // 如果是RUNNING 或 SHUTDOWN状态
  19.     if (runStateLessThan(c, STOP)) {
  20.         // worker是正常执行完
  21.         if (!completedAbruptly) {
  22.             // 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
  23.             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  24.             // 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
  25.             if (min == 0 && ! workQueue.isEmpty())
  26.                 min = 1;
  27.             // 如果当前线程数已经满足最小线程数要求,则不需要再创建替代线程
  28.             if (workerCountOf(c) >= min)
  29.                 return; // replacement not needed
  30.         }
  31.         // 重新创建一个worker来代替被销毁的线程
  32.         addWorker(null, false);
  33.     }
  34. }
复制代码
newTaskFor又调用FutureTask的有参构造器来创建一个futureTask实例,代码如下:
  1. // AbstractExecutorService.submit
  2. public Future<?> submit(Runnable task) {
  3.     if (task == null) throw new NullPointerException();
  4.     RunnableFuture<Void> ftask = newTaskFor(task, null);
  5.     execute(ftask);
  6.     return ftask;
  7. }
复制代码
这个有参构造器中又调用了Executors的静态方法callable创建一个callable实例来赋值给futureTask的callable属性,代码如下:
  1. // AbstractExecutorService.newTaskFor
  2. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  3.     return new FutureTask<T>(runnable, value);
  4. }
复制代码
末了还是使用了RunnableAdapter来包装这个task,代码如下:
  1. // FutureTask有参构造器
  2. public FutureTask(Runnable runnable, V result) {
  3.     this.callable = Executors.callable(runnable, result);
  4.     this.state = NEW;       // ensure visibility of callable
  5. }
复制代码
梳理一下整个流程,run和call的关系的伪代码如下
  1. // Executors.callable
  2. public static <T> Callable<T> callable(Runnable task, T result) {
  3.     if (task == null)
  4.         throw new NullPointerException();
  5.     return new RunnableAdapter<T>(task, result);
  6. }
复制代码
为什么要这么麻烦封装一层又一层呢?
可能是为了适配。submit的返回值是futureTask,但是传给submit的是个runnable,然后submit会把这个runnable继承传给futureTask,futureTask的结果值是null,但是又由于futureTask的run方法已经被重写成实行call方法了,所以只能在call方法内里跑真正的run方法了
线程池提交一个使命占多大内存

提交使命到线程池有两种方法,一种是execute,一种是submit
那么这两种提交方式占用的内存是一样大的吗?一个空使命究竟占多少内存?
execute提交

使用execute提交一个使命,这个使命究竟多大呢?
使用得最多的就是使用lambda表达式来提交使命
  1. // Executors.RunnableAdapter类
  2. static final class RunnableAdapter<T> implements Callable<T> {
  3.     final Runnable task;
  4.     final T result;
  5.     RunnableAdapter(Runnable task, T result) {
  6.         this.task = task;
  7.         this.result = result;
  8.     }
  9.     public T call() {
  10.         task.run();
  11.         return result;
  12.     }
  13. }
复制代码
那么这个lambda实例占用多少个字节呢?16字节;在开了指针压缩的情况下,对象头占12个字节,4个字节用于填充补齐到8的整数倍,由于这个lambda实例中没有其他成员变量了,所以它就是占据16个字节
除此之外,假如使用的是LinkedBlockingQueue阻塞队列来存放使命,那么还涉及到LinkedBlockingQueue中的Node,LinkedBlockingQueue会使用这个Node来封装使命
  1. // submit
  2. run(){
  3.     // RunnableAdapter.call
  4. call(){
  5.         // task.run
  6.   run(){
  7.    // 实际的任务
  8.   }
  9. }
  10. }
复制代码
这个Node占多少字节呢?24个字节;同样对象头占12字节,item是一个4字节的引用,next也是一个4字节的引用,一共20字节,4个字节用于填充对齐,所以一个node对象是24字节。
所以在使用execute且阻塞队列是LinkedBlockingQueue时一个使命占用40个字节,假如execute 20 w个使命,会占用 800w 个字节,约7.6MB内存
堆快照如下:

假如是使用ArrayBlockingQueue的话,只有lambda实例这一个开销,所以只会使用320w个字节,约3.05MB内存,比起LinkedBlockingQueue少了一倍不止

submit提交

根据上面源码分析得知,假如是用了submit方法之后,会多出两类对象,一个是FutureTask,一个是RunnableAdapter。
FutureTask的成员变量如下:
  1. threadPoolExecutor.execute(() -> {
  2. // ...
  3. });
复制代码
一个FutureTask对象占用的字节数是12+4+4+4+4=28个字节,还必要4个字节做填充,所以一共是32个字节。
RunnableTask的成员变量如下:
  1. static class Node<E> {
  2.     E item;
  3.     /**
  4.      * One of:
  5.      * - the real successor Node
  6.      * - this Node, meaning the successor is head.next
  7.      * - null, meaning there is no successor (this is the last node)
  8.      */
  9.     Node<E> next;
  10.     Node(E x) { item = x; }
  11. }
复制代码
一个RunnableTask对象占用的字节数是12+4+4=20个字节,同样必要4个字节做填充,所以一共是24个字节。
所以在使用submit且阻塞队列是LinkedBlockingQueue时一个使命占用96个字节
假如submit 20w个使命,会占用1920w个字节,约18.31MB内存

假如使用的是ArrayBlockingQueue会省去Node的占用的内存。
lambda中没有使用上下文中的其他变量
  1. /** The underlying callable; nulled out after running */
  2. private Callable<V> callable;
  3. /** The result to return or exception to throw from get() */
  4. private Object outcome; // non-volatile, protected by state reads/writes
  5. /** The thread running the callable; CASed during run() */
  6. private volatile Thread runner;
  7. /** Treiber stack of waiting threads */
  8. private volatile WaitNode waiters;
复制代码
假如在lambda中没有使用到上下文的其他变量时,是不会重复创建lambda实例的,只会创建一个

只会创建一个lambda实例

假如配合上ArrayBlockingQueue以及execute,提交20w个使命的空间复杂度可以降至O(1)
因为20w个使命的实例都是同一个

小结

假如是lambda中没有上下文变量,使用的队列是ArrayBlockingQueue,提交方式是execute,那么空间复杂度可以达到O(1);假如lambda中有上下文变量,每次提交使命都会创建一个新的lambda实例;
假如使用的队列是LinkedBlockingQueue,那么还要算上LinkedBlockingQueue的Node实例的开销;假如提交的方式是submit,那么还要算上FutureTask和RunnableAdapter的开销。
当然这里只是浅层地讨论了一下创建一个空使命所占用的内存大小,假如是更加复杂的使命,使命内的内存开销必要算上。
线程池中线程非常后,销毁还是复用

先说结论,必要分两种情况:
简单来说:使用execute()时,未捕获非常导致线程终止,线程池创建新线程替代;使用submit()时,非常被封装在Future中,线程继承复用。
这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如那边理非常,而execute()则实用于那些不必要关注实行结果的场景。
execute()提交

查看execute方法的实行逻辑


可以发现,假如抛出非常,execute()提交的方式会移除抛出非常的线程,创建新的线程。
submit()提交


可以发现,submit也是调用了execute方法,但是在调用之前,包装了一层 RunnableFuture,那肯定是在RunnableFuture的实现 FutureTask中有特殊处理了,我们查看源码可以发现。

但是,
也就是说,通过java.util.concurrent.FutureTask#get(),就可以获取对应的非常信息。
线程池是如何保活和回收的

线程池的作用就是提高线程的利用率,必要线程时,可以直接从线程池中获取线程直接使用,而不用创建线程,那线程池中的线程,在没有使命实行时,是如何保活的呢?
在runWorker方法里,线程会循环getTask()获取阻塞队列中的使命。

不停地的从阻塞队列中获取使命,主要调用的是workQueue.poll()方法或take(), 这两个方法都会阻塞式的从队列中获取元素,区别是poll()方法可以设置一个超时时间, take()不能设置超时时间,所以这也间接的使得线程池中的线程阻塞等候从而达到保活的效果。

当然并不是线程池中的全部线程都必要一直保活,比如只有核心线程必要保活,非核心线程就不必要保活,那非核心线程是怎么回收的呢?
底层是这样的,当一个线程处理完当前使命后,就会开始去阻塞队列中获取使命,只不过,在调用poll或take方法之前, 会判定当前线程池中有多少个线程,假如多余核心线程数(也就是wc > corePlloSize),那么timed为true,此时当前线程就会调用poll()并设置超时时间来获取阻塞队列中的使命,这样一旦时间到了还没有获取到使命,那么poll方法获取到的r就是null,返回给上一级,runWorker()里的getTask方法就获取到null了,此时while循环就会退出。那么就会调用processWorkerExit()方法,remove当前线程

这里实在可以看到timed还有一个参数,allowCoreThreadTimeOut,这个主要是用来控制核心线程是否可以回收,默认是false,上面是讨论默认值false的情况,即核心线程不会超时。假如为true,工作线程可以全部销毁
实际上,固然有核心线程数,但线程并没有区分是核心还是非核心,并不是先创建的就是核心,凌驾核心线程数后创建的就是非核心,最终保留哪些线程,完全随机。
线程池的关闭

shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受使命,然后会将剩余的使命全部实行完
  1. final Runnable task;
  2. final T result;
复制代码
shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝全部提交的使命。末了中断左右正在运行中的worker,然后清空使命队列。
  1. public static void main(String[] args) {
  2.     ThreadPoolExecutor threadPoolExecutor =
  3.             new ThreadPoolExecutor(8, 8, 15, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
  4.     for (int i = 0; i < (int) 2e5; i++) {
  5.         // int finalI = i;
  6.         threadPoolExecutor.submit(() -> {
  7.             // 乱写...
  8.             // int p = finalI;
  9.             LockSupport.park();
  10.         });
  11.     }
  12.     LockSupport.park();
  13. }
复制代码
池化带来的问题

当然,解决方法就是确保每次使用池化资源后,资源状态被正确重置,避免污染;正确回收,避免泄露。而且通过监控池化资源的使用情况,实时调优配置,以适应不同的负载和需求。
线程池大小怎么设置?

首先应该明确线程池大小设置的目标是什么?实在就是为了提高 CPU 的利用率
CPU利用率=CPU有效工作时间/CPU总的运行时间
假如线程池线程数目太小,当有大量请求必要处理,必要创建非核心线程池处理,导致系统响应比较慢,会影响用户体验,甚至会出现使命队列大量堆积使命导致OOM。
假如核心线程池数目太大,会导致其一直阻塞在那里等候使命队列的使命来实行,斲丧内存。而且大量线程可能会同时抢占 CPU 资源,这样会导致大量的上下文切换,从而增加线程的实行时间,影响了实行效率。
根据线程数设置依据

最大线程数:原则上就是性能最高线程数,因为此时性能已经是最高,再设置比他大的线程数反而性能变低。极度情况下才会使用到最大线程数,正常情况下不应频繁出现凌驾核心线程数的创建。
核心线程数:基于性能考虑,及其他业务处理的最优效率考虑,估算平时的流量必要的线程数,设置核心线程数
阻塞队列:估算最大流量,设置阻塞队列长度
留意:必要通过压力测试来进行微调,只有经过压测的检验,才气最终包管的配置大小是准确的。
一般情况设置依据

一般用来计算核心线程数
CPU 密集型使命(N+1):

这种使命斲丧的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,多出来的一个线程是为了防止某些原因导致的线程阻塞(如IO操纵,线程sleep,等候锁)而带来的影响。一旦某个线程被阻塞,开释了CPU资源,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型使命(2N):

系统的大部分时间都在处理 IO 操纵,此时线程可能会被阻塞,开释CPU资源,这时就可以将 CPU 交出给其它线程使用。因此在 IO 密集型使命的应用中,可以多配置一些线程,具体的计算方法:最佳线程数 = CPU核心数 * (1/CPU利用率) = CPU核心数 * (1 + (IO耗时/CPU耗时)),一般可设置为2N
ExecutorService 线程池实例

但是阿里为什么不推荐使用Executors来创建线程池,这是为了让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
FixedThreadPool:

固定线程数的线程池。任何时间点,最多只有 nThreads 个线程处于活动状态实行使命。
  1. public void shutdown() {
  2.     final ReentrantLock mainLock = this.mainLock;
  3.     mainLock.lock();
  4.     try {
  5.         //检查是否可以关闭线程
  6.         checkShutdownAccess();
  7.         //设置线程池状态
  8.         advanceRunState(SHUTDOWN);
  9.         //尝试中断worker
  10.         interruptIdleWorkers();
  11.             //预留方法,留给子类实现
  12.         onShutdown(); // hook for ScheduledThreadPoolExecutor
  13.     } finally {
  14.         mainLock.unlock();
  15.     }
  16.     tryTerminate();
  17. }
  18. private void interruptIdleWorkers() {
  19.     interruptIdleWorkers(false);
  20. }
  21. private void interruptIdleWorkers(boolean onlyOne) {
  22.     final ReentrantLock mainLock = this.mainLock;
  23.     mainLock.lock();
  24.     try {
  25.         //遍历所有的worker
  26.         for (Worker w : workers) {
  27.             Thread t = w.thread;
  28.             //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
  29.             //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
  30.             //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
  31.             if (!t.isInterrupted() && w.tryLock()) {
  32.                 try {
  33.                     t.interrupt();
  34.                 } catch (SecurityException ignore) {
  35.                 } finally {
  36.                     w.unlock();
  37.                 }
  38.             }
  39.             if (onlyOne)
  40.                 break;
  41.         }
  42.     } finally {
  43.         mainLock.unlock();
  44.     }
  45. }
复制代码
为什么不建议使用?
使用无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE),运行中的线程池不会拒绝使命,即不会调用RejectedExecutionHandler.rejectedExecution()方法。maxThreadPoolSize 是无效参数,故将它的值设置为与 coreThreadPoolSize 同等。当添加使命的速度大于线程池处理使命的速度,可能会在队列堆积大量的请求,斲丧很大的内存,甚至导致OOM。
keepAliveTime 也是无效参数,设置为0L,因为此线程池里全部线程都是核心线程,核心线程不会被回收(除非设置了executor.allowCoreThreadTimeOut(true))。
实用场景:实用于处理CPU密集型的使命,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即实用实行长期的使命。必要留意的是,FixedThreadPool 不会拒绝使命,在使命比较多的时间会导致 OOM。
SingleThreadExecutor

只有一个线程的线程池。
  1. public List<Runnable> shutdownNow() {
  2.     List<Runnable> tasks;
  3.     final ReentrantLock mainLock = this.mainLock;
  4.     mainLock.lock();
  5.     try {
  6.         checkShutdownAccess();
  7.         //检测权限
  8.         advanceRunState(STOP);
  9.         //中断所有的worker
  10.         interruptWorkers();
  11.         //清空任务队列
  12.         tasks = drainQueue();
  13.     } finally {
  14.         mainLock.unlock();
  15.     }
  16.     tryTerminate();
  17.     return tasks;
  18. }
  19. private void interruptWorkers() {
  20.     final ReentrantLock mainLock = this.mainLock;
  21.     mainLock.lock();
  22.     try {
  23.         //遍历所有worker,然后调用中断方法
  24.         for (Worker w : workers)
  25.             w.interruptIfStarted();
  26.     } finally {
  27.         mainLock.unlock();
  28.     }
  29. }
复制代码
为什么不建议使用?
使用无界队列 LinkedBlockingQueue。线程池只有一个运行的线程,新来的使命放入工作队列,线程处理完使命就循环从队列里获取使命实行。包管顺序的实行各个使命。
实用场景:实用于串行实行使命的场景,一个使命一个使命地实行。在使命比较多的时间也是会导致 OOM。
CachedThreadPool
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2.     return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  3. }
复制代码
为什么不建议使用?
core是0,最大线程数是Integer.MAX_VALUE,因此当添加使命的速度大于线程池处理使命的速度,可能会创建大量的线程,极度情况下,这样会导致耗尽 cpu 和内存资源,甚至导致OOM。
使用没有容量的SynchronousQueue作为线程池工作队列,当线程池有空闲线程时,SynchronousQueue.offer(Runnable task)提交的使命会被空闲线程处理,否则会创建新的线程处理使命。
实用场景:用于并发实行大量短期的小使命。CachedThreadPool允许创建的线程数目为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPoolExecutor
  1. public static ExecutionService newSingleThreadExecutor() {
  2.     return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  3. }
复制代码
在给定的耽误后运行使命,或者定期实行使命。
为什么不建议使用?
最大线程数是Integer.MAX_VALUE,因此当添加使命的速度大于线程池处理使命的速度,可能会创建大量的线程,极度情况下,这样会导致耗尽 cpu 和内存资源,甚至导致OOM。
关于作者

来自一线程序员Seven的探索与实践,持续学习迭代中~
本文已收录于我的个人博客:https://www.seven97.top
公众号:seven97,欢迎关注~

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4