Nacos 实现动态化线程池,真香!

打印 上一主题 下一主题

主题 895|帖子 895|积分 2685

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。
在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到平台侧,运行开发同学根据系统运行情况对核心参数进行动态配置。
本文以Nacos作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。
代码实现

推荐一个开源免费的 Spring Boot 实战项目:
https://github.com/javastacks/spring-boot-best-practice
1.依赖
  1. <dependency>
  2.     <groupId>com.alibaba.cloud</groupId>
  3.     <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  4.     <version>2021.1</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>com.alibaba.cloud</groupId>
  8.     <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  9.     <version>2021.1</version>
  10. </dependency>
  11. <dependency>
  12.     <groupId>org.springframework.boot</groupId>
  13.     <artifactId>spring-boot-starter-web</artifactId>
  14. </dependency>
  15. <dependency>
  16.     <groupId>org.springframework.boot</groupId>
  17.     <artifactId>spring-boot-starter</artifactId>
  18. </dependency>
复制代码
2.配置yml文件

Spring Boot 基础就不介绍了,推荐看这个实战项目:
https://github.com/javastacks/spring-boot-best-practice
bootstrap.yml:
  1. server:
  2.   port: 8010
  3.   # 应用名称(nacos会将该名称当做服务名称)
  4. spring:
  5.   application:
  6.     name: order-service
  7.   cloud:
  8.     nacos:
  9.       discovery:
  10.         namespace: public
  11.         server-addr: 192.168.174.129:8848
  12.       config:
  13.         server-addr: 192.168.174.129:8848
  14.         file-extension: yml
复制代码
application.yml:
  1. spring:
  2.   profiles:
  3.     active: dev
复制代码
为什么要配置两个yml文件?
springboot中配置文件的加载是存在优先级顺序的,bootstrap优先级高于application。
nacos在项目初始化时,要保证先从配置中心进行配置拉取,拉取配置之后才能保证项目的正常启动。
3.nacos配置

登录到nacos管理页面,新建配置,如下图所示:

注意Data ID的命名格式为,${spring.application.name}-${spring.profile.active}.${spring.cloud.nacos.config.file-extension} ,在本文中,Data ID的名字就是order-service-dev.yml。

这里我们只配置了两个参数,核心线程数量和最大线程数。
4.线程池配置和nacos配置变更监听
  1. @RefreshScope
  2. @Configuration
  3. public class DynamicThreadPool implements InitializingBean {
  4.     @Value("${core.size}")
  5.     private String coreSize;
  6.     @Value("${max.size}")
  7.     private String maxSize;
  8.     private static ThreadPoolExecutor threadPoolExecutor;
  9.     @Autowired
  10.     private NacosConfigManager nacosConfigManager;
  11.     @Autowired
  12.     private NacosConfigProperties nacosConfigProperties;
  13.     @Override
  14.     public void afterPropertiesSet() throws Exception {
  15.         //按照nacos配置初始化线程池
  16.         threadPoolExecutor = new ThreadPoolExecutor(Integer.parseInt(coreSize), Integer.parseInt(maxSize), 10L, TimeUnit.SECONDS,
  17.                 new LinkedBlockingQueue<>(10),
  18.                 new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(),
  19.                 new RejectedExecutionHandler() {
  20.                     @Override
  21.                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  22.                         System.out.println("rejected!");
  23.                     }
  24.                 });
  25.         //nacos配置变更监听
  26.         nacosConfigManager.getConfigService().addListener("order-service-dev.yml", nacosConfigProperties.getGroup(),
  27.                 new Listener() {
  28.                     @Override
  29.                     public Executor getExecutor() {
  30.                         return null;
  31.                     }
  32.                     @Override
  33.                     public void receiveConfigInfo(String configInfo) {
  34.                         //配置变更,修改线程池配置
  35.                         System.out.println(configInfo);
  36.                         changeThreadPoolConfig(Integer.parseInt(coreSize), Integer.parseInt(maxSize));
  37.                     }
  38.                 });
  39.     }
  40.     /**
  41.      * 打印当前线程池的状态
  42.      */
  43.     public String printThreadPoolStatus() {
  44.         return String.format("core_size:%s,thread_current_size:%s;" +
  45.                         "thread_max_size:%s;queue_current_size:%s,total_task_count:%s", threadPoolExecutor.getCorePoolSize(),
  46.                 threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size(),
  47.                 threadPoolExecutor.getTaskCount());
  48.     }
  49.     /**
  50.      * 给线程池增加任务
  51.      *
  52.      * @param count
  53.      */
  54.     public void dynamicThreadPoolAddTask(int count) {
  55.         for (int i = 0; i < count; i++) {
  56.             int finalI = i;
  57.             threadPoolExecutor.execute(new Runnable() {
  58.                 @Override
  59.                 public void run() {
  60.                     try {
  61.                         System.out.println(finalI);
  62.                         Thread.sleep(10000);
  63.                     } catch (InterruptedException e) {
  64.                         e.printStackTrace();
  65.                     }
  66.                 }
  67.             });
  68.         }
  69.     }
  70.     /**
  71.      * 修改线程池核心参数
  72.      *
  73.      * @param coreSize
  74.      * @param maxSize
  75.      */
  76.     private void changeThreadPoolConfig(int coreSize, int maxSize) {
  77.         threadPoolExecutor.setCorePoolSize(coreSize);
  78.         threadPoolExecutor.setMaximumPoolSize(maxSize);
  79.     }
  80. }
复制代码
这个代码就是实现动态线程池和核心了,需要说明的是:

  • @RefreshScope:这个注解用来支持nacos的动态刷新功能;
  • @Value("${max.size}"),@Value("${core.size}"):这两个注解用来读取我们上一步在nacos配置的具体信息;同时,nacos配置变更时,能够实时读取到变更后的内容
  • nacosConfigManager.getConfigService().addListener:配置监听,nacos配置变更时实时修改线程池的配置。
5.controller

为了观察线程池动态变更的效果,增加Controller类。
  1. @RestController
  2. @RequestMapping("/threadpool")
  3. public class ThreadPoolController {
  4.     @Autowired
  5.     private DynamicThreadPool dynamicThreadPool;
  6.     /**
  7.      * 打印当前线程池的状态
  8.      */
  9.     @GetMapping("/print")
  10.     public String printThreadPoolStatus() {
  11.         return dynamicThreadPool.printThreadPoolStatus();
  12.     }
  13.     /**
  14.      * 给线程池增加任务
  15.      *
  16.      * @param count
  17.      */
  18.     @GetMapping("/add")
  19.     public String dynamicThreadPoolAddTask(int count) {
  20.         dynamicThreadPool.dynamicThreadPoolAddTask(count);
  21.         return String.valueOf(count);
  22.     }
  23. }
复制代码
6.测试

启动项目,访问http://localhost:8010/threadpool/print打印当前线程池的配置。

可以看到,这个就是我们之前在nacos配置的线程数。
访问http://localhost:8010/threadpool/add?count=20增加20个任务,重新打印线程池配置

可以看到已经有线程在排队了。
为了能够看到效果,我们多访问几次/add接口,增加任务数,在控制台出现拒绝信息时调整nacos配置。

此时,执行/add命令时,所有的线程都会提示rejected。
调整nacos配置,将核心线程数调整为50,最大线程数调整为100.

重新多次访问/add接口增加任务,发现没有拒绝信息了。这时,打印具体的线程状态,发现线程池参数修改成功。

总结

这里,只是简单实现了一个可以调整核心线程数和最大线程数的动态线程池。具体的线程池实现原理可以参考美团的这篇文章:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html,结合监控告警等实现一个完善的动态线程池产品。
优秀的轮子还有好多,比如Hippo4J ,使用起来和dynamic-tp差不多。Hippo4J 有无依赖中间件实现动静线程池,也有默认实现Nacos和Apollo的版本,而dynamic-tp 默认实现依赖Nacos或Apollo。
来源:blog.csdn.net/m0_37558251/article/details/126542359
近期热文推荐:
1.1,000+ 道 Java面试题及答案整理(2022最新版)
2.劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4.别再写满屏的爆爆爆炸类了,试试装饰器模式,这才是优雅的方式!!
5.《Java开发手册(嵩山版)》最新发布,速速下载!
觉得不错,别忘了随手点赞+转发哦!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表