乌市泽哥 发表于 2024-8-4 16:49:26

flink sql 优化

提示:及时flink sql 参考很多网上方法与自己实践方法汇总(版本:flink1.13+)
一、参数方面



[*]flink sql参数配置
//关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点   ###有用的参数
pipeline.operator-chaining: 'true'
//指定时区###实用的参数
table.local-time-zone: Asia/Shanghai
//对flink sql是否要敏感大小(建议false,不区分大小写。默认为true)
table.identifier-case-sensitive: 'false'
//开启 miniBatch
table.exec.mini-batch.enabled: 'true'
//批量输出的间隔时间
table.exec.mini-batch.allow-latency: 5s
//防止 OOM 设置每个批次最多缓存数据的条数
table.exec.mini-batch.size: '500'
//提交批次数据大小
batchSize: '127108864'
//刷数据间隔
flushIntervalMs: '60000'
//几个flush线程
numFlushThreads: '5'
// 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector
compressAlgorithm: snappy
//开启异步状态后端
state.backend.async: 'true'
//状态后端开启增量(默认就是true 增量)
state.backend.incremental: 'true'
//作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源###有用的参数
table.exec.split-slot-sharing-group-per-vertex: 'true'
//Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升
execution.checkpointing.interval: 180s
//State数据的生命周期,单位为毫秒。默认36小时
table.exec.state.ttl: 129600000
//Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间   ###有用的参数
execution.checkpointing.timeout        :10min


[*]datastream代码配置
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal(job有聚合函数使用)
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目 (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的缓存条数 (job有分组top使用)
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");


[*]flink sql 简单作业优化实验截图
1).调大checkpoint生成时间
https://img-blog.csdnimg.cn/direct/d6d804d7b84d433db7563eb5dece4502.png
https://img-blog.csdnimg.cn/direct/59304cacd552444090452ca7ddc992ed.png
2).去掉参数:pipeline.operator-chaining: ‘false’
https://img-blog.csdnimg.cn/direct/cfb02ebe88204668851e7eb88c26e9ed.png
https://img-blog.csdnimg.cn/direct/2150017134db4da28fb2949b04b87616.png
3).加攒批参数
https://img-blog.csdnimg.cn/direct/5f9279403bd34070a0a9731626d8c3a3.png
https://img-blog.csdnimg.cn/direct/dcd7e550da6a43999348a3224292d975.png
4).由于full GC导致job性能过差(排查)
https://img-blog.csdnimg.cn/direct/a7b31f0600414409a0439e4d3779ce1a.png
查看gc日志:
https://img-blog.csdnimg.cn/direct/f2d620bd078d4da5a520f69d3eeba6bc.png
https://img-blog.csdnimg.cn/direct/5fd3399e4e7b40299b1d42a92ae3d6d2.png
解决方案:对taskmanager增加内存(jobmanager略,因为它很少会出现频仍full gc)。
5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
假如您在使用Flink的环境下,观察到全量Checkpoint与增量Checkpoint的大小一致:


[*]检查增量快照是否正常配置并生效。
[*]是否为特定环境。在特定环境下,这种现象是正常的,比方:
a.在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包罗了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint现实上是一个全量Checkpoint。
b.在18:29时注入了100万条数据。假设数据在接下来的Checkpoint隔断时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包罗这100万条数据产生的所有状态信息。
在这种环境下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint必要包罗全量数据状态,以确保可以大概从该点恢复整个状态,导致它现实上也是一个全量Checkpoint。
增量Checkpoint通常是从第二个Checkpoint开始表现出来的,在数据稳定输入且没有大规模的状态变动时,后续的增量Checkpoint应该表现出大小上的差异,表明系统正常地只对状态的增量部门举行快照。假如仍然一致,则必要进一步审查系统状态和举动,确认是否存在题目。
二、资源方面

当上面添加配置性能还是不行是,可以增加资源。


[*]添加cu
一般对taskmanager添加cup,默认给1的整数倍,比方1,2,3等。jobmanager 根本不咋干活(业务数据处理),不消添加资源,之前给很少cup即可
[*]添加内存
一般默认每个taskmanager给4G内存,后面再对它增加资源。jobmanager不消增加内存
[*]槽位(slot)
每个 TaskManager Slot 数给1个( TaskManager 只能同时实行一个 Subtask),性能比力好(一般简单的job没有大量的回撤流的环境下)。
A.假如开3个并行度,每个taskmanager1个槽位:1个槽位 乘 3个并行度 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
B.假如开3个并行度,每个taskmanager3个槽位:1个槽位 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
[*]并行度
在 TaskManager Slot 数给1个环境下(此方案性能比力好),增加并行度可以提升处理性能。但taskmanager资源(内存和cpu)也会成倍增加
*上面只是建议给taskmanager 1cup,4Gb内存起,原因现在很多平台大多是云虚拟资源,如许分配性能较好,同时也是养成精良习惯。
三、总结

   不是所有job资源越堆越多好。有时作业的复杂或数据的特殊环境(外部系统性能除外,比方写数据库),增加资源只会让job性能越来越差或报错(亲身经历job性能差,特殊痛苦,一直加资源性能还是差或运行报错)。必要不断找根源题目,多使用不同方法测试才能找到恰当job的处理性能。


[*]假如优化很多次后job性能还是很差(资源给的很多性能还是不理想)(略增加一些资源)
可以将一个job拆分两个job(将占用比力多的业务数据(50%更好)在新的job单独处理)
[*]性能优化一直无法提升,要么看业务要么看job的性能瓶颈业务(业务牺牲)
[*]要么flink只做业务写表,离线负责处理业务写其他表(时效牺牲)


[*]调优举例(真实案例,折腾了很久):
背景:(flink 双流join) 默认资源配置(taskmanager 1cpu,4Bb内存,1个槽位,1个并行度)
数据有堆积,且越堆越多,写入性能弱(每秒十几条写入),CP(checkpoint有时失败,但很大,生成很慢),业务处理简单,单日数量在1700万条数据。
[*]后面开始对此job加资源,加并行度,加各种优化配置,增加CP生成时间等等。
开启job运行后生成CP一直失败(生成CP更大,之前200多兆,改后生成700-800兆还没有生成,生成变慢,生成时间酿成)
https://img-blog.csdnimg.cn/direct/e6af06e00ad24084bc48971a069c64cb.png
纵然加大CP生产时间和CP校验时间,CP仍旧是失败。
CP一直失败导致处理性能极差(CP在生成时整个job几乎都在克制),如下截图
https://img-blog.csdnimg.cn/direct/fcb9fb71da01416bb8bf1ba5babb7806.png
后面是各种调优尝试都不能,发现题目是flink在双流join时,有大量的回撤流.假如撤回数据较多的话 , 就会造成这个节点的state大 从而导致SinkMaterializer节点压力大(自己结合UI监控图观察得到)。
[*]后面经过很多次调优将并行度改为3,每个 TaskManager Slot 数 给3个,其他不变,性能有提升,CP生成也变快了
https://img-blog.csdnimg.cn/direct/5cd944ecac27499aab33760f935fee8b.jpeg
[*]又做调整将taskmanager 给3cup,内存给15Gb,开5个并行度,每个 TaskManager Slot 数 给5个。
目的:将5个并行度放到一个槽位,资源也没有使用多少。
测试后发现CP比上面3个并发的增量存储要大(意料之中),CP生成特殊快,已将数据堆压十几个小时的数据全部追上。
https://img-blog.csdnimg.cn/direct/2ac30eb63b0447c9a3db551d78e7bf50.png
*上面案例头脑:
A.镌汰CP生成时间。flink才能快速处理数据(提交完已处理的偏移量数据,快速举行下一轮的新数据)。
B.在有回撤流,必要状态(自己观察在一个并发时CP较大几百兆,一般join环境出现的比力多)将多个并发只管放到一个slot,镌汰数据传输和交换(一个槽位共享状态)。其他简单的job没有或很少回撤流的环境下可以只建一个槽位。
C.增加并行度会导致CP增大。原因之前一个线程一个CP,现在是多个线程有自己的状态(大概会有重复数据状态),多个状态合在一起CP就大了。
https://img-blog.csdnimg.cn/direct/c783da59fd314e429d08c36072ebd461.png
参考:文档1
补充说明:


[*]Flink Job并行度与slot槽位和taskmanager关系

[*]案例1:job设置5个并行度,每个TaskManager slot(槽位)数设置5个,则此job只有一个TaskManager进程(5个subtask)
[*]案例2:job设置5个并行度每个TaskManager slot(槽位)数设置2个,则此job会有3个TaskManager 进程(1个taskmanager运行1个subtask,其他两个taskmanager各运行2个subtask)
[*]Flink 提交job时 必要给job manager,taskmanager分配cup,内存资源。TaskManager数量越过必要的资源越多
[*]每个 TaskManager 可以拥有一个或多个 Slot,每个 Slot 可以运行一个 Subtask。
Subtask 在 TaskManager 的 Slot 中运行,实行数据处理的逻辑。
https://i-blog.csdnimg.cn/direct/39cd789f83ac44e0845b1e3922097780.png


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: flink sql 优化