ToB企服应用市场:ToB评测及商务社交产业平台

标题: 12分钟从Executor自顶向下彻底搞懂线程池 [打印本页]

作者: 勿忘初心做自己    时间: 2023-9-8 21:37
标题: 12分钟从Executor自顶向下彻底搞懂线程池
前言

上篇文章 13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件 聊到并发包中常用的同步组件,并且还手把手实现了自定义的同步组件
本篇文章来聊聊并发包下的另一个核心-线程池
阅读本文大概12分钟
通读本篇文章前先来看看几个问题,看看你是否以及理解线程池
池化技术

线程的创建、销毁都会带来一定的开销
如果当我们需要使用到多线程时再去创建,使用完又去销毁,这样去使用不仅会拉长业务流程,还会增加创建、销毁线程的开销
于是有了池化技术的思想,将线程提前创建出来,放在一个池子(容器)中进行管理
当需要使用时,从池子里拿取一个线程来执行任务,执行完毕后再放回池子
不仅是线程有池化的思想,连接也有池化的思想,也就是连接池
池化技术不仅能复用资源、提高响应,还方便管理
Executor框架

Executor框架是什么?
可以暂时把Executor看成线程池的抽象,它定义如何去执行任务
  1.   public interface Executor {
  2.       void execute(Runnable command);
  3.   }
复制代码
Executor将工作任务与线程池进行分离解耦

工作任务被分为两种:无返回结果的Runnable和有返回结果的Callable
在线程池中允许执行这两种任务,其中它们都是函数式接口,可以使用lambda表达式来实现
有的同学可能会有疑问,上文Executor框架定义的执行方法不是只允许传入Runnable任务吗?
那Callable任务调用哪个方法来执行呢?
Future接口用来定义获取异步任务的结果,它的实现类常是FutureTask
FutureTask实现Runnable的同时,还用字段存储Callable,在其实现Runnable时实际上会去执行Callable任务
线程池在执行Callable任务时,会将使用FutureTask将其封装成Runnable执行(具体源码我们后面再聊),因此Executor的执行方法入参只有Runnable
FutureTask相当于适配器,将Callable转换为Runnable再进行执行

Executor 定义线程池,而它的重要实现是ThreadPoolExecutor
在ThreadPoolExecutor的基础上,还有个做定时的线程池ScheduledThreadPoolExecutor

ThreadPoolExecutor

核心参数

ThreadPoolExecutor主要有七个重要的参数
  1.   public ThreadPoolExecutor(int corePoolSize,
  2.                                 int maximumPoolSize,
  3.                                 long keepAliveTime,
  4.                                 TimeUnit unit,
  5.                                 BlockingQueue<Runnable> workQueue,
  6.                                 ThreadFactory threadFactory,
  7.                                 RejectedExecutionHandler handler)
复制代码
拒绝策略作用AbortPolicy 默认抛出异常CallerRunsPolicy调用线程来执行任务DiscardPolicy不处理,丢弃DiscardOldestPolicy丢弃队列中最近一个任务,并立即执行当前任务线程池中除了构造时的核心参数外,还使用内部类Worker来封装线程和任务,并使用HashSet容器workes工作队列存储工作线程worker
实现原理

流程图

为了清晰的理解线程池实现原理,我们先用流程图和总结概述原理,最后来看源码实现

execute

线程池有两种提交方式execute和submit,其中submit会封装成RunnableFuture最终都来执行execute
  1.       public <T> Future<T> submit(Callable<T> task) {
  2.           if (task == null) throw new NullPointerException();
  3.           RunnableFuture<T> ftask = newTaskFor(task);
  4.           execute(ftask);
  5.           return ftask;
  6.       }
复制代码
execute中实现线程池的整个运行流程
  1.   public void execute(Runnable command) {
  2.       //任务为空直接抛出空指针异常
  3.       if (command == null)
  4.           throw new NullPointerException();
  5.       //ctl是一个整型原子状态,包含workerCount工作线程数量 和 runState是否运行两个状态
  6.       int c = ctl.get();
  7.       //1.如果工作线程数 小于 核心线程数 addWorker创建工作线程
  8.       if (workerCountOf(c) < corePoolSize) {
  9.           if (addWorker(command, true))
  10.               return;
  11.           c = ctl.get();
  12.       }
  13.      
  14.       // 2.工作线程数 大于等于 核心线程数时
  15.       // 如果 正在运行 尝试将 任务加入队列
  16.       if (isRunning(c) && workQueue.offer(command)) {
  17.           //任务加入队列成功 检查是否运行
  18.           int recheck = ctl.get();
  19.           //不在运行 并且 删除任务成功 执行拒绝策略 否则查看工作线程为0就创建线程
  20.           if (! isRunning(recheck) && remove(command))
  21.               reject(command);
  22.           else if (workerCountOf(recheck) == 0)
  23.               addWorker(null, false);
  24.       }
  25.       // 3.任务加入队列失败,尝试去创建非核心线程,成功则结束
  26.       else if (!addWorker(command, false))
  27.           // 4.失败则执行拒绝策略
  28.           reject(command);
  29.   }
复制代码
addWorker

addWorker用于创建线程加入工作队列并执行任务
第二个参数用来判断是不是创建核心线程,当创建核心线程时为true,创建非核心线程时为false
  1.   private boolean addWorker(Runnable firstTask, boolean core) {
  2.           //方便跳出双层循环
  3.           retry:
  4.           for (;;) {
  5.               int c = ctl.get();
  6.               int rs = runStateOf(c);
  7.  
  8.               // Check if queue empty only if necessary.
  9.               // 检查状态
  10.               if (rs >= SHUTDOWN &&
  11.                   ! (rs == SHUTDOWN &&
  12.                      firstTask == null &&
  13.                      ! workQueue.isEmpty()))
  14.                   return false;
  15.  
  16.               for (;;) {
  17.                   int wc = workerCountOf(c);
  18.                   //工作线程数已满 返回false
  19.                   if (wc >= CAPACITY ||
  20.                       wc >= (core ? corePoolSize : maximumPoolSize))
  21.                       return false;
  22.                   //CAS自增工作线程数量 成功跳出双重循环
  23.                   if (compareAndIncrementWorkerCount(c))
  24.                       break retry;
  25.                   //CAS失败 重新读取状态 内循环
  26.                   c = ctl.get();  // Re-read ctl
  27.                   if (runStateOf(c) != rs)
  28.                       continue retry;
  29.                   // else CAS failed due to workerCount change; retry inner loop
  30.               }
  31.           }
  32.  
  33.           //来到这里说明已经自增工作线程数量 准备创建线程
  34.           boolean workerStarted = false;
  35.           boolean workerAdded = false;
  36.           Worker w = null;
  37.           try {
  38.               //创建worker 通过线程工厂创建线程
  39.               w = new Worker(firstTask);
  40.               final Thread t = w.thread;
  41.               if (t != null) {
  42.                   //全局锁
  43.                   final ReentrantLock mainLock = this.mainLock;
  44.                   mainLock.lock();
  45.                   try {
  46.                       // Recheck while holding lock.
  47.                       // Back out on ThreadFactory failure or if
  48.                       // shut down before lock acquired.
  49.                       int rs = runStateOf(ctl.get());
  50.  
  51.                       if (rs < SHUTDOWN ||
  52.                           (rs == SHUTDOWN && firstTask == null)) {
  53.                           if (t.isAlive()) // precheck that t is startable
  54.                               throw new IllegalThreadStateException();
  55.                           //添加线程
  56.                           workers.add(w);
  57.                           int s = workers.size();
  58.                           if (s > largestPoolSize)
  59.                               largestPoolSize = s;
  60.                           //标记线程添加完
  61.                           workerAdded = true;
  62.                       }
  63.                   } finally {
  64.                       mainLock.unlock();
  65.                   }
  66.                   //执行线程
  67.                   if (workerAdded) {
  68.                       t.start();
  69.                       workerStarted = true;
  70.                   }
  71.               }
  72.           } finally {
  73.               if (! workerStarted)
  74.                   addWorkerFailed(w);
  75.           }
  76.           return workerStarted;
  77.       }
复制代码
addWorker中会CAS自增工作线程数量,创建线程再加锁,将线程加入工作队列workes(hashset),解锁后开启该线程去执行任务
runWorker

worker中实现Runnable的是runWorker方法,在启动线程后会不停的执行任务,任务执行完就去获取任务执行
  1.   final void runWorker(Worker w) {
  2.       Thread wt = Thread.currentThread();
  3.       Runnable task = w.firstTask;
  4.       w.firstTask = null;
  5.       w.unlock(); // allow interrupts
  6.       boolean completedAbruptly = true;
  7.       try {
  8.           //循环执行任务 getTask获取任务
  9.           while (task != null || (task = getTask获取任务()) != null) {
  10.               w.lock();
  11.               // If pool is stopping, ensure thread is interrupted;
  12.               // if not, ensure thread is not interrupted.  This
  13.               // requires a recheck in second case to deal with
  14.               // shutdownNow race while clearing interrupt
  15.               if ((runStateAtLeast(ctl.get(), STOP) ||
  16.                    (Thread.interrupted() &&
  17.                     runStateAtLeast(ctl.get(), STOP))) &&
  18.                   !wt.isInterrupted())
  19.                   wt.interrupt();
  20.               try {
  21.                   //执行前 钩子方法
  22.                   beforeExecute(wt, task);
  23.                   Throwable thrown = null;
  24.                   try {
  25.                       //执行
  26.                       task.run();
  27.                   } catch (RuntimeException x) {
  28.                       thrown = x; throw x;
  29.                   } catch (Error x) {
  30.                       thrown = x; throw x;
  31.                   } catch (Throwable x) {
  32.                       thrown = x; throw new Error(x);
  33.                   } finally {
  34.                       //执行后钩子方法
  35.                       afterExecute(task, thrown);
  36.                   }
  37.               } finally {
  38.                   task = null;
  39.                   w.completedTasks++;
  40.                   w.unlock();
  41.               }
  42.           }
  43.           completedAbruptly = false;
  44.       } finally {
  45.           processWorkerExit(w, completedAbruptly);
  46.       }
  47.   }
复制代码
在执行前后预留两个钩子空方法,留给子类来扩展,后文处理线程池异常也会用到
配置参数

线程池中是不是越多线程就越好呢?
首先,我们要明白创建线程是有开销的,程序计数器、虚拟机栈、本地方法栈都是线程私有的空间
并且线程在申请空间时,是通过CAS申请年轻代的Eden区中一块内存(因为可能存在多线程同时申请所以要CAS)
线程太多可能导致Eden空间被使用太多导致young gc,并且线程上下文切换也需要开销
因此,线程池中线程不是越多越好,行业内分为两种大概方案
针对CPU密集型,线程池设置最大线程数量为CPU核心数量+1,避免上下文切换,提高吞吐量,多留一个线程兜底
针对IO密集型,线程池设置最大线程数量为2倍CPU核心数量,由于IO需要等待,为了避免CPU空闲就多一些线程
具体业务场景需要具体分析,然后加上大量测试才能得到最合理的配置
Executor框架通过静态工厂方法提供几种线程池,比如:Executors.newSingleThreadExecutor()、Executors.newFixedThreadPool()、Executors.newCachedThreadPool()
但由于业务场景的不同,最好还是自定义线程池;当理解线程池参数和实现原理后,查看它们的源码并不难,我们不过多叙述
处理异常

线程池中如果出现异常会怎么样?
Runnable

当我们使用Runnable任务时,出现异常会直接抛出
  1.          threadPool.execute(() -> {
  2.              int i = 1;
  3.              int j = 0;
  4.              System.out.println(i / j);
  5.          });
复制代码
面对这种情况,我们可以在Runnable任务中使用try-catch进行捕获
  1.          threadPool.execute(() -> {
  2.              try {
  3.                  int i = 1;
  4.                  int j = 0;
  5.                  System.out.println(i / j);
  6.              } catch (Exception e) {
  7.                  System.out.println(e);
  8.              }
  9.          });
复制代码
实际操作的话用日志记录哈,不要打印到控制台
Callable

当我们使用Callable任务时,使用submit方法会获取Future
  1.          Future<Integer> future = threadPool.submit(() -> {
  2.              int i = 1;
  3.              int j = 0;
  4.              return i / j;
  5.          });
复制代码
如果不使用Future.get()去获取返回值,那么异常就不会抛出,这是比较危险的
为什么会出现这样的情况呢?
前文说过执行submit时会将Callable封装成FutureTask执行
在其实现Runnable中,在执行Callable任务时,如果出现异常会封装在FutureTask中
  1.      public void run() {
  2.          //...其他略
  3.          try {
  4.              //执行call任务
  5.              result = c.call();
  6.              ran = true;
  7.          } catch (Throwable ex) {
  8.              //出现异常 封装到FutureTask
  9.              result = null;
  10.              ran = false;
  11.              setException(ex);
  12.          }
  13.          //..
  14.      }
复制代码
等到执行get时,先阻塞、直到完成任务再来判断状态,如果状态不正常则抛出封装的异常
  1.      private V report(int s) throws ExecutionException {
  2.          Object x = outcome;
  3.          if (s == NORMAL)
  4.              return (V)x;
  5.          if (s >= CANCELLED)
  6.              throw new CancellationException();
  7.          throw new ExecutionException((Throwable)x);
  8.      }
复制代码
因此在处理Callable任务时,可以对任务进行捕获也可以对get进行捕获
  1.          //捕获任务         Future f = threadPool.submit(() -> {             try {                 int i = 1;                 int j = 0;                 return i / j;             } catch (Exception e) {                 System.out.println(e);             } finally {                 return null;             }         }); ​         //捕获get         Future<Integer> future = threadPool.submit(() -> {
  2.              int i = 1;
  3.              int j = 0;
  4.              return i / j;
  5.          }); ​         try {             Integer integer = future.get();         } catch (Exception e) {             System.out.println(e);         }
复制代码
afterExecutor

还记得线程池的runWorker吗?
它在循环中不停的获取阻塞队列中的任务执行,在执行前后预留钩子方法
继承ThreadPoolExecutor来重写执行后的钩子方法,记录执行完是否发生异常,如果有异常则进行日志记录,作一层兜底方案
  1.  public class MyThreadPool extends ThreadPoolExecutor {  
  2.      //...
  3.      
  4.      @Override
  5.      protected void afterExecute(Runnable r, Throwable t) {
  6.          //Throwable为空 可能是submit提交 如果runnable为future 则捕获get
  7.          if (Objects.isNull(t) && r instanceof Future<?>) {
  8.              try {
  9.                  Object res = ((Future<?>) r).get();
  10.              } catch (InterruptedException e) {
  11.                  Thread.currentThread().interrupt();
  12.              } catch (ExecutionException e) {
  13.                  t = e;
  14.              }
  15.          }
  16.  ​
  17.          if (Objects.nonNull(t)) {
  18.              System.out.println(Thread.currentThread().getName() + ": " + t.toString());
  19.          }
  20.      }
  21.  }
复制代码
这样即使使用submit,忘记使用get时,异常也不会“消失”
setUncaughtException

创建线程时,可以设置未捕获异常uncaughtException方法,当线程出现异常未捕获时调用,也可以打印日志作兜底
我们定义我们自己的线程工厂,以业务组group为单位,创建线程(方便出错排查)并设置uncaughtException方法
  1.  public class MyThreadPoolFactory implements ThreadFactory {
  2.  ​
  3.      private AtomicInteger threadNumber = new AtomicInteger(1);
  4.     
  5.      private ThreadGroup group;
  6.  ​
  7.      private String namePrefix = "";
  8.  ​
  9.      public MyThreadPoolFactory(String group) {
  10.          this.group = new ThreadGroup(group);
  11.          namePrefix = group + "-thread-pool-";
  12.      }
  13.  ​
  14.  ​
  15.      @Override
  16.      public Thread newThread(Runnable r) {
  17.          Thread t = new Thread(group, r,
  18.                  namePrefix + threadNumber.getAndIncrement(),
  19.                  0);
  20.          t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  21.              @Override
  22.              public void uncaughtException(Thread t, Throwable e) {
  23.                  System.out.println(t.getName() + ":" + e);
  24.              }
  25.          });
  26.  ​
  27.          if (t.isDaemon()) {
  28.              t.setDaemon(false);
  29.          }
  30.          if (t.getPriority() != Thread.NORM_PRIORITY) {
  31.              t.setPriority(Thread.NORM_PRIORITY);
  32.          }
  33.          return t;
  34.      }
  35.  ​
  36.  }
复制代码
关闭线程池

关闭线程池的2种方法: shutdown(),shutdownNow()
它们的原理都是: 遍历工作队列wokers中的线程,逐个中断(调用线程的interrupt方法) 无法响应中断的任务可能永远无法终止
shutdown 任务会被执行完
shutdownNow 任务不一定会执行完
通常使用shutdown,如果任务不一定要执行完可以使用shutdownNow
SecheduledThreadPoolExecutor

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上提供定时执行的功能
它有两个定时的方法
scheduleAtFixedRate 以任务开始为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过0.5s又开始执行任务
scheduledWithFixedDelay 以任务结束为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过1s才开始执行任务
  1.          ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
  2.          //scheduleAtFixedRate 固定频率执行任务 周期起点为任务开始
  3.          scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
  4.              try {
  5.                  TimeUnit.SECONDS.sleep(1);
  6.              } catch (InterruptedException e) {
  7.                  e.printStackTrace();
  8.              }
  9.              System.out.println("scheduleAtFixedRate 周期起点为任务开始");
  10.              //初始延迟:1s  周期:1s
  11.          },1,1, TimeUnit.SECONDS);
  12.  ​
  13.          //scheduledWithFixedDelay 固定延迟执行任务,周期起点为任务结束
  14.          scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
  15.              try {
  16.                  TimeUnit.SECONDS.sleep(1);
  17.              } catch (InterruptedException e) {
  18.                  e.printStackTrace();
  19.              }
  20.              System.out.println("scheduledWithFixedDelay 周期起点为任务结束 ");
  21.              //初始延迟:1s  周期:1s
  22.          },1,1, TimeUnit.SECONDS);
复制代码
定时线程池使用延迟队列充当阻塞队列实现的
延迟队列是一个优先级队列,它排序存储定时任务,时间越小越先执行
线程获取任务时,会从延迟队列中获取定时任务,如果时间已到就执行
[code]     public RunnableScheduledFuture take() throws InterruptedException {             final ReentrantLock lock = this.lock;             lock.lockInterruptibly();             try {                 for (;;) {                     RunnableScheduledFuture first = queue[0];                     //没有定时任务 等待                     if (first == null)                         available.await();                     else {                         //获取延迟时间                         long delay = first.getDelay(NANOSECONDS);                         //小于等于0 说明超时,拿出来执行                         if (delay




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4