线程池焦点原理浅析

打印 上一主题 下一主题

主题 843|帖子 843|积分 2544

前言

由于系统资源是有限的,为了降低资源消耗,提高系统的性能和稳固性,引入了线程池对线程进行统一的管理和监控,本文将详细解说线程池的利用、原理。
为什么利用线程池

池化头脑

线程池主要用到了池化头脑,池化头脑在计算机领域十分常见,主要用于减少资源浪费、提高性能等。
池化头脑主要包含以下几个方面:
     
一些常见的资源池包括线程池、数据库毗连池、对象池、缓存池、毗连池等。
池化头脑可以提高系统的性能,因为它减少了资源的创建和销毁次数,避免了不必要的开销。通过池化,系统可以更好地应对高并发情况,降低资源竞争,提高响应速度。
什么是线程池

根据池化头脑,在一个系统中,为了避免线程频繁的创建和销毁,让线程可以复用,引入了线程池的概念。线程池中,总有那么几个活泼线程。
当你需要利用线程时,可以从池子中任意拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程退回到池子,方便其他人利用。
简单说就是,在利用线程池后,创建线程变成了从线程池中获得空闲线程,关闭线程编程了向池子里归还线程。
大抵流程如下:
     
## 为什么利用线程池Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的步伐都可以利用线程池。在开发过程中,合理地利用线程池可以或许带来3个好处。

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳固性,利用线程池可以进行统一分配、调优和监控。
要做到合理利用线程池,必须对其实现原理了如指掌。
线程池的利用

     
## ThreadPoolExecutorThreadPoolExecutor 的创建方法总体来说可分为 2 种:

  • 通过 ThreadPoolExecutor 构造函数
  • 通过 Executors 类创建
通过构造函数

1.1. 入参含义

这个也是推荐利用的方法,因为通过 Executors 类创建可能会导致 OOM,如下图阿里开发规范中的描述。
     
构造函数入参:
  1. public ThreadPoolExecutor(int corePoolSize,
  2.                           int maximumPoolSize,
  3.                           long keepAliveTime,
  4.                           TimeUnit unit,
  5.                           BlockingQueue<Runnable> workQueue,
  6.                           ThreadFactory threadFactory,
  7.                           RejectedExecutionHandler handler)
复制代码
构造函数入参含义:
     
1.2. 阻塞队列

workQueue 可选的 BlockingQueue:
     
1.3. 拒绝策略

     
如下图,上述拒绝策略均实现 RejectedExecutionHandler 接口,且为 ThreadPoolExecutor 的内部类。
     
若以上策略仍无法满意实际应用需要,完全可以自已扩展 RejectedExecutionHandler 接口。
  1. public interface RejectedExecutionHandler {
  2.     /**
  3.      * @param r 当前请求执行的任务
  4.      * @param executor 当前的线程池
  5.      */
  6.     void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
  7. }
复制代码
示例:
  1. public class RejectedExecutionDemo {
  2.     public static class MyTask implements Runnable{
  3.         @Override
  4.         public void run() {
  5.             System.out.println(new Date() + ":Thread ID is" + Thread.currentThread().getId());
  6.             try {
  7.                 Thread.sleep(100);
  8.             } catch (InterruptedException e) {
  9.                 throw new RuntimeException(e);
  10.             }
  11.         }
  12.     }
  13.     public static void main(String[] args) throws InterruptedException {
  14.         MyTask myTask = new MyTask();
  15.         ExecutorService executorService = new ThreadPoolExecutor(5, 5,
  16.                 0L, TimeUnit.MILLISECONDS,
  17.                 new LinkedBlockingQueue<>(10),
  18.                 Executors.defaultThreadFactory(),
  19.                 (r, executor) -> System.out.println(r.hashCode() + "is discard")
  20.         );
  21.         for (int i = 0; i < 100; i++) {
  22.             executorService.submit(myTask);
  23.             Thread.sleep(10);
  24.         }
  25.     }
  26. }
复制代码
上述示例中,mytask 执行需要耗费100毫秒,因此,必然会导致一些任务被直接丢弃。在实际应用中,我们可以将更详细的信息记载到日志中,来分析任务丢失情况和系统负载。
     
通过 Executors

Executors 类饰演着线程池工厂的角色,通过该类可以取得一个拥有定功能的线程池。
该类可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
2.1. FixedThreadPool

固定线程数的线程池,该线程池中的线程数目始终稳定。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被临时存在任务队列中,待有线程空闲时,在处理惩罚队列中的任务。
FixedThreadPool 利用的无界任务队列 LinkedBlockingQueue,可能造成内存泄漏。
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2.     return new ThreadPoolExecutor(nThreads, nThreads,
  3.                                   0L, TimeUnit.MILLISECONDS,
  4.                                   new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  7.     return new ThreadPoolExecutor(nThreads, nThreads,
  8.                                   0L, TimeUnit.MILLISECONDS,
  9.                                   new LinkedBlockingQueue<Runnable>(),
  10.                                   threadFactory);
  11. }
复制代码
2.2. SingleThreadExecutor

只有一个工作线程的线程池,当多于 1 个任务被提交时,会存到任务队列中。该线程池利用的无界任务队列 LinkedBlockingQueue,可能造成内存泄漏。
  1. public static ExecutorService newSingleThreadExecutor() {
  2.     return new FinalizableDelegatedExecutorService
  3.         (new ThreadPoolExecutor(1, 1,
  4.                                 0L, TimeUnit.MILLISECONDS,
  5.                                 new LinkedBlockingQueue<Runnable>()));
  6. }
  7. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  8.     return new FinalizableDelegatedExecutorService
  9.         (new ThreadPoolExecutor(1, 1,
  10.                                 0L, TimeUnit.MILLISECONDS,
  11.                                 new LinkedBlockingQueue<Runnable>(),
  12.                                 threadFactory));
  13. }
复制代码
2.3. CachedThreadPool

根据实际情况调整线程数的线程池,线程池的线程数目不确定,若有空闲线程可复用,则会优先利用。若所有线程均在工作,此时新的任务则会创建新的线程优先处理惩罚。所有线程在任务执行完毕后,将返回线程池进行复用。
corePoolSize 被设置为0,maximumPoolSize 被设置为无界,存活时间设置为 60s,空闲线程超过60秒后将会被
终止。极度情况线程创建过多,会导致内存泄漏。
  1. public static ExecutorService newCachedThreadPool() {
  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3.                                   60L, TimeUnit.SECONDS,
  4.                                   new SynchronousQueue<Runnable>());
  5. }
  6. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  7.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  8.                                   60L, TimeUnit.SECONDS,
  9.                                   new SynchronousQueue<Runnable>(),
  10.                                   threadFactory);
  11. }
复制代码
ScheduledThreadPoolExecutor

简介

如下图, ScheduledThreadPoolExecutor 继续自ThreadPoolExecutor,它主要用来定期执行任务,功能与 Timer 雷同且更加强大,可以在构造函数中指定多个对应的背景线程数。
     
利用

可通过 Executors 创建,源码如下:
  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  2.     return new DelegatedScheduledExecutorService
  3.         (new ScheduledThreadPoolExecutor(1, threadFactory));
  4. }
  5. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  6.     return new ScheduledThreadPoolExecutor(corePoolSize);
  7. }
  8. public static ScheduledExecutorService newScheduledThreadPool(
  9.         int corePoolSize, ThreadFactory threadFactory) {
  10.     return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  11. }
复制代码
这里的返回值是 ScheduledExecutorService,根据时间对线程进行调理。有三个主要方法:
  1. public interface ScheduledExecutorService extends ExecutorService {
  2.     /**
  3.      * 给定时间对任务进行调度
  4.      */
  5.     public ScheduledFuture<?> schedule(Runnable command,
  6.                                        long delay, TimeUnit unit);
  7.     /**
  8.      * 周期性对任务进行调度
  9.      * 以第一个任务的开始时间 initialDelay + period
  10.      * 第一个任务在 initialDelay + period 执行
  11.      * 第二个任务在 initialDelay + period * 2 执行
  12.      */
  13.     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  14.                                                   long initialDelay,
  15.                                                   long period,
  16.                                                   TimeUnit unit);
  17.     /**
  18.      * 周期性对任务进行调度
  19.      * 上一个任务结束后,再经过 period 时间开始执行
  20.      */
  21.     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  22.                                                      long initialDelay,
  23.                                                      long delay,
  24.                                                      TimeUnit unit);
  25. }
复制代码
如果任务遇到异常,那么后续的所有子任务都会停止调理,因此,必须包管异常被及时处理惩罚,为周期性任务的稳固调理提供条件。
ForkJoinPool

fork 是开启子进程,join 是等待,意思是分支子进程结束后才能得到结果,实际开发中,若频繁的 fork 开启线程可能严重影响系统性能,所以引入了 ForkJoinPool。
大抵流程是,向 ForkJoinPool 线程池中提交一个 ForkJoinTask 任务,就是将任务分解成多个小任务,等任务全部完成后进行处理惩罚,这里采用了分治的头脑,详细我将在后续单独展开,这里不多做赘述。
ForkJoin 可能出现两个标题:

  • 子线程积累过多,可能导致系统性能严重降落;
  • 调用层次过深,可能导致栈溢出。
线程池的任务提交

execute()

该方法用于提交不需要返回值的任务,且无法判断任务是否被线程池执行乐成。
源码见下面的线程池原理章节。
submit()

该方法用于提交需要返回值的任务。线程池会返回 Future 对象,可以判断任务是否执行乐成,还可以通过 Future 的get()方法来获取返回值。
get()方法会阻塞当前线程直到任务完成,还可以设置超时时间,到时立即返回,不过这时有可能任务没有执行完。
  1. public Future<?> submit(Runnable task) {
  2.     if (task == null) throw new NullPointerException();
  3.     RunnableFuture<Void> ftask = newTaskFor(task, null);
  4.     execute(ftask);
  5.     return ftask;
  6. }
复制代码
线程池的关闭

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt() 来中断线程,所以无法响应中断的任务可能永远无法终止。
两种方法存在肯定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,表示线程池关闭乐成,这时调用isTerminaed方法会返回true。
至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不肯定要执行完,则可以调用 shutdownNow 方法。
线程池执行原理

执行源码
  1. public void execute(Runnable command) {
  2.     if (command == null)
  3.         throw new NullPointerException();
  4.     int c = ctl.get();
  5.    
  6.     // 如果当前工作线程数是否小于核心线程数
  7.     if (workerCountOf(c) < corePoolSize) {
  8.         // 添加核心线程去执行任务,成功则return
  9.         if (addWorker(command, true))
  10.             return;
  11.         // 添加失败,ctl有变化,需重新获取
  12.         c = ctl.get();
  13.     }
  14.     // 判断是否为RUNNING,此时核心线程数已满,需加入任务队列
  15.     if (isRunning(c) && workQueue.offer(command)) {
  16.         int recheck = ctl.get();
  17.         // 检查若不是RUNNING则将任务从队列移除
  18.         if (! isRunning(recheck) && remove(command))
  19.             // 执行拒绝策略
  20.             reject(command);
  21.             
  22.         // 正常则添加一个非核心空线程,执行队列中的任务
  23.         else if (workerCountOf(recheck) == 0)
  24.             addWorker(null, false);
  25.     }
  26.     // 表示核心线程满了,队列也满了,创建非核心线程,执行任务
  27.     else if (!addWorker(command, false))
  28.         // 最大线程数也满了,走拒绝策略
  29.         reject(command);
  30. }
复制代码
流程图

     
参考:
[1] 魏鹏. Java并发编程的艺术.
[2] 葛一鸣/郭超. 实战Java高并发步伐计划.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连全瓷种植牙齿制作中心

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表