IT评测·应用市场-qidao123.com

标题: flink sql 优化 [打印本页]

作者: 乌市泽哥    时间: 2024-8-4 16:49
标题: flink sql 优化

提示:及时flink sql 参考很多网上方法与自己实践方法汇总(版本:flink1.13+)
一、参数方面


  1. //关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点   ###有用的参数
  2. pipeline.operator-chaining: 'true'
  3. //指定时区  ###实用的参数
  4. table.local-time-zone: Asia/Shanghai
  5. //对flink sql是否要敏感大小(建议false,不区分大小写。默认为true)
  6. table.identifier-case-sensitive: 'false'
  7. //开启 miniBatch
  8. table.exec.mini-batch.enabled: 'true'
  9. //批量输出的间隔时间
  10. table.exec.mini-batch.allow-latency: 5s
  11. //防止 OOM 设置每个批次最多缓存数据的条数
  12. table.exec.mini-batch.size: '500'
  13. //提交批次数据大小
  14. batchSize: '127108864'
  15. //刷数据间隔
  16. flushIntervalMs: '60000'
  17. //几个flush线程
  18. numFlushThreads: '5'
  19. // 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector
  20. compressAlgorithm: snappy
  21. //开启异步状态后端
  22. state.backend.async: 'true'
  23. //状态后端开启增量(默认就是true 增量)
  24. state.backend.incremental: 'true'
  25. //作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源  ###有用的参数
  26. table.exec.split-slot-sharing-group-per-vertex: 'true'
  27. //Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升
  28. execution.checkpointing.interval: 180s
  29. //State数据的生命周期,单位为毫秒。默认36小时
  30. table.exec.state.ttl: 129600000
  31. //Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间   ###有用的参数
  32. execution.checkpointing.timeout        :10min
复制代码

  1. // 初始化 table environment
  2. TableEnvironment tEnv = ...
  3. // 获取 tableEnv 的配置对象
  4. Configuration configuration = tEnv.getConfig().getConfiguration();
  5. // 设置参数:
  6. // 开启 miniBatch
  7. configuration.setString("table.exec.mini-batch.enabled", "true");
  8. // 批量输出的间隔时间
  9. configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
  10. // 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
  11. configuration.setString("table.exec.mini-batch.size", "20000");
  12. // 开启 LocalGlobal(job有聚合函数使用)
  13. configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
  14. // 开启 Split Distinct (job有聚合函数使用)
  15. configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
  16. // 第一层打散的 bucket 数目 (job有聚合函数使用)
  17. configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
  18. // TopN 的缓存条数 (job有分组top使用)
  19. configuration.setString("table.exec.topn.cache-size", "200000");
  20. // 指定时区
  21. configuration.setString("table.local-time-zone", "Asia/Shanghai");
复制代码

5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
假如您在使用Flink的环境下,观察到全量Checkpoint与增量Checkpoint的大小一致:

增量Checkpoint通常是从第二个Checkpoint开始表现出来的,在数据稳定输入且没有大规模的状态变动时,后续的增量Checkpoint应该表现出大小上的差异,表明系统正常地只对状态的增量部门举行快照。假如仍然一致,则必要进一步审查系统状态和举动,确认是否存在题目。
二、资源方面

当上面添加配置性能还是不行是,可以增加资源。

*上面只是建议给taskmanager 1cup,4Gb内存起,原因现在很多平台大多是云虚拟资源,如许分配性能较好,同时也是养成精良习惯。
三、总结

   不是所有job资源越堆越多好。有时作业的复杂或数据的特殊环境(外部系统性能除外,比方写数据库),增加资源只会让job性能越来越差或报错(亲身经历job性能差,特殊痛苦,一直加资源性能还是差或运行报错)。必要不断找根源题目,多使用不同方法测试才能找到恰当job的处理性能。
  



*上面案例头脑:
A.镌汰CP生成时间。flink才能快速处理数据(提交完已处理的偏移量数据,快速举行下一轮的新数据)。
B.在有回撤流,必要状态(自己观察在一个并发时CP较大几百兆,一般join环境出现的比力多)将多个并发只管放到一个slot,镌汰数据传输和交换(一个槽位共享状态)。其他简单的job没有或很少回撤流的环境下可以只建一个槽位。
C.增加并行度会导致CP增大。原因之前一个线程一个CP,现在是多个线程有自己的状态(大概会有重复数据状态),多个状态合在一起CP就大了。

参考:文档1

补充说明:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4