Flink 反压

海哥  金牌会员 | 2024-11-30 03:19:28 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 826|帖子 826|积分 2478

反压

Flink反压是一个在实时盘算应用中常见的问题,特殊是在流式盘算场景中。以下是对Flink反压的具体解释:
一、反压释义

反压(backpressure)意味着数据管道中某个节点成为瓶颈,其处理速率跟不上上游发送数据的速率,从而须要对上游进行限速。在Flink等实时盘算框架中,反压通常是从某个节点传导至数据源,并低沉数据源(如Kafka consumer)的摄入速率。
二、反压原因


  • 数据倾斜:数据分布不均,导致个别task处理数据过多。
  • 算子性能问题:大概某个节点逻辑很复杂,如sink节点很慢或lookup join热查询慢等。
  • 流量陡增:如大促时流量激增,或者使用了数据炸开的函数。
三、反压影响


  • 使命处理性能出现瓶颈:比方,在消费Kafka时,大概会出现消费Kafka Lag。
  • Checkpoint时间长或失败:由于某些反压会导致barrier须要花很长时间才能对齐,从而影响使命的稳固性。
  • State状态变大:由于数据处理速率不匹配,大概导致系统内部状态堆积。
  • Kafka数据积存:当Flink作业无法实时处理Kafka中的数据时,会导致数据在Kafka中积存。
  • OOM(内存溢出):严重的反压大概导致系统资源耗尽,进而引发内存溢出等问题。
四、反压机制实现方式

在Flink中,反压机制可以通过以下两种方式实现:

  • 阻塞式反压:当下游使命无法实时处理上游使命生成的数据时,上游使命会被阻塞,直到下游使命处理完毕。这种方式可以保证数据不丢失,但会造成延迟增长。
  • 异步非阻塞式反压:此方式的具体实现和细节大概因Flink版本和配置而异,但通常旨在通过异步处理和非阻塞操作来减轻反压的影响。
五、解决思路


  • 优化数据分布:通过调解数据分区计谋或重新设计数据模子来减少数据倾斜。
  • 提拔算子性能:针对性能瓶颈的算子进行优化,如简化逻辑、增长资源等。
  • 限流与缓冲:在数据源端或关键节点前设置限流计谋和缓冲区,以平滑处理流量陡增的情况。
  • 监控与告警:建立完善的监控和告警机制,实时发现并处理反压问题。
   综上所述,Flink反压是一个须要关注的问题,它大概影响到实时盘算应用的性能和稳固性。通过理解反压的原理和影响,并采取相应的解决步伐,可以有效地提拔Flink作业的处理本领和稳固性。
  监控

Flink Web

Flink Web 界面提供了一个选项卡来监控正在运行 jobs 的反压举动。
Task 性能指标

task(SubTask)的每个并行实例都可以用三个一组的指标评价:


  • backPressuredTimeMsPerSecond,subtask 被反压的时间
  • dleTimeMsPerSecond,subtask 等待某类处理的时间
  • busyTimeMsPerSecond,subtask 现实工作时间 在任何时间点,这三个指标相加都约即是1000ms。
这些指标每两秒更新一次,上报的值表现 subtask 在最近两秒被反压(或闲或忙)的平均时长。 当工作负荷是变化的时须要尤其引起注意。如,一个以恒定50%负载工作的 subtask 和另一个每秒钟在满负载和闲置切换的 subtask 的busyTimeMsPerSecond值相同,都是500ms。
在内部,反压根据输出 buffers 的可用性来进行判定的。 如果一个 task 没有可用的输出 buffers,那么这个 task 就被认定是在被反压。 相反,如果有可用的输入,则可认定为闲置,
WebUI

WebUI 集合了所有 subTasks 的反压和繁忙指标的最大值,并在 JobGraph 中将集合的值进行显示。除了显示原始的数值,tasks 也用颜色进行了标记,使检查更加容易。

   闲置的 tasks 为蓝色,完全被反压的 tasks 为玄色,完全繁忙的 tasks 被标记为红色。 中间的所有值都表现为这三种颜色之间的过渡色。
  反压状态

在 Job Overview 旁的 Back Pressure 选项卡中,可以找到更多细节指标。

   如果看到 subtasks 的状态为 OK 表现没有反压。HIGH 表现这个 subtask 被反压。状态用如下定义:
  

  • OK: 0% <= 反压比例 <= 10%
  • LOW: 10% < 反压比例 <= 50%
  • HIGH: 50% < 反压比例 <= 100%
  Prometheus监控

在Flink中使用Prometheus进行反压监测通常涉及配置Flink的metrics系统以及Prometheus的配置。以下是配置Flink以使用Prometheus进行反压的基本步骤:
配置


  • 在Flink配置文件中启用Prometheus metrics(通常是flink-conf.yaml):
  1. metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  2. metrics.reporter.promgateway.host: <prometheus-pushgateway-host>
  3. metrics.reporter.promgateway.port: <prometheus-pushgateway-port>
  4. metrics.reporter.promgateway.jobName: <job-name>
  5. metrics.reporter.promgateway.randomJobNameSuffix: true
  6. metrics.reporter.promgateway.deleteOnShutdown: false
复制代码

  • 确保Prometheus配置了PushGateway,而且Prometheus可以或许从Flink TaskManagers推送指标。


  • 下载并解压 Prometheus Pushgateway:
  1. wget https://github.com/prometheus/pushgateway/releases/download/v1.4.1/pushgateway-1.4.1.linux-amd64.tar.gz
  2. tar xvzf pushgateway-1.4.1.linux-amd64.tar.gz
  3. cd pushgateway-1.4.1.linux-amd64
复制代码


  • 创建一个系统服务文件 /etc/systemd/system/pushgateway.service:
  1. [Unit]
  2. Description=Pushgateway
  3. After=network.target
  4. [Service]
  5. User=nobody
  6. Group=nobody
  7. Type=simple
  8. ExecStart=/path/to/pushgateway
  9. [Install]
  10. WantedBy=multi-user.target
复制代码


  • 启动并使 Pushgateway 服务随系统启动:
  1. sudo systemctl daemon-reload
  2. sudo systemctl start pushgateway
  3. sudo systemctl enable pushgateway
复制代码


  • 配置 Prometheus 来从 Pushgateway 拉取数据。在 Prometheus 配置文件 (prometheus.yml) 中添加以下内容:
  1. scrape_configs:
  2.   - job_name: 'pushgateway'
  3.     static_configs:
  4.       - targets: ['localhost:9091']
复制代码

  • 配置Prometheus PushGateway,通常在Prometheus配置文件中(prometheus.yml):
  1. scrape_configs:
  2.   - job_name: 'flink-metrics'
  3.     honor_labels: true
  4.     static_configs:
  5.       - targets: ['<pushgateway-host>:<pushgateway-port>']
复制代码

  • 重启Flink集群和Prometheus以应用配置更改。
   配置告警规则,推送alertmanager进行告警关照推送
通过Grafana显示Flink运行状态
  方案

增长资源



  • 增长CPU资源: 调解TaskManager的CPU配置和并行化使命
  • 增长内存资源:调解TaskManager的内存配置和优化数据结构
  • 增长其他资源:比方:使用SSD或RDMA等网络加速设备、增长GPU资源
   注意:
  

  • 制止过度资源分配:虽然增长资源可以缓解反压问题,但过度分配资源大概导致资源浪费和资本增长。因此,在增长资源之前,须要仔细评估使命的现实需求和资源使用情况。
  • 结合其他优化步伐:除了增长资源外,还可以结合其他优化步伐来进一步缓解反压问题。比方,优化处理逻辑、减少盘算复杂度、使用更高效的数据结构等。
  数据倾斜


  • 数据倾斜定义
    数据倾斜是指数据的分布严重不均,造成一部门数据很多,一部门数据很少的局面。在Flink中,这通常表现为部门节点处理的数据量宏大于其他节点。
  • 数据倾斜的原因


  • 业务原因:如订单数据中某些都会的订单量宏大于其他都会。
  • 技术原因:大量使用KeyBy、GroupBy等操作,错误地使用了分组Key,人为产生数据热门。

  • 解决方案


  • 业务层面

    • 只管制止热门key的设计,比方将热门都会分成差别的区域,并进行单独处理。
    • 在数据预处理阶段对数据进行均衡处理,如使用随机前缀打散key。

  • 技术层面

    • 调解方案打散原来的key,制止直接聚合。
    • 使用Flink提供的二次聚合等计谋,先对打散后的数据进行聚合,再还原为真正的key进行二次聚合。
    • 优化join操作,将条目少的表/子查询放在Join的左边,减少内存溢出的几率。
    • 使用MapJoin处理小表关联大表的情况,制止数据倾斜。

  • 配置层面

    • 设置公道的mapreduce的task数,能有效提拔性能。
    • 在数据量较大的情况下,慎用count(distinct)等操作。
    • 对小文件进行合并,减少文件数据源带来的倾斜问题。## 算子性能

算子性能

原因:



  • 下游算子性能差:下游算子sub-task的处理性能低下,无法实时消费上游算子产生的数据。
  • 外部接口访问:算子须要频仍访问外部接口,如数据库或API,这些操作耗时长,导致数据处理速率下降。
  • 代码问题:用户代码执行服从低下,比方存在频仍的阻塞操作或性能瓶颈。
判定:



  • 通过Flink Web UI的BackPressure模块,观察算子的颜色和数值来判定是否出现反压。红色- 表现当前算子繁忙,有反压;绿色表现当前算子不繁忙,没有反压。
  • 通过对比差别SubTask处理的数据量,判定是否存在数据倾斜导致的个别SubTask性能下降。
解决



  • 限制数据源消费速率:在数据源处设置限流步伐,确保数据匀速消费,制止速率不均导致的反压。
  • 关闭Checkpoint:在数据回溯期间关闭Checkpoint,以减少barrier对齐对性能的影响。完成数据回溯后再重新开启Checkpoint。
  • 优化代码:检查并优化用户代码,减少阻塞操作和性能瓶颈,提高算子处理服从。
  • 增长盘算资源:根据现实须要增长盘算资源,如增长盘算节点、CPU和内存等,提高系统的团体处理本领。
  • 动态调解并行度:根据系统负载情况动态调解使命的并行度,将使命分配到更多的盘算节点上,以提高处理本领。
  • 重分区:通过重分区将数据均匀地分布到差别的分区中,减少数据倾斜并提高并行度。
  • 使用缓冲区:设置缓冲区来暂存数据,制止在下游算子处理速率不敷时导致数据丢失或延迟增长。
调大并行度

**并行度:**并行度(Parallelism)是指Flink使命中每个算子的并行实例数。增长并行度意味着更多的使命实例将同时处理数据,从而提高了系统的团体处理本领。
调大并行度


  • 分析原因:


  • 在调大并行度之前,起首须要分析反压的具体原因。常见原因包括资源不敷、数据倾斜、算子性能问题等。
  • 使用Flink的监控工具(如Web UI、Metrics等)来观察使命的资源使用情况和性能瓶颈。

  • 确定公道的并行度:


  • 可以通过压测来确定公道的并行度。比方,先获取高峰期的QPS(每秒处理的数据量),然后测试差别并行度下系统的处理本领,找到可以或许处理该QPS而不发生反压的并行度。
  • 也可以思量使用经验法则,如根据数据源(如Kafka)的分区数来设置并行度。

  • 设置并行度:


  • 在Flink程序中,可以通过多种方式来设置并行度。

    • 在代码中:通过StreamExecutionEnvironment的setParallelism()方法来设置全局并行度。
    • 在算子层次:对于单个算子,可以调用其setParallelism()方法来设置该算子的并行度。
    • 在配置文件或提交使命时:通过配置文件或提交使命时的参数来设置并行度。

  • 须要注意的是,当使用savepoints时,应该思量设置最大并行度。这可以确保在从savepoint规复使命时,可以或许改变特定算子或整个程序的并行度,而不会超过设定的上限。

  • 监控与调解:


  • 在调解并行度后,须要持续监控使命的运行情况,观察是否解决了反压问题,以及是否出现了新的问题(如资源利用率不敷、资源浪费等)。
  • 根据监控效果,可以进一步调解并行度或其他相关配置,以达到最佳的性能和稳固性。
   注意:
  

  • 制止过度并行化:虽然增长并行度可以提高系统的处理本领,但过度并行化大概导致资源利用率下降、管理复杂性增长等问题。因此,在调解并行度时须要衡量利弊。
  • 思量数据倾斜:数据倾斜大概导致部门节点处理的数据量宏大于其他节点,从而引发反压。在调解并行度时,须要思量数据倾斜的情况,并采取相应的步伐来均衡数据分布。
  • 优化其他配置:除了调解并行度外,还可以思量优化其他相关配置,如内存大小、缓存计谋等,以进一步提拔系统的性能和稳固性。
  限流与缓冲

限流机制

Flink通过水位线(Watermark)机制来实现限流。水位线是一个时间戳,表现当前处理的数据已经到达的位置。通过控制水位线的流传速率,Flink可以限制数据的流量,制止数据的堆积和延迟。当下游节点处理速率较慢时,水位线的流传速率会相应减慢,从而限制上游节点的生产速率。
缓冲机制

Flink在网络传输和TaskManager内部都使用了缓冲机制来处理反压。


  • 网络传输缓冲:在网络传输过程中,Flink使用NetworkBufferPool来管理内存块(MemorySegment)。每个Task都有一个输入区域(InputGate)和输出区域(ResultPartition),它们使用Buffer来存储和传输数据。当数据从上游节点传输到下游节点时,起首会存储在Buffer中,等待下游节点消费。如果下游节点消费速率较慢,Buffer中的数据会逐渐累积,形成反压。此时,Flink会根据Buffer的使用情况来限制上游节点的生产速率。
  • TaskManager内部缓冲:在TaskManager内部,Flink为每个Task创建了输入和输出的LocalBufferPool。这些缓冲池用于存储和传输数据。当Task的消费速率跟不上生产速率时,LocalBufferPool中的数据会逐渐累积,形成反压。Flink会根据LocalBufferPool的使用情况来限制Task的生产速率。
关闭Checkpoint

关闭 Checkpoint。关闭 Checkpoint 可以将 barrier 对齐这一步省略掉,促使使命可以或许快速回溯数据。然后等数据回溯完成之后,再将 Checkpoint 打开

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表