马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Java线程池ThreadPoolExecutor
一、线程池特性
1.动态调整线程数量
根据当前的工作负载,ThreadPoolExecutor 可以动态地增加或减少工作线程的数量。当有新的任务提交且当前线程数小于核心线程数时,会创建新的线程;如果已有充足的线程,则将任务放入队列中。如果队列已满并且线程数未达到最大值,还会创建额外的线程来处置惩罚任务。
2.队列管理
ThreadPoolExecutor 支持多种范例的阻塞队列,如 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。选择符合的队列范例可以影响线程池的行为和性能。
例如,利用无界队列(如 LinkedBlockingQueue)会导致所有超出核心线程数的任务都被排队,而不会立刻创建新的线程;相反,SynchronousQueue 则强制每个任务都由一个可用线程立刻处置惩罚,否则会被拒绝。
3.拒绝计谋
当线程池无法接受新任务时(比如由于线程数已达上限并且队列也满了),会触发拒绝计谋。Java 提供了几种内置的拒绝计谋实现,也可以自定义拒绝计谋:
- AbortPolicy:抛出 RejectedExecutionException。
- CallerRunsPolicy:由调用线程(提交任务的线程)执行该任务。
- DiscardPolicy:静默抛弃任务。
- DiscardOldestPolicy:抛弃队列中最老的任务,并尝试重新提交当前任务。
4.生命周期管理
ThreadPoolExecutor 提供了 shutdown() 和 shutdownNow() 方法来优雅地关闭线程池。前者会等待所有已提交的任务完成后再关闭,后者则试图立刻停止所有正在执行的任务,并返回尚未开始的任务列表。
二、参数介绍和基本利用
1.参数介绍
- public ThreadPoolExecutor(
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
复制代码
- corePoolSize:线程池中保持的核心线程数,纵然这些线程是空闲的。
- maximumPoolSize:线程池答应的最大线程数。
- keepAliveTime:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的时间。
- unit:keepAliveTime参数的时间单位。
- workQueue:用于保存等待执行的任务的阻塞队列。
- threadFactory:用于创建新线程的工厂。
- handler:当任务提交到已满的线程池时所利用的拒绝计谋。
2.基本利用
- public class ThreadPoolExample {
- public static void main(String[] args) {
- // 定义线程池参数
- int corePoolSize = 2; // 核心线程数
- int maximumPoolSize = 4; // 最大线程数
- long keepAliveTime = 5000; // 空闲线程存活时间
- TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单位
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10); // 工作队列
-
- // 创建 ThreadPoolExecutor
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- corePoolSize,
- maximumPoolSize,
- keepAliveTime,
- unit,
- workQueue
- );
- // 提交任务给线程池执行
- for (int i = 0; i < 10; i++) {
- final int taskNumber = i;
- executor.execute(() -> {
- System.out.println("Executing Task " + taskNumber + " by " + Thread.currentThread().getName());
- try {
- Thread.sleep(2000); // 模拟任务耗时
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- // 关闭线程池
- executor.shutdown();
- try {
- if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {//等待60秒
- executor.shutdownNow();//不管线程池中的任务是否完成,都直接中断掉
- }
- } catch (InterruptedException e) {
- executor.shutdownNow();
- }
- }
- }
复制代码 二、线程池状态
1.线程池状态范例
ThreadPoolExecutor 内部维护了一个状态机来跟踪线程池的差别状态。包罗:
- 运行(RUNNING)
- 关闭(SHUTDOWN)
- 停止(STOP)
- 整理(TIDYING)
- 终结(TERMINATED)
2.线程池状态实现
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始化 ctl 的值,表示线程池处于运行状态并且当前没有活动的工作线程。
- private static final int COUNT_BITS = Integer.SIZE - 3;//表示用于存储线程计数(workerCount)的位数。由于 Java 的 int 类型有 32 位(Integer.SIZE),这里减去 3 位留给状态标志,因此留下 29 位用于线程计数。如果线程数量不满足,则可以将类型改为long类型以及AtomicInteger改了AtomicLong
- private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 这是一个掩码,用于从 ctl 中提取出线程计数值。它由 COUNT_BITS 位全为 1 组成,即低 29 位为 1。
- // runState is stored in the high-order bits
- private static final int RUNNING = -1 << COUNT_BITS;//当线程池处于运行状态时,高 3 位设置为 -1 的二进制补码形式(即 111...000)。
- private static final int SHUTDOWN = 0 << COUNT_BITS;//线程池正在关闭,不再接受新任务,但会继续处理队列中的任务,高 3 位为 000。
- private static final int STOP = 1 << COUNT_BITS;//线程池已停止,不再接受新任务,也不再处理队列中的任务,高 3 位为 001。
- private static final int TIDYING = 2 << COUNT_BITS;//所有任务都已完成,线程池即将进入终结状态,高 3 位为 010。
- private static final int TERMINATED = 3 << COUNT_BITS;//线程池已经完全终止,所有资源都被释放,高 3 位为 011。
- // Packing and unpacking ctl
- /**
- * COUNT_MASK=00011111111111111111111111111111 取反之后 11100000000000000000000000000000 既为RUNNING初始状态
- * c & ~COUNT_MASK 与运算得到线程池当前的状态;可以这样理解:当前ctl的值取二进制高三位进行比对,得到的结果就是当前ctl代表的线程池状态
- *
- */
- private static int runStateOf(int c) { return c & ~COUNT_MASK; }
- /**
- * COUNT_MASK=00011111111111111111111111111111
- * c & COUNT_MASK 去除高三位状态,相当于重置高三位的二进制为0,后29位二进制就是当前线程数量
- */
- private static int workerCountOf(int c) { return c & COUNT_MASK; }
- /**
- * 或运算运算,例如 00011111111111111111111111111111 | 11100000000000000000000000000000 = 11111111111111111111111111111111
- * 这样理解,高三位表示状态|低29位表示线程数量,这个时候只要用一个int类型存储,那么将高三位的二进制和低29位的二进制合并一起为一个数,就是ctl的值
- */
- private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码 三、封装线程池工具类
- package com.zzc.common.utils;
- import lombok.extern.slf4j.Slf4j;
- import java.util.Map;
- import java.util.concurrent.BlockingDeque;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executor;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingDeque;
- import java.util.concurrent.RejectedExecutionException;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.SynchronousQueue;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- public class ThreadPoolUtils {
- private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR;
- private static final String COMMON_THREAD_POOL_KEY;
- private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY;
- private static final int DEFAULT_CORE_POOL_SIZE = 1;
- private static final int DEFAULT_KEEP_ALIVE_TIME = 30;
- private static final int DEFAULT_PROCESSORS = 2;
- private static int ALB_PROCESSORS = DEFAULT_PROCESSORS;
- static {
- THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();
- COMMON_THREAD_POOL_KEY = "COMMON";
- COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";
- int processors = Runtime.getRuntime().availableProcessors();
- ALB_PROCESSORS = processors;
- log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", ALB_PROCESSORS, DEFAULT_PROCESSORS);
- ALB_PROCESSORS = Math.max(ALB_PROCESSORS, DEFAULT_PROCESSORS);
- }
- // private static Executor executor = Executors.newFixedThreadPool()
- /**
- *
- * @param threadPoolKey
- * @param corePoolSize
- * @param maxPoolSize
- * @param keepAliveTime
- * @param timeUnit
- * @param discardContinueWait 如果被拒绝,则等待时间,单位ms
- * @return
- */
- public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
- int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
- int discardContinueWait) {
- return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new SynchronousQueue(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
- }
- public static ThreadPoolExecutor newThreadPoolExecutorNewThreadToRun(String threadPoolKey, int corePoolSize,
- int maxPoolSize, int keepAliveTime, TimeUnit timeUnit) {
- return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new SynchronousQueue(true), new NewThreadToRun());
- }
- public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
- int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
- BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- if (StrUtils.isBlank(threadPoolKey)) {
- throw new RuntimeException("threadPoolKey is null");
- }
- if (THREAD_POOL_EXECUTOR.containsKey(threadPoolKey)) {
- return THREAD_POOL_EXECUTOR.get(threadPoolKey);
- }
- log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
- corePoolSize = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
- maxPoolSize = maxPoolSize <= 0 ? corePoolSize : maxPoolSize;
- keepAliveTime = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
- timeUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
- ThreadPoolExecutor executor = new PThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, newThreadFactory(threadPoolKey), handler);
- log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
- THREAD_POOL_EXECUTOR.put(threadPoolKey, executor);
- return executor;
- }
- public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
- ScheduledThreadPoolExecutor executorService = (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.get(COMMON_SCHEDULE_EXECUTOR_POOL_KEY);
- if (executorService == null) {
- executorService = new ScheduledThreadPoolExecutor(2, newThreadFactory(COMMON_SCHEDULE_EXECUTOR_POOL_KEY));
- THREAD_POOL_EXECUTOR.put(COMMON_SCHEDULE_EXECUTOR_POOL_KEY, executorService);
- }
- return executorService;
- }
- public static ThreadFactory newThreadFactory(String threadPrefix) {
- ThreadFactory threadFactory = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setName(threadPrefix);
- thread.setDaemon(true);
- log.info("new thread:{}", threadPrefix);
- return thread;
- }
- };
- return threadFactory;
- }
- /**
- * 拒绝策略
- * 直接给主线程自己执行
- */
- static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- String threadKey = "";
- if (e instanceof PThreadPoolExecutor) {
- threadKey = ((PThreadPoolExecutor) e).getThreadPoolKey();
- }
- if (r instanceof Thread) {
- log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
- } else {
- log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
- }
- super.rejectedExecution(r, e);
- }
- }
- /**
- * 拒绝策略 -- activateMQ的做法
- * 重新尝试加入队列,等待超时,影响线程执行效率
- */
- static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {
- private long discardContinueWait = 1;
- public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
- if (discardContinueWait <= 0) {
- discardContinueWait = 1;
- }
- this.discardContinueWait = discardContinueWait;
- }
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- String threadKey = "";
- if (executor instanceof PThreadPoolExecutor) {
- threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
- }
- if (r instanceof Thread) {
- log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
- } else {
- log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
- }
- if (!executor.isShutdown()) {
- try {
- executor.getQueue().poll(discardContinueWait, TimeUnit.MICROSECONDS);
- } catch (InterruptedException e) {
- log.error("rejectedExecution", e);
- }
- executor.execute(r);
- }
- }
- }
- /**
- * 拒绝策略 -- netty的做法
- * 创建新的线程,直接执行,直到系统创建不了新的线程为止
- */
- static class NewThreadToRun implements RejectedExecutionHandler {
- public NewThreadToRun() {
- super();
- }
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- String threadKey = "";
- if (executor instanceof PThreadPoolExecutor) {
- threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
- }
- if (r instanceof Thread) {
- log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
- } else {
- log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
- }
- try {
- final Thread thread = new Thread(r, "new thread: " + threadKey);
- thread.start();
- } catch (Exception e) {
- throw new RejectedExecutionException("Failed to start new thread. threadKey:" + threadKey, e);
- }
- }
- }
- /**
- * 线程池,添加添加一些日志打印
- */
- static class PThreadPoolExecutor extends ThreadPoolExecutor {
- private String threadPoolKey;
- public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- this.threadPoolKey = threadPoolKey;
- }
- @Override
- public void execute(Runnable command) {
- try {
- super.execute(command);
- } catch (Exception e) {
- log.error("execute error.", e);
- }
- log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
- command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
- }
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- if (t != null) {
- log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
- }
- super.afterExecute(r, t);
- }
- public String getThreadPoolKey() {
- return threadPoolKey;
- }
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |