并发编程系列-线程池的正确使用

打印 上一主题 下一主题

主题 693|帖子 693|积分 2079


在Java语言中,创建线程并不像创建对象一样简单。虽然只需要使用new Thread()即可创建线程,但实际上创建线程比创建对象复杂得多。创建对象只需在JVM的堆中分配内存,而创建线程需要调用操作系统内核的API,并为线程分配一系列资源,这个成本相对较高。因此,线程被视为重量级的对象,应尽量避免频繁创建和销毁。
那么如何避免频繁创建线程呢?解决方案就是使用线程池。
由于线程池的需求非常普遍,所以Java SDK的并发包自然也包含了线程池。但是,很多人初次接触并发包中与线程池相关的工具类时,可能会感到有些困惑,不知从何入手。我认为,这主要是因为线程池与通常意义上的资源池是不同的。一般意义上的资源池在需要资源时调用acquire()方法申请资源,在使用完毕后调用release()释放资源。然而,如果你带着这种固有模型来看待并发包中的线程池相关工具类,会遗憾地发现它们与之不匹配,因为Java提供的线程池中根本不存在申请线程和释放线程的方法。
  1. class XXXPool{
  2.   // 获取池化资源
  3.   XXX acquire() {
  4.   }
  5.   // 释放池化资源
  6.   void release(XXX x){
  7.   }
  8. }
复制代码
线程池是一种生产者-消费者模式

线程池之所以没有采用一般意义上池化资源的设计方法,是因为线程池是基于生产者-消费者模式的设计。
在一般意义上的池化资源设计中,我们可以通过acquire()方法获取到一个空闲资源,然后通过使用资源来执行具体的任务,最后再通过release()方法释放资源。但是在线程池中,由于涉及到线程的管理和复用,采用了不同的设计思路。
当我们从线程池中获取到一个空闲线程时,我们期望能够像使用Thread类创建线程那样,通过调用该线程的execute()方法,传入一个Runnable对象来执行具体的业务逻辑。然而,遗憾的是,Thread类并没有像execute(Runnable target)这样的公共方法。这是因为线程池在管理线程时,需要考虑到线程的状态、任务队列等方面的复杂情况,因此不能简单地将线程的使用方式与传统的池化资源相同。
  1. //采用一般意义上池化资源的设计方法
  2. class ThreadPool{
  3.   // 获取空闲线程
  4.   Thread acquire() {
  5.   }
  6.   // 释放线程
  7.   void release(Thread t){
  8.   }
  9. }
  10. //期望的使用
  11. ThreadPool pool;
  12. Thread T1=pool.acquire();
  13. //传入Runnable对象
  14. T1.execute(()->{
  15.   //具体业务逻辑
  16.   ......
  17. });
复制代码
所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。那线程池该如何设计呢?目前业界线程池的设计,普遍采用的都是 生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池MyThreadPool,你可以通过它来理解线程池的工作原理。
  1. //简化的线程池,仅用来说明工作原理
  2. class MyThreadPool{
  3.   //利用阻塞队列实现生产者-消费者模式
  4.   BlockingQueue<Runnable> workQueue;
  5.   //保存内部工作线程
  6.   List<WorkerThread> threads
  7.     = new ArrayList<>();
  8.   // 构造方法
  9.   MyThreadPool(int poolSize,
  10.     BlockingQueue<Runnable> workQueue){
  11.     this.workQueue = workQueue;
  12.     // 创建工作线程
  13.     for(int idx=0; idx<poolSize; idx++){
  14.       WorkerThread work = new WorkerThread();
  15.       work.start();
  16.       threads.add(work);
  17.     }
  18.   }
  19.   // 提交任务
  20.   void execute(Runnable command){
  21.     workQueue.put(command);
  22.   }
  23.   // 工作线程负责消费任务,并执行任务
  24.   class WorkerThread extends Thread{
  25.     public void run() {
  26.       //循环取任务并执行
  27.       while(true){ ①
  28.         Runnable task = workQueue.take();
  29.         task.run();
  30.       }
  31.     }
  32.   }
  33. }
  34. /** 下面是使用示例 **/
  35. // 创建有界阻塞队列
  36. BlockingQueue<Runnable> workQueue =
  37.   new LinkedBlockingQueue<>(2);
  38. // 创建线程池
  39. MyThreadPool pool = new MyThreadPool(
  40.   10, workQueue);
  41. // 提交任务
  42. pool.execute(()->{
  43.     System.out.println("hello");
  44. });
复制代码
在MyThreadPool的内部,我们维护了一个阻塞队列workQueue和一组工作线程,工作线程的个数由构造函数中的poolSize来指定。用户通过调用execute()方法来提交Runnable任务,execute()方法的内部实现仅仅是将任务加入到workQueue中。MyThreadPool内部维护的工作线程会消费workQueue中的任务并执行任务,相关的代码就是代码①处的while循环。线程池主要的工作原理就这些,是不是还挺简单的?
如何使用Java中的线程池

Java并发包里提供的线程池,远比我们上面的示例代码强大得多,当然也复杂得多。Java提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,通过名字你也能看出来,它强调的是Executor,而不是一般意义上的池化资源。
ThreadPoolExecutor的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有7个参数。
  1. ThreadPoolExecutor(
  2.   int corePoolSize,
  3.   int maximumPoolSize,
  4.   long keepAliveTime,
  5.   TimeUnit unit,
  6.   BlockingQueue<Runnable> workQueue,
  7.   ThreadFactory threadFactory,
  8.   RejectedExecutionHandler handler)
复制代码
下面我们逐一介绍这些参数的含义,将线程池类比为一个项目组,而每个线程就是项目组的成员

  • corePoolSize:表示线程池所持有的最小线程数。有些项目可能很闲,但也不能将所有项目组成员都撤离,至少要留下corePoolSize个人守在岗位上。
  • maximumPoolSize:表示线程池可创建的最大线程数。当项目繁忙时,需要增加成员,但也不能无限制地增加,最多增加到maximumPoolSize个人。当项目变得闲暇时,需要减少成员,最多将成员减至corePoolSize个人。
  • keepAliveTime & unit:前面提到,项目根据忙闲来增减成员。在编程世界中,如何定义忙和闲呢?很简单,如果一个线程在一段时间内都没有执行任务,说明它处于闲置状态。而keepAliveTime和unit就用来定义这段时间的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么长时间,并且线程池的线程数超过了corePoolSize,那么该空闲线程就会被回收。
  • workQueue:工作队列,与上面示例代码中的工作队列意义相同。
  • threadFactory:通过该参数,你可以自定义如何创建线程。例如,你可以为线程指定一个有意义的名称。
  • handler:通过该参数,你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,且工作队列已满(前提是工作队列是有界队列),此时提交任务,线程池将会拒绝接收。至于拒绝策略,你可以通过handler参数来指定。ThreadPoolExecutor已经提供了以下四种策略:

    • CallerRunsPolicy:提交任务的线程自行执行该任务。
    • AbortPolicy:默认的拒绝策略,会抛出RejectedExecutionException异常。
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    • DiscardOldestPolicy:丢弃最老的任务,实际上是丢弃最早进入工作队列的任务,并将新任务加入工作队列。

Java 1.6版本还增加了allowCoreThreadTimeOut(boolean value)方法,它可以使所有线程都支持超时。这意味着如果项目很闲,项目组的成员都会被撤离。
使用线程池要注意些什么

考虑到ThreadPoolExecutor的构造函数相对复杂,Java并发包提供了一个线程池的静态工厂类Executors,通过Executors可以快速创建线程池。不过,目前大型公司的编码规范一般不建议使用Executors,所以我就不再详细介绍这方面内容了。
不建议使用Executors最重要的原因是:Executors提供的许多方法默认使用无界的LinkedBlockingQueue。在高负载情况下,无界队列很容易导致OOM(内存溢出),而OOM会导致所有请求都无法处理,这是一个严重的问题。因此,强烈建议使用有界队列
使用有界队列时,当任务过多时,线程池会触发执行拒绝策略。线程池的默认拒绝策略会抛出RejectedExecutionException异常,这是一个运行时异常,编译器不会强制要求捕获它,因此开发人员很容易忽略。因此,在使用默认拒绝策略时要谨慎。如果线程池处理的任务非常重要,建议自定义拒绝策略,并在实际工作中将自定义的拒绝策略与降级策略配合使用。
在使用线程池时,还需要注意异常处理的问题。例如,通过ThreadPoolExecutor对象的execute()方法提交任务时,如果任务在执行过程中出现运行时异常,会导致执行该任务的线程终止。然而,最致命的是尽管任务发生异常,你却无法获得任何通知,这可能让你误以为任务都正常执行了。尽管线程池提供了许多用于异常处理的方法,但最可靠和简单的方案是捕获所有异常并根据需要进行处理,你可以参考下面的示例代码。
  1. try {
  2.   //业务逻辑
  3. } catch (RuntimeException x) {
  4.   //按需处理
  5. } catch (Throwable x) {
  6.   //按需处理
  7. }
复制代码
总结

线程池在Java并发编程领域中扮演着重要角色,许多大型公司的编码规范要求使用线程池来管理线程。线程池与普通的资源池有很大的区别,实际上它是生产者-消费者模式的一种实现。理解生产者-消费者模式是理解线程池的关键所在。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表