多线程系列(十四) -一文带你搞懂线程池技能

打印 上一主题 下一主题

主题 965|帖子 965|积分 2895

一、前言

虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了许多的支持,但是从操作体系角度来说,频繁的创建线程和销毁线程,其实是必要大量的时间和资源的。
例如,当有多个使命同时必要处理惩罚的时间,一个使命对应一个线程来执行,以此来提拔使命的执行效率,模子图如下:

如果使命数非常少,这种模式倒题目不大,但是如果使命数非常的多,可能就会存在很大的题目:

  • 1.线程数不可控:随着使命数的增多,线程数也会增多,这些线程都没办法进行统一管理
  • 2.体系的开销很大:创建线程对体系来说开销很高,随着线程数也会增多,可能会出现体系资源紧张的题目,严重的情况体系可能直接死机
假如把许多使命让一组线程来执行,而不是一个使命对应一个新线程,这种通过接受使命并进行分发处理惩罚的就是线程池

线程池内部维护了若干个线程,当没有使命的时间,这些线程都处于等待状态;当有新的使命进来时,就分配一个空闲线程执行;当所有线程都处于忙碌状态时,新使命要么放入队列中等待,要么增长一个新线程进行处理惩罚,要么直接拒绝。
很显然,这种通过线程池来执行多使命的思路,优势明显:

  • 1.资源更加可控:能有效的控制线程数,防止线程数过多,导致体系资源紧张
  • 2.资源消耗更低:因为线程可以复用,可以有效的低落创建和销毁线程的时间和资源
  • 3.执行效率更高:当新的使命进来时,可以不必要等待线程的创创建刻执行
关于这一点,我们可以看一个简单的对比示例。
  1. /**
  2. * 使用一个任务对应一个线程来执行
  3. * @param args
  4. */
  5. public static void main(String[] args) {
  6.     long startTime = System.currentTimeMillis();
  7.     final Random random = new Random();
  8.     List<Integer> list = new CopyOnWriteArrayList<>();
  9.     // 一个任务对应一个线程,使用20000个线程执行任务
  10.     for (int i = 0; i < 20000; i++) {
  11.         new Thread(new Runnable() {
  12.             @Override
  13.             public void run() {
  14.                 list.add(random.nextInt(100));
  15.             }
  16.         }).start();
  17.     }
  18.     // 等待任务执行完毕
  19.     while (true){
  20.         if(list.size() >= 20000){
  21.             break;
  22.         }
  23.     }
  24.     System.out.println("一个任务对应一个线程,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
  25. }
复制代码
  1. /**
  2. * 使用线程池进行执行任务
  3. * @param args
  4. */
  5. public static void main(String[] args) {
  6.     long startTime = System.currentTimeMillis();
  7.     final Random random = new Random();
  8.     List<Integer> list = new CopyOnWriteArrayList<>();
  9.     // 使用线程池进行执行任务,默认4个线程
  10.     ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
  11.     for (int i = 0; i < 20000; i++) {
  12.             // 提交任务
  13.         executor.submit(new Runnable() {
  14.             @Override
  15.             public void run() {
  16.                 list.add(random.nextInt(100));
  17.             }
  18.         });
  19.     }
  20.     // 等待任务执行完毕
  21.     while (true){
  22.         if(list.size() >= 20000){
  23.             break;
  24.         }
  25.     }
  26.     System.out.println("使用线程池,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
  27.     // 关闭线程池
  28.     executor.shutdown();
  29. }
复制代码
两者执行耗时情况对比,如下:
  1. 一个任务对应一个线程,执行耗时:3073ms
  2. ---------------------------
  3. 使用线程池,执行耗时:578ms
复制代码
从结果上可以看出,同样的使命数,接纳线程池和不接纳线程池,执行耗时差距非常明显,一个使命对应一个新的线程来执行,反而效率不如接纳 4 个线程的线程池执行的快。
为什么会产生这种现象,下面我们就一起来聊聊线程池。
二、线程池概述

站在专业的角度讲,线程池其实是一种利用池化头脑来实现线程管理的技能,它将线程的创建和使命的执行进行解耦,同时复用已经创建的线程来低落频繁创建和销毁线程所带来的资源消耗。通过公道的参数设置,可以实现更低的体系资源利用率、更高的使命并发执行效率。
在 Java 中,线程池最顶级的接口是Executor,名下的实现类关系图如下:

关键接口和实现类,相关的描述如下:

  • 1.Executor是最顶级的接口,它的作用是将使命的执行和线程的创建进行抽象解藕
  • 2.ExecutorService接口继承了Executor接口,在Executor的基础上,增长了一些关于管理线程池的一些方法,好比查察使命的状态、获取线程池的状态、制止线程池等尺度方法
  • 3.ThreadPoolExecutor是一个线程池的焦点实现类,完备的封装了线程池相关的操作方法,通过它可以创建线程池
  • 4.ScheduledThreadPoolExecutor是一个利用线程池的定时调度实现类,完备的封装了定时调度相关的操作方法,通过它可以创建周期性线程池
整个关系图中,其中ThreadPoolExecutor是线程池最焦点的实现类,开发者可以利用它来创建线程池。
2.1、ThreadPoolExecutor 构造方法

ThreadPoolExecutor类的完备构造方法一共有七个参数,明白这些参数的配置对利用好线程池至关重要,完备的构造方法焦点源码如下:
  1. public ThreadPoolExecutor(int corePoolSize,
  2.                           int maximumPoolSize,
  3.                           long keepAliveTime,
  4.                           TimeUnit unit,
  5.                           BlockingQueue<Runnable> workQueue,
  6.                           ThreadFactory threadFactory,
  7.                           RejectedExecutionHandler handler)
复制代码
各个参数的解读如下:

  • corePoolSize:焦点线程数目,用于执行使命的焦点线程数。
  • maximumPoolSize:最大线程数目,线程池中答应创建线程的最大数目
  • keepAliveTime:空闲线程存活的时间。只有当线程池中的线程数大于 corePoolSize 时,这个参数才会起作用
  • unit:空闲线程存活的时间单位
  • workQueue:使命队列,用于存储还没来得及执行的使命
  • threadFactory:线程工厂。用于执行使命时创建新线程的工厂
  • handler:拒绝策略,当线程池和和队列容量处于饱满,利用某种策略来拒绝使命提交
2.2、ThreadPoolExecutor 执行流程

创建完线程池之后就可以提交使命了,当有新的使命进来时,线程池就会工作并分配线程去执行使命。
ThreadPoolExecutor的典范用法如下:
  1. // 创建固定大小的线程池
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
  3. // 提交任务
  4. executor.execute(task1);
  5. executor.execute(task2);
  6. executor.execute(task3);
  7. ...
复制代码
针对使命的提交方式,ThreadPoolExecutor还提供了两种方法。

  • execute()方法:一种无返回值的方法,也是最焦点的使命提交方法
  • submit()方法:支持有返回值,通过FutureTask对象来获取使命执行完后的返回值,底层依然调用的是execute()方法
ThreadPoolExecutor执行提交的使命流程虽然比较复杂,但是通过对源码的分析,大抵的使命执行流程,可以用如下图来概括。

整个执行流程,大体步骤如下:

  • 1.初始化完线程池之后,默认情况下,线程数为0,当有使命到来后才会创建新线程去执行使命
  • 2.每次收到提交的使命之后,会先检考焦点线程数是否已满,如果没有,就会继承创建新线程来执行使命,直到焦点线程数达到设定值
  • 3.当焦点线程数已满,会检查使命队列是否已满,如果没有,就会将使命存储到阻塞使命队列中
  • 4.当使命队列已满,会再次检查线程池中的线程数是否达到最大值,如果没有,就会创建新的线程来执行使命
  • 5.如果使命队列已满、线程数已达到最大值,此时线程池已经无法再接受新的使命,当收到使命之后,会执行拒绝策略
我们再转头来看上文提到的ThreadPoolExecutor构造方法中的七个参数,这些参数会直接影响线程的执行情况,各个参数的变革情况,可以用如下几点来概括:

  • 1.当线程池中的线程数小于 corePoolSize 时,新使命都不排队而是直接创新新线程来执行
  • 2.当线程池中的线程数大于即是 corePoolSize,workQueue 未满时,将新使命添加到 workQueue 中而不是创建新线程来执行
  • 3.当线程池中的线程数大于即是 corePoolSize,workQueue 已满,但是线程数小于 maximumPoolSize 时,此时会创建新的线程来处理惩罚被添加的使命
  • 4.当线程池中的线程数大于即是 maximumPoolSize,而且 workQueue 已满,新使命会被拒绝,利用 handler 执行被拒绝的使命
ThreadPoolExecutor执行使命的部门焦点源码如下!
2.2.1、execute 提交使命
  1. public void execute(Runnable command) {
  2.     if (command == null)
  3.         throw new NullPointerException();
  4.     int c = ctl.get();
  5.         // 工作线程数量 < corePoolSize,直接创建线程执行任务
  6.     if (workerCountOf(c) < corePoolSize) {
  7.         if (addWorker(command, true))
  8.             return;
  9.         c = ctl.get();
  10.     }
  11.         // 工作线程数量 >= corePoolSize,将任务添加至阻塞队列中
  12.     if (isRunning(c) && workQueue.offer(command)) {
  13.         int recheck = ctl.get();
  14.                 // 往阻塞队列中添加任务的时候,如果线程池非运行状态,将任务remove,并执行拒绝策略
  15.         if (! isRunning(recheck) && remove(command))
  16.             reject(command);
  17.         else if (workerCountOf(recheck) == 0)
  18.             addWorker(null, false);
  19.     }
  20.     // 阻塞队列已满,尝试添加新的线程去执行,如果工作线程数量 >= maximumPoolSize,执行拒绝策略
  21.     else if (!addWorker(command, false))
  22.         reject(command);
  23. }
复制代码
2.2.2、addWorker 创建线程加入线程池
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2.     retry:
  3.     for (;;) {
  4.         int c = ctl.get();
  5.         int rs = runStateOf(c);
  6.                 // 线程池状态处于非 RUNNING 状态,添加worker失败
  7.         if (rs >= SHUTDOWN &&
  8.             ! (rs == SHUTDOWN &&
  9.                firstTask == null &&
  10.                ! workQueue.isEmpty()))
  11.             return false;
  12.                 // 判断线程池中线程数量大于等于该线程池允许的最大线程数量,如果大于则worker失败,反之cas更新线程池中的线程数
  13.         for (;;) {
  14.             int wc = workerCountOf(c);
  15.             if (wc >= CAPACITY ||
  16.                 wc >= (core ? corePoolSize : maximumPoolSize))
  17.                 return false;
  18.             if (compareAndIncrementWorkerCount(c))
  19.                 break retry;
  20.             c = ctl.get();  // Re-read ctl
  21.             if (runStateOf(c) != rs)
  22.                 continue retry;
  23.             // else CAS failed due to workerCount change; retry inner loop
  24.         }
  25.     }
  26.     boolean workerStarted = false;
  27.     boolean workerAdded = false;
  28.     Worker w = null;
  29.     try {
  30.                 // 创建工作线程
  31.         w = new Worker(firstTask);
  32.         final Thread t = w.thread;
  33.         if (t != null) {
  34.             final ReentrantLock mainLock = this.mainLock;
  35.             mainLock.lock();
  36.             try {
  37.                 int rs = runStateOf(ctl.get());
  38.                 if (rs < SHUTDOWN ||
  39.                     (rs == SHUTDOWN && firstTask == null)) {
  40.                         // 如果线程池处于 RUNNING 状态并且线程已经启动,则抛出线程异常启动
  41.                     if (t.isAlive())
  42.                         throw new IllegalThreadStateException();
  43.                                         // 将线程加入已创建的工作线程集合,更新用于追踪线程池中线程数量 largestPoolSize 字段
  44.                     workers.add(w);
  45.                     int s = workers.size();
  46.                     if (s > largestPoolSize)
  47.                         largestPoolSize = s;
  48.                     workerAdded = true;
  49.                 }
  50.             } finally {
  51.                 mainLock.unlock();
  52.             }
  53.             if (workerAdded) {
  54.                                 // 启动线程执行任务
  55.                 t.start();
  56.                 workerStarted = true;
  57.             }
  58.         }
  59.     } finally {
  60.         if (! workerStarted)
  61.             addWorkerFailed(w);
  62.     }
  63.     return workerStarted;
  64. }
复制代码
2.2.3、runWorker 执行使命
  1. final void runWorker(Worker w) {
  2.         // 获取执行任务线程
  3.     Thread wt = Thread.currentThread();
  4.     // 获取执行任务
  5.     Runnable task = w.firstTask;
  6.         // 将worker中的任务置空
  7.     w.firstTask = null;
  8.     w.unlock(); // allow interrupts
  9.     boolean completedAbruptly = true;
  10.     try {
  11.             // 从当前工作线程种获取任务,或者循环从阻塞任务队列中获取任务
  12.         while (task != null || (task = getTask()) != null) {
  13.             w.lock();
  14.                         // 双重检查线程池是否正在停止,如果线程池停止,并且当前线程能够中断,则中断线程
  15.             if ((runStateAtLeast(ctl.get(), STOP) ||
  16.                  (Thread.interrupted() &&
  17.                   runStateAtLeast(ctl.get(), STOP))) &&
  18.                 !wt.isInterrupted())
  19.                 wt.interrupt();
  20.             try {
  21.                                 // 前置执行任务钩子函数
  22.                 beforeExecute(wt, task);
  23.                 Throwable thrown = null;
  24.                 try {
  25.                                         // 执行当前任务
  26.                     task.run();
  27.                 } catch (RuntimeException x) {
  28.                     thrown = x; throw x;
  29.                 } catch (Error x) {
  30.                     thrown = x; throw x;
  31.                 } catch (Throwable x) {
  32.                     thrown = x; throw new Error(x);
  33.                 } finally {
  34.                                         // 后置执行任务钩子函数
  35.                     afterExecute(task, thrown);
  36.                 }
  37.             } finally {
  38.                 task = null;
  39.                 w.completedTasks++;
  40.                 w.unlock();
  41.             }
  42.         }
  43.         completedAbruptly = false;
  44.     } finally {
  45.                 // 回收线程
  46.         processWorkerExit(w, completedAbruptly);
  47.     }
  48. }
复制代码
2.2.4、reject 执行拒绝策略
  1. final void reject(Runnable command) {
  2.         // 执行拒绝策略
  3.     handler.rejectedExecution(command, this);
  4. }
复制代码
当线程池中的线程数大于即是 maximumPoolSize,而且 workQueue 已满,新使命会被拒绝,利用RejectedExecutionHandler接口的rejectedExecution()方法来处理惩罚被拒绝的使命。
线程池提供了四种拒绝策略实现类来拒绝使命,具体如下:
类描述AbortPolicy直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略DiscardPolicy什么也不做,直接丢弃使命DiscardOldestPolicy将阻塞队列中的使命移除出来,然后执行当前使命CallerRunsPolicy尝试直接运行被拒绝的使命,如果线程池已经被关闭了,使命就被丢弃了2.3、ThreadPoolExecutor 线程池状态

我们知道 Java 种的线程一共 6 种状态,其实线程池也有状态。
因为线程池也是异步执行的,有的使命正在执行,有的使命存储在使命队列中,有的线程处于工作状态,有的线程处于空闲状态等待回收,为了更加精致化的管理线程池,线程池也设计了 5 中状态,部门焦点源码如下:
[code]public class ThreadPoolExecutor extends AbstractExecutorService {        // 线程池线程数的bit数        private static final int COUNT_BITS = Integer.SIZE - 3;                // 线程池状态        private static final int RUNNING    = -1

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

勿忘初心做自己

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表