论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
ToB企服应用市场:ToB评测及商务社交产业平台
»
论坛
›
软件与程序人生
›
后端开发
›
Java
›
ForkJoinPool在生产环境中使用碰到的一个问题 ...
ForkJoinPool在生产环境中使用碰到的一个问题
兜兜零元
金牌会员
|
2024-5-15 05:41:30
|
显示全部楼层
|
阅读模式
楼主
主题
877
|
帖子
877
|
积分
2631
1、配景
在我们的项目中有这么一个场景,需要消费kafka中的消息,并生成对应的工单数据。早些时候程序运行的好好的,但是有一天,我们升级了容器的配置,结果导致部分消息无法消费。而消费者的代码是使用CompletableFuture.runAsync(() -> {while (true){ ..... }}) 来实现的。
即:
需要消费Kafka topic的个数: 7个,每个线程消费一个topic
消费方式:使用线程池异步消费
消费池:默认的 ForkJoin 线程池???,并且没有做任何配置
是否会释放线程池中的核心线程: 不会释放
没出问题时容器配置: 2核4G
出问题时容器配置:4核8G,影响的结果:只有3个topic的数据可以消费。
2、容器2核4G可以正常消费
即:此时程序会启动7个线程来进行消费。
3、容器4核8G只有部分可以消费
即:此时程序会启动3个线程来进行消费。
4、问题原因分析
1、通过上面的配景我们可以知道,是因为升级了容器的配置,才导致我们消费kafka中的消息失败了。
2、针对kafka中的每个topic,我们都会使用一个单独的线程来消费,并且不会释放这个线程。
3、而线程的启动方式是通过CompletableFuture.runAsync()方法来启动的,那么通过这种方式启动的线程,是每个任务一个启动一个线程,还是只启动固定的线程呢?.
通过以上分析,那么问题肯定是出现在线程池身上,那么我们默认使用的是什么线程池呢?查看CompletableFuture.runAsync()的源码可知,有一定的几率是ForkJoinPool。那么我们一起看下源码。
5、源码分析
1、确认使用什么线程池
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
复制代码
通过上述源码可知,我们可能使用的ForkJoin线程池,也可能使用的是ThreadPerTaskExecutor线程池。
ThreadPerTaskExecutor 这个是每个任务,一个线程。
ForkJoinPool 那么就需要确定启动了多少个线程。
2、确认是否使用 ForkJoin 线程池
需要确定 useCommonPool 字段是如何赋值的。
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
复制代码
通过上面代码可知,是否使用ForkJoin线程池,是由 ForkJoinPool.getCommonPoolParallelism()的值确定的。(即并行度是否大于1,大于则使用ForkJoin线程池)
public static int getCommonPoolParallelism() {
return commonParallelism;
}
复制代码
3、commonParallelism 的赋值
1、从上图中可知parallelism的设置有2种方式
<ul>通过Jvm的启动参数java.util.concurrent.ForkJoinPool.common.parallelism进行设置,且这个值最大为 MAX_CAP即32727。
若没有通过Jvm的参数配置,则有2种情况,若cpu的核数 0 ? par : 1;[/code]SMASK 的值是 65535。
common.config 的值就是 (parallelism & SMASK) | 0的值,即最大为65535,若parallelism的值为0,则返回0。
int par = common.config & SMASK ,即最大为 65535
commonParallelism = par > 0 ? par : 1 的值就为 parallelism的值或1
6、结论
结论:
由上面的知识点,我们可以得出,当我们的容器是2核4G时,程序选择的线程池是ThreadPerTaskExecutor,当我们的容器是4核8G时,程序选择的线程池是ForkJoinPool。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
使用道具
举报
0 个回复
正序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
发新帖
回复
兜兜零元
金牌会员
这个人很懒什么都没写!
楼主热帖
Oracle调度器Scheduler
数据库的建立、增、删、改、查 ...
clang-format的使用
深入解析kubernetes中的选举机制 ...
【黄啊码】MySQL入门—4、掌握这些数据 ...
MySQL安装配置
【黄啊码】MySQL入门—5、数据库小技巧 ...
V Rising 服务器搭建
2万多条健康网站文章大全ACCESS\EXCEL ...
为什么用Redis做排行榜?
标签云
存储
服务器
快速回复
返回顶部
返回列表