Java线程池ThreadPoolExecutor封装工具类

打印 上一主题 下一主题

主题 1027|帖子 1027|积分 3081

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Java线程池ThreadPoolExecutor

一、线程池特性

1.动态调整线程数量

根据当前的工作负载,ThreadPoolExecutor 可以动态地增加或减少工作线程的数量。当有新的任务提交且当前线程数小于核心线程数时,会创建新的线程;如果已有充足的线程,则将任务放入队列中。如果队列已满并且线程数未达到最大值,还会创建额外的线程来处置惩罚任务。
2.队列管理

ThreadPoolExecutor 支持多种范例的阻塞队列,如 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。选择符合的队列范例可以影响线程池的行为和性能。
例如,利用无界队列(如 LinkedBlockingQueue)会导致所有超出核心线程数的任务都被排队,而不会立刻创建新的线程;相反,SynchronousQueue 则强制每个任务都由一个可用线程立刻处置惩罚,否则会被拒绝。
3.拒绝计谋

当线程池无法接受新任务时(比如由于线程数已达上限并且队列也满了),会触发拒绝计谋。Java 提供了几种内置的拒绝计谋实现,也可以自定义拒绝计谋:


  • AbortPolicy:抛出 RejectedExecutionException。
  • CallerRunsPolicy:由调用线程(提交任务的线程)执行该任务。
  • DiscardPolicy:静默抛弃任务。
  • DiscardOldestPolicy:抛弃队列中最老的任务,并尝试重新提交当前任务。
4.生命周期管理

ThreadPoolExecutor 提供了 shutdown() 和 shutdownNow() 方法来优雅地关闭线程池。前者会等待所有已提交的任务完成后再关闭,后者则试图立刻停止所有正在执行的任务,并返回尚未开始的任务列表。
二、参数介绍和基本利用

1.参数介绍

  1. public ThreadPoolExecutor(
  2.     int corePoolSize,
  3.     int maximumPoolSize,
  4.     long keepAliveTime,
  5.     TimeUnit unit,
  6.     BlockingQueue<Runnable> workQueue,
  7.     ThreadFactory threadFactory,
  8.     RejectedExecutionHandler handler)
复制代码


  • corePoolSize:线程池中保持的核心线程数,纵然这些线程是空闲的。
  • maximumPoolSize:线程池答应的最大线程数。
  • keepAliveTime:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的时间。
  • unit:keepAliveTime参数的时间单位。
  • workQueue:用于保存等待执行的任务的阻塞队列。
  • threadFactory:用于创建新线程的工厂。
  • handler:当任务提交到已满的线程池时所利用的拒绝计谋。
2.基本利用

  1. public class ThreadPoolExample {
  2.     public static void main(String[] args) {
  3.         // 定义线程池参数
  4.         int corePoolSize = 2; // 核心线程数
  5.         int maximumPoolSize = 4; // 最大线程数
  6.         long keepAliveTime = 5000; // 空闲线程存活时间
  7.         TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单位
  8.         BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10); // 工作队列
  9.         
  10.         // 创建 ThreadPoolExecutor
  11.         ThreadPoolExecutor executor = new ThreadPoolExecutor(
  12.             corePoolSize,
  13.             maximumPoolSize,
  14.             keepAliveTime,
  15.             unit,
  16.             workQueue
  17.         );
  18.         // 提交任务给线程池执行
  19.         for (int i = 0; i < 10; i++) {
  20.             final int taskNumber = i;
  21.             executor.execute(() -> {
  22.                 System.out.println("Executing Task " + taskNumber + " by " + Thread.currentThread().getName());
  23.                 try {
  24.                     Thread.sleep(2000); // 模拟任务耗时
  25.                 } catch (InterruptedException e) {
  26.                     e.printStackTrace();
  27.                 }
  28.             });
  29.         }
  30.         // 关闭线程池
  31.         executor.shutdown();
  32.         try {
  33.             if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {//等待60秒
  34.                 executor.shutdownNow();//不管线程池中的任务是否完成,都直接中断掉
  35.             }
  36.         } catch (InterruptedException e) {
  37.             executor.shutdownNow();
  38.         }
  39.     }
  40. }
复制代码
二、线程池状态

1.线程池状态范例

ThreadPoolExecutor 内部维护了一个状态机来跟踪线程池的差别状态。包罗:


  • 运行(RUNNING)
  • 关闭(SHUTDOWN)
  • 停止(STOP)
  • 整理(TIDYING)
  • 终结(TERMINATED)
2.线程池状态实现

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始化 ctl 的值,表示线程池处于运行状态并且当前没有活动的工作线程。
  2.     private static final int COUNT_BITS = Integer.SIZE - 3;//表示用于存储线程计数(workerCount)的位数。由于 Java 的 int 类型有 32 位(Integer.SIZE),这里减去 3 位留给状态标志,因此留下 29 位用于线程计数。如果线程数量不满足,则可以将类型改为long类型以及AtomicInteger改了AtomicLong
  3.     private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 这是一个掩码,用于从 ctl 中提取出线程计数值。它由 COUNT_BITS 位全为 1 组成,即低 29 位为 1。
  4.     // runState is stored in the high-order bits
  5.     private static final int RUNNING    = -1 << COUNT_BITS;//当线程池处于运行状态时,高 3 位设置为 -1 的二进制补码形式(即 111...000)。
  6.     private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程池正在关闭,不再接受新任务,但会继续处理队列中的任务,高 3 位为 000。
  7.     private static final int STOP       =  1 << COUNT_BITS;//线程池已停止,不再接受新任务,也不再处理队列中的任务,高 3 位为 001。
  8.     private static final int TIDYING    =  2 << COUNT_BITS;//所有任务都已完成,线程池即将进入终结状态,高 3 位为 010。
  9.     private static final int TERMINATED =  3 << COUNT_BITS;//线程池已经完全终止,所有资源都被释放,高 3 位为 011。
  10.     // Packing and unpacking ctl
  11.     /**
  12.      * COUNT_MASK=00011111111111111111111111111111 取反之后 11100000000000000000000000000000 既为RUNNING初始状态
  13.      * c & ~COUNT_MASK 与运算得到线程池当前的状态;可以这样理解:当前ctl的值取二进制高三位进行比对,得到的结果就是当前ctl代表的线程池状态
  14.      *
  15.      */
  16.     private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
  17.     /**
  18.      * COUNT_MASK=00011111111111111111111111111111
  19.      * c & COUNT_MASK 去除高三位状态,相当于重置高三位的二进制为0,后29位二进制就是当前线程数量
  20.      */
  21.     private static int workerCountOf(int c)  { return c & COUNT_MASK; }
  22.     /**
  23.      * 或运算运算,例如 00011111111111111111111111111111 | 11100000000000000000000000000000 = 11111111111111111111111111111111
  24.      * 这样理解,高三位表示状态|低29位表示线程数量,这个时候只要用一个int类型存储,那么将高三位的二进制和低29位的二进制合并一起为一个数,就是ctl的值
  25.      */
  26.     private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
三、封装线程池工具类

  1. package com.zzc.common.utils;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.Map;
  4. import java.util.concurrent.BlockingDeque;
  5. import java.util.concurrent.BlockingQueue;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.concurrent.Executor;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.LinkedBlockingDeque;
  10. import java.util.concurrent.RejectedExecutionException;
  11. import java.util.concurrent.RejectedExecutionHandler;
  12. import java.util.concurrent.ScheduledExecutorService;
  13. import java.util.concurrent.ScheduledThreadPoolExecutor;
  14. import java.util.concurrent.SynchronousQueue;
  15. import java.util.concurrent.ThreadFactory;
  16. import java.util.concurrent.ThreadPoolExecutor;
  17. import java.util.concurrent.TimeUnit;
  18. @Slf4j
  19. public class ThreadPoolUtils {
  20.     private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR;
  21.     private static final String COMMON_THREAD_POOL_KEY;
  22.     private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY;
  23.     private static final int DEFAULT_CORE_POOL_SIZE = 1;
  24.     private static final int DEFAULT_KEEP_ALIVE_TIME = 30;
  25.     private static final int DEFAULT_PROCESSORS = 2;
  26.     private static int ALB_PROCESSORS = DEFAULT_PROCESSORS;
  27.     static {
  28.         THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();
  29.         COMMON_THREAD_POOL_KEY = "COMMON";
  30.         COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";
  31.         int processors = Runtime.getRuntime().availableProcessors();
  32.         ALB_PROCESSORS = processors;
  33.         log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", ALB_PROCESSORS, DEFAULT_PROCESSORS);
  34.         ALB_PROCESSORS = Math.max(ALB_PROCESSORS, DEFAULT_PROCESSORS);
  35.     }
  36. //    private static Executor executor = Executors.newFixedThreadPool()
  37.     /**
  38.      *
  39.      * @param threadPoolKey
  40.      * @param corePoolSize
  41.      * @param maxPoolSize
  42.      * @param keepAliveTime
  43.      * @param timeUnit
  44.      * @param discardContinueWait 如果被拒绝,则等待时间,单位ms
  45.      * @return
  46.      */
  47.     public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
  48.                                                                        int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
  49.                                                                        int discardContinueWait) {
  50.         return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit,  new SynchronousQueue(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
  51.     }
  52.     public static ThreadPoolExecutor newThreadPoolExecutorNewThreadToRun(String threadPoolKey, int corePoolSize,
  53.                                                                        int maxPoolSize, int keepAliveTime, TimeUnit timeUnit) {
  54.         return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit,  new SynchronousQueue(true), new NewThreadToRun());
  55.     }
  56.     public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
  57.                                                            int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
  58.                                                            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
  59.         if (StrUtils.isBlank(threadPoolKey)) {
  60.             throw new RuntimeException("threadPoolKey is null");
  61.         }
  62.         if (THREAD_POOL_EXECUTOR.containsKey(threadPoolKey)) {
  63.             return THREAD_POOL_EXECUTOR.get(threadPoolKey);
  64.         }
  65.         log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
  66.         corePoolSize = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
  67.         maxPoolSize = maxPoolSize <= 0 ? corePoolSize : maxPoolSize;
  68.         keepAliveTime = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
  69.         timeUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
  70.         ThreadPoolExecutor executor = new PThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, newThreadFactory(threadPoolKey), handler);
  71.         log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
  72.         THREAD_POOL_EXECUTOR.put(threadPoolKey, executor);
  73.         return executor;
  74.     }
  75.     public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
  76.         ScheduledThreadPoolExecutor executorService = (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.get(COMMON_SCHEDULE_EXECUTOR_POOL_KEY);
  77.         if (executorService == null) {
  78.             executorService = new ScheduledThreadPoolExecutor(2, newThreadFactory(COMMON_SCHEDULE_EXECUTOR_POOL_KEY));
  79.             THREAD_POOL_EXECUTOR.put(COMMON_SCHEDULE_EXECUTOR_POOL_KEY, executorService);
  80.         }
  81.         return executorService;
  82.     }
  83.     public static ThreadFactory newThreadFactory(String threadPrefix) {
  84.         ThreadFactory threadFactory = new ThreadFactory() {
  85.             @Override
  86.             public Thread newThread(Runnable r) {
  87.                 Thread thread = new Thread(r);
  88.                 thread.setName(threadPrefix);
  89.                 thread.setDaemon(true);
  90.                 log.info("new thread:{}", threadPrefix);
  91.                 return thread;
  92.             }
  93.         };
  94.         return threadFactory;
  95.     }
  96.     /**
  97.      * 拒绝策略
  98.      * 直接给主线程自己执行
  99.      */
  100.     static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
  101.         @Override
  102.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  103.             String threadKey = "";
  104.             if (e instanceof PThreadPoolExecutor) {
  105.                 threadKey = ((PThreadPoolExecutor) e).getThreadPoolKey();
  106.             }
  107.             if (r instanceof Thread) {
  108.                 log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
  109.             } else {
  110.                 log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
  111.             }
  112.             super.rejectedExecution(r, e);
  113.         }
  114.     }
  115.     /**
  116.      * 拒绝策略 -- activateMQ的做法
  117.      * 重新尝试加入队列,等待超时,影响线程执行效率
  118.      */
  119.     static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {
  120.         private long discardContinueWait = 1;
  121.         public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
  122.             if (discardContinueWait <= 0) {
  123.                 discardContinueWait = 1;
  124.             }
  125.             this.discardContinueWait = discardContinueWait;
  126.         }
  127.         @Override
  128.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  129.             String threadKey = "";
  130.             if (executor instanceof PThreadPoolExecutor) {
  131.                 threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
  132.             }
  133.             if (r instanceof Thread) {
  134.                 log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
  135.             } else {
  136.                 log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
  137.             }
  138.             if (!executor.isShutdown()) {
  139.                 try {
  140.                     executor.getQueue().poll(discardContinueWait, TimeUnit.MICROSECONDS);
  141.                 } catch (InterruptedException e) {
  142.                     log.error("rejectedExecution", e);
  143.                 }
  144.                 executor.execute(r);
  145.             }
  146.         }
  147.     }
  148.     /**
  149.      * 拒绝策略 -- netty的做法
  150.      * 创建新的线程,直接执行,直到系统创建不了新的线程为止
  151.      */
  152.     static class NewThreadToRun implements RejectedExecutionHandler {
  153.         public NewThreadToRun() {
  154.             super();
  155.         }
  156.         @Override
  157.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  158.             String threadKey = "";
  159.             if (executor instanceof PThreadPoolExecutor) {
  160.                 threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
  161.             }
  162.             if (r instanceof Thread) {
  163.                 log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
  164.             } else {
  165.                 log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
  166.             }
  167.             try {
  168.                 final Thread thread = new Thread(r, "new thread: " + threadKey);
  169.                 thread.start();
  170.             } catch (Exception e) {
  171.                 throw new RejectedExecutionException("Failed to start new thread. threadKey:" + threadKey, e);
  172.             }
  173.         }
  174.     }
  175.     /**
  176.      * 线程池,添加添加一些日志打印
  177.      */
  178.     static class PThreadPoolExecutor extends ThreadPoolExecutor {
  179.         private String threadPoolKey;
  180.         public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  181.             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
  182.             this.threadPoolKey = threadPoolKey;
  183.         }
  184.         @Override
  185.         public void execute(Runnable command) {
  186.             try {
  187.                 super.execute(command);
  188.             } catch (Exception e) {
  189.                 log.error("execute error.", e);
  190.             }
  191.             log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
  192.                     command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
  193.         }
  194.         @Override
  195.         protected void afterExecute(Runnable r, Throwable t) {
  196.             if (t != null) {
  197.                 log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
  198.             }
  199.             super.afterExecute(r, t);
  200.         }
  201.         public String getThreadPoolKey() {
  202.             return threadPoolKey;
  203.         }
  204.     }
  205. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大号在练葵花宝典

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表