Flink 反压
反压Flink反压是一个在实时盘算应用中常见的问题,特殊是在流式盘算场景中。以下是对Flink反压的具体解释:
一、反压释义
反压(backpressure)意味着数据管道中某个节点成为瓶颈,其处理速率跟不上上游发送数据的速率,从而须要对上游进行限速。在Flink等实时盘算框架中,反压通常是从某个节点传导至数据源,并低沉数据源(如Kafka consumer)的摄入速率。
二、反压原因
[*]数据倾斜:数据分布不均,导致个别task处理数据过多。
[*]算子性能问题:大概某个节点逻辑很复杂,如sink节点很慢或lookup join热查询慢等。
[*]流量陡增:如大促时流量激增,或者使用了数据炸开的函数。
三、反压影响
[*]使命处理性能出现瓶颈:比方,在消费Kafka时,大概会出现消费Kafka Lag。
[*]Checkpoint时间长或失败:由于某些反压会导致barrier须要花很长时间才能对齐,从而影响使命的稳固性。
[*]State状态变大:由于数据处理速率不匹配,大概导致系统内部状态堆积。
[*]Kafka数据积存:当Flink作业无法实时处理Kafka中的数据时,会导致数据在Kafka中积存。
[*]OOM(内存溢出):严重的反压大概导致系统资源耗尽,进而引发内存溢出等问题。
四、反压机制实现方式
在Flink中,反压机制可以通过以下两种方式实现:
[*]阻塞式反压:当下游使命无法实时处理上游使命生成的数据时,上游使命会被阻塞,直到下游使命处理完毕。这种方式可以保证数据不丢失,但会造成延迟增长。
[*]异步非阻塞式反压:此方式的具体实现和细节大概因Flink版本和配置而异,但通常旨在通过异步处理和非阻塞操作来减轻反压的影响。
五、解决思路
[*]优化数据分布:通过调解数据分区计谋或重新设计数据模子来减少数据倾斜。
[*]提拔算子性能:针对性能瓶颈的算子进行优化,如简化逻辑、增长资源等。
[*]限流与缓冲:在数据源端或关键节点前设置限流计谋和缓冲区,以平滑处理流量陡增的情况。
[*]监控与告警:建立完善的监控和告警机制,实时发现并处理反压问题。
综上所述,Flink反压是一个须要关注的问题,它大概影响到实时盘算应用的性能和稳固性。通过理解反压的原理和影响,并采取相应的解决步伐,可以有效地提拔Flink作业的处理本领和稳固性。
监控
Flink Web
Flink Web 界面提供了一个选项卡来监控正在运行 jobs 的反压举动。
Task 性能指标
task(SubTask)的每个并行实例都可以用三个一组的指标评价:
[*]backPressuredTimeMsPerSecond,subtask 被反压的时间
[*]dleTimeMsPerSecond,subtask 等待某类处理的时间
[*]busyTimeMsPerSecond,subtask 现实工作时间 在任何时间点,这三个指标相加都约即是1000ms。
这些指标每两秒更新一次,上报的值表现 subtask 在最近两秒被反压(或闲或忙)的平均时长。 当工作负荷是变化的时须要尤其引起注意。如,一个以恒定50%负载工作的 subtask 和另一个每秒钟在满负载和闲置切换的 subtask 的busyTimeMsPerSecond值相同,都是500ms。
在内部,反压根据输出 buffers 的可用性来进行判定的。 如果一个 task 没有可用的输出 buffers,那么这个 task 就被认定是在被反压。 相反,如果有可用的输入,则可认定为闲置,
WebUI
WebUI 集合了所有 subTasks 的反压和繁忙指标的最大值,并在 JobGraph 中将集合的值进行显示。除了显示原始的数值,tasks 也用颜色进行了标记,使检查更加容易。
https://i-blog.csdnimg.cn/blog_migrate/7b2f0b0ab355f8ad1e5628b305b94eb7.png
闲置的 tasks 为蓝色,完全被反压的 tasks 为玄色,完全繁忙的 tasks 被标记为红色。 中间的所有值都表现为这三种颜色之间的过渡色。
反压状态
在 Job Overview 旁的 Back Pressure 选项卡中,可以找到更多细节指标。
https://i-blog.csdnimg.cn/blog_migrate/997656433001a57208d8b86235bcf0da.png
如果看到 subtasks 的状态为 OK 表现没有反压。HIGH 表现这个 subtask 被反压。状态用如下定义:
[*]OK: 0% <= 反压比例 <= 10%
[*]LOW: 10% < 反压比例 <= 50%
[*]HIGH: 50% < 反压比例 <= 100%
Prometheus监控
在Flink中使用Prometheus进行反压监测通常涉及配置Flink的metrics系统以及Prometheus的配置。以下是配置Flink以使用Prometheus进行反压的基本步骤:
配置
[*]在Flink配置文件中启用Prometheus metrics(通常是flink-conf.yaml):
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: <prometheus-pushgateway-host>
metrics.reporter.promgateway.port: <prometheus-pushgateway-port>
metrics.reporter.promgateway.jobName: <job-name>
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
[*]确保Prometheus配置了PushGateway,而且Prometheus可以或许从Flink TaskManagers推送指标。
[*]下载并解压 Prometheus Pushgateway:
wget https://github.com/prometheus/pushgateway/releases/download/v1.4.1/pushgateway-1.4.1.linux-amd64.tar.gz
tar xvzf pushgateway-1.4.1.linux-amd64.tar.gz
cd pushgateway-1.4.1.linux-amd64
[*]创建一个系统服务文件 /etc/systemd/system/pushgateway.service:
Description=Pushgateway
After=network.target
User=nobody
Group=nobody
Type=simple
ExecStart=/path/to/pushgateway
WantedBy=multi-user.target
[*]启动并使 Pushgateway 服务随系统启动:
sudo systemctl daemon-reload
sudo systemctl start pushgateway
sudo systemctl enable pushgateway
[*]配置 Prometheus 来从 Pushgateway 拉取数据。在 Prometheus 配置文件 (prometheus.yml) 中添加以下内容:
scrape_configs:
- job_name: 'pushgateway'
static_configs:
- targets: ['localhost:9091']
[*]配置Prometheus PushGateway,通常在Prometheus配置文件中(prometheus.yml):
scrape_configs:
- job_name: 'flink-metrics'
honor_labels: true
static_configs:
- targets: ['<pushgateway-host>:<pushgateway-port>']
[*]重启Flink集群和Prometheus以应用配置更改。
配置告警规则,推送alertmanager进行告警关照推送
通过Grafana显示Flink运行状态
方案
增长资源
[*]增长CPU资源: 调解TaskManager的CPU配置和并行化使命
[*]增长内存资源:调解TaskManager的内存配置和优化数据结构
[*]增长其他资源:比方:使用SSD或RDMA等网络加速设备、增长GPU资源
注意:
[*]制止过度资源分配:虽然增长资源可以缓解反压问题,但过度分配资源大概导致资源浪费和资本增长。因此,在增长资源之前,须要仔细评估使命的现实需求和资源使用情况。
[*]结合其他优化步伐:除了增长资源外,还可以结合其他优化步伐来进一步缓解反压问题。比方,优化处理逻辑、减少盘算复杂度、使用更高效的数据结构等。
数据倾斜
[*]数据倾斜定义
数据倾斜是指数据的分布严重不均,造成一部门数据很多,一部门数据很少的局面。在Flink中,这通常表现为部门节点处理的数据量宏大于其他节点。
[*]数据倾斜的原因
[*]业务原因:如订单数据中某些都会的订单量宏大于其他都会。
[*]技术原因:大量使用KeyBy、GroupBy等操作,错误地使用了分组Key,人为产生数据热门。
[*]解决方案
[*]业务层面:
[*]只管制止热门key的设计,比方将热门都会分成差别的区域,并进行单独处理。
[*]在数据预处理阶段对数据进行均衡处理,如使用随机前缀打散key。
[*]技术层面:
[*]调解方案打散原来的key,制止直接聚合。
[*]使用Flink提供的二次聚合等计谋,先对打散后的数据进行聚合,再还原为真正的key进行二次聚合。
[*]优化join操作,将条目少的表/子查询放在Join的左边,减少内存溢出的几率。
[*]使用MapJoin处理小表关联大表的情况,制止数据倾斜。
[*]配置层面:
[*]设置公道的mapreduce的task数,能有效提拔性能。
[*]在数据量较大的情况下,慎用count(distinct)等操作。
[*]对小文件进行合并,减少文件数据源带来的倾斜问题。## 算子性能
算子性能
原因:
[*]下游算子性能差:下游算子sub-task的处理性能低下,无法实时消费上游算子产生的数据。
[*]外部接口访问:算子须要频仍访问外部接口,如数据库或API,这些操作耗时长,导致数据处理速率下降。
[*]代码问题:用户代码执行服从低下,比方存在频仍的阻塞操作或性能瓶颈。
判定:
[*]通过Flink Web UI的BackPressure模块,观察算子的颜色和数值来判定是否出现反压。红色- 表现当前算子繁忙,有反压;绿色表现当前算子不繁忙,没有反压。
[*]通过对比差别SubTask处理的数据量,判定是否存在数据倾斜导致的个别SubTask性能下降。
解决
[*]限制数据源消费速率:在数据源处设置限流步伐,确保数据匀速消费,制止速率不均导致的反压。
[*]关闭Checkpoint:在数据回溯期间关闭Checkpoint,以减少barrier对齐对性能的影响。完成数据回溯后再重新开启Checkpoint。
[*]优化代码:检查并优化用户代码,减少阻塞操作和性能瓶颈,提高算子处理服从。
[*]增长盘算资源:根据现实须要增长盘算资源,如增长盘算节点、CPU和内存等,提高系统的团体处理本领。
[*]动态调解并行度:根据系统负载情况动态调解使命的并行度,将使命分配到更多的盘算节点上,以提高处理本领。
[*]重分区:通过重分区将数据均匀地分布到差别的分区中,减少数据倾斜并提高并行度。
[*]使用缓冲区:设置缓冲区来暂存数据,制止在下游算子处理速率不敷时导致数据丢失或延迟增长。
调大并行度
**并行度:**并行度(Parallelism)是指Flink使命中每个算子的并行实例数。增长并行度意味着更多的使命实例将同时处理数据,从而提高了系统的团体处理本领。
调大并行度
[*]分析原因:
[*]在调大并行度之前,起首须要分析反压的具体原因。常见原因包括资源不敷、数据倾斜、算子性能问题等。
[*]使用Flink的监控工具(如Web UI、Metrics等)来观察使命的资源使用情况和性能瓶颈。
[*]确定公道的并行度:
[*]可以通过压测来确定公道的并行度。比方,先获取高峰期的QPS(每秒处理的数据量),然后测试差别并行度下系统的处理本领,找到可以或许处理该QPS而不发生反压的并行度。
[*]也可以思量使用经验法则,如根据数据源(如Kafka)的分区数来设置并行度。
[*]设置并行度:
[*]在Flink程序中,可以通过多种方式来设置并行度。
[*]在代码中:通过StreamExecutionEnvironment的setParallelism()方法来设置全局并行度。
[*]在算子层次:对于单个算子,可以调用其setParallelism()方法来设置该算子的并行度。
[*]在配置文件或提交使命时:通过配置文件或提交使命时的参数来设置并行度。
[*]须要注意的是,当使用savepoints时,应该思量设置最大并行度。这可以确保在从savepoint规复使命时,可以或许改变特定算子或整个程序的并行度,而不会超过设定的上限。
[*]监控与调解:
[*]在调解并行度后,须要持续监控使命的运行情况,观察是否解决了反压问题,以及是否出现了新的问题(如资源利用率不敷、资源浪费等)。
[*]根据监控效果,可以进一步调解并行度或其他相关配置,以达到最佳的性能和稳固性。
注意:
[*]制止过度并行化:虽然增长并行度可以提高系统的处理本领,但过度并行化大概导致资源利用率下降、管理复杂性增长等问题。因此,在调解并行度时须要衡量利弊。
[*]思量数据倾斜:数据倾斜大概导致部门节点处理的数据量宏大于其他节点,从而引发反压。在调解并行度时,须要思量数据倾斜的情况,并采取相应的步伐来均衡数据分布。
[*]优化其他配置:除了调解并行度外,还可以思量优化其他相关配置,如内存大小、缓存计谋等,以进一步提拔系统的性能和稳固性。
限流与缓冲
限流机制
Flink通过水位线(Watermark)机制来实现限流。水位线是一个时间戳,表现当前处理的数据已经到达的位置。通过控制水位线的流传速率,Flink可以限制数据的流量,制止数据的堆积和延迟。当下游节点处理速率较慢时,水位线的流传速率会相应减慢,从而限制上游节点的生产速率。
缓冲机制
Flink在网络传输和TaskManager内部都使用了缓冲机制来处理反压。
[*]网络传输缓冲:在网络传输过程中,Flink使用NetworkBufferPool来管理内存块(MemorySegment)。每个Task都有一个输入区域(InputGate)和输出区域(ResultPartition),它们使用Buffer来存储和传输数据。当数据从上游节点传输到下游节点时,起首会存储在Buffer中,等待下游节点消费。如果下游节点消费速率较慢,Buffer中的数据会逐渐累积,形成反压。此时,Flink会根据Buffer的使用情况来限制上游节点的生产速率。
[*]TaskManager内部缓冲:在TaskManager内部,Flink为每个Task创建了输入和输出的LocalBufferPool。这些缓冲池用于存储和传输数据。当Task的消费速率跟不上生产速率时,LocalBufferPool中的数据会逐渐累积,形成反压。Flink会根据LocalBufferPool的使用情况来限制Task的生产速率。
关闭Checkpoint
关闭 Checkpoint。关闭 Checkpoint 可以将 barrier 对齐这一步省略掉,促使使命可以或许快速回溯数据。然后等数据回溯完成之后,再将 Checkpoint 打开
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]