论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
IT评测·应用市场-qidao123.com
»
论坛
›
数据库
›
Oracle
›
ETL的系统核心特征
ETL的系统核心特征
张春
论坛元老
|
2023-2-17 15:25:18
|
显示全部楼层
|
阅读模式
楼主
主题
1033
|
帖子
1033
|
积分
3099
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
目录
ETL 系统核心特征
数据重跑及其优化
重跑的场景
重跑的方式
重跑的优化
自动水平扩展
参考链接
ETL 系统核心特征
数据重跑及其优化
重跑的场景
场景导致原因影响kafka consumer poll消息失败1. 网络问题;2. kafka broker 磁盘坏道,拉取消息一直失败或其他 kafka 原因导致一个或多个topic&partition的消息未消费完整硬件故障,机器重启磁盘满、硬件故障等机器宕机、重启、yarn内部机制会重新在另外一个nodeManager节点重新分配宕机节点的mapper task,可能会造成数据重复task killed1. yarn 主动 killed task : ①, mapper 初始化某个逻辑卡住,导致 mappper 超时;如:加载调试设备信息时 mysql 阻塞、加载 lP 库一直Full GC 或者 mapper 获取不到执行资源一直等待等; ② yarn . nodemanager .local-dirs and yarn. nodemanager . log-dirs 配置的磁盘使用率超过 90 % ResourceManager 标记那台 NodeManger 为bad ,把机器上跑的 container 都 kill了,导致 ETL 一个 mapper 被 kill ,然后起一个新的 mapper task ,但是因为 kill mapper task attmept 未正常关闭,导致文件租约 Iease 未安全释放,后面起 Mapper 一直写失败; 2.S手工 killed task ①Hadoop 集群节点负载太高,一些 DataNode 响应慢或者进程值死无响应,导致HDFS 写入一直超时失败等;人工 killed这个 mapper 分配执行的所有 topic partition 消息未消费或者消息不完整(影响多个 topic ) ;mapper 执行慢,数据一直写不进去(创建、读写 block 超时) :当用户调用 application kill 时会经历三个阶段:
kill SIGTERM(-15) pid;
Sleep for 250ms;
3.kill SIGKILL(-9) pid 。
重跑的方式
重跑方式优点缺点1.备份目标目录的数据
2.将 kafka topic &partition 的 offset 重新消费一遍清洗落地;简单(由于每个 et l执行消费的offset 都是自己记录维护的.找到当前小时或者当天对应的最小以及最大offset 重新消费即可)依赖于 kafka 消息过期时间,过期后无法重新消费;
因为大部分时间分区是是按天的.目前重跑一天的数需要 4 +小时.故障后恢复非常慢;记录未消费的offset信息,补跑未消费的offset即可速度快(只处理未消费完整的topic&partitioner offset)1 )依赖于 kafka 消息过期时间,过期后无法重新消费
2 ) MapTask 被 yarn killed 或者人为 killed ,会导致写数据的 Writer 不正常关闭流,从而引发 hdfs 文件租约 Iease 未释放、 block recovery 等问题,导致后面数据 append 失败;
3 ) MapTask attempt 被 yarn killed 后会自动起另外一个 attemPt (目前来.无法避免,这是 yarn 的机制,需要改源码),可能导致数据重复写入:
4 ) killed 或者机器宕机是无法记录到。offset 消息到哪里,只能每段小段时间记录当前 offset .当 MapTask 被 killed . offset 可能不是最新的,导致补跑时重复写了部分数据;1 ) ETL 小时任务,数据落地到临时目录.文件按小时划分.支持覆盖写(保存一段时间) ;
2 )当数据落地成功,检测每个 toPic partition 的消费情况进行处理;
3 )如果 topic partitlon 的消息消费不完整时,告警通知,手工重跑相关 toPic partition 的小时任务;
4 )如果消费完整,将小时数据文件合并到仓库的目标文件(每个 topic 、 partition 单独一个文件) ;
5 )如果合并过程失败.告警通知,手工触发重新将已经落地的小文件合并成一个目标仓库文件;支持小时重跑、覆盖写
恢复速度较快
HDFS block 丢失、可以从小文件新恢复1 )流程比较复杂,多出一个数据合并 Merge 的步骤;
2 )每小时的 ETL 会产生非常多的小文件间定期删除;需要保留一段时如下图所示是第三种重跑方式的整体流程,ETL 是按照小时调度的,首先将数据按小时写到临时目录中,如果消费失败会告警通知并重跑消费当前小时。如果落地成功则合并到仓库目录的目标文件,合并失败同样会告警通知并人工重跑,将小文件合并成目标文件。
graph TB 开始 --> 小时ETL--write -->1{落地临时目录/etl/work/按小时区分}--落地成功-->2{到仓库目录的目标文件每个partition一个文件}--合并成功-->结束 1-->失败消费不完整-->告警通知-->重跑消费当前小时即可-->小时ETL 2-->合并失败-->告警通知-->根据每小时产生的小文件重新合并一个目标文件--合并成功-->结束
重跑的优化
流程按天分区的topic每小时05分调度的ETL正常跑完1.包含之前的数据.如:昨天漏采集的日志.直接在对应日期分区目录下创建一个新的数据文件; 2.今天的日志,直接在今天的日期分区下创建一个新的数据文件; 3.将当前 ETL 产生的小文件直接 append 到对应日期分区以及相同 partition 下的文件;一个小时内执行多次ETL1.因为目前落地的临时文件都是按 partition +执行时间(yyyyMMddHH )生成的,如: all _ 1 _ 2018031511 . avro ; 2.如果一个小时执行多次 ETL ,每次都会消费新的 offset ( etl _ 0ffsets 会存多条记录),但是临时目录的数据文件的名称相同,这时候通过 append 方式追加数据; 3.如果小时重跑时.会找到当前小时内 topic--partjtion 的最小的 beginoffset 、最大的 endoffset 重新从 kafka 消费,这样保证了数据的完整性;小时ETL重跑(前提未进行merge)1 )小时三跑,直接找到对应小时 topic--partition 的最小的 beginoffset 、最大的 endoffset 重新从 kafka 消费: 2 )之前已经创建的临时目录的数据文件会以 create overwrite 形式重新写入数据; 3 )最后在将文件 append 到对应日期分区下相同 partition 的文件;merge失败(小时ETL产生的临时数据文件已经是完整、可靠的)重新根据当天每小时 ETL 产生的临时数据文件合并一个目标仓库文件即可:ETL消费消息不完整1.消费不完整的 toPic 、 partition 信息会记录.告警短信通知: 2 )人工重跑对应 topic--partition 小时 ETL 即可(支持细化到 partition 的小时数据重跑,因为是 overwrite )MapTask attempt killed by yarn1 ) MapTask attempt 被 yarn 的机制 killed 后.另启的 attempt 会 overwrite 之前的文件(排除一个小时执行多次 ETL 的情况.这时候需要人工重跑).比较好的兼容 hadoop 集群的故障;NodeManager carsh by hardware fault由于机器硬件故障导致运行 ETL 的 MapTask 的 NodeManager 直接 crash :1 )数据正在写入临时目录,手工重跑对应 topic 一 partition 小时的 ETL 即可: 2 )数据正在 merge ,手工重新 merge 对应 topic date 的数据;人为killed整个ETL job人工处理未执行的topic :1.数据正在写入临时目录,手工重跑对于topic-partition小时的ETL即可 2.数据正在 merge , 手工重新merge对应 topic--date 的数据:
自动水平扩展
现在离线 Kafka-ETL 是每小时 05 分调度,每次调度的 ETL 都会获取每个 topic&partition 当前最新、最大的 latest offset,同时与上个小时消费的截止 offset 组合成本地要消费的 kafkaEvent。由于每次获取的 latest offset 是不可控的,有些情况下某些 topic&partition 的消息 offset 增长非常快,同时 kafka topic 的 partition 数量来不及调整,导致 ETL 消费处理延迟,影响下游的业务处理流程:
由于扩容、故障等原因需要补采集漏采集的数据或者历史数据,这种情况下 topic&&partition 的消息 offset 增长非常快,仅仅依赖 kafka topic partiton 扩容是不靠谱的,补采集完后面还得删除扩容的 partition;
周末高峰、节假日、6.18、双十一等用户流量高峰期,收集的用户行为数据会比平时翻几倍、几十倍,但是同样遇到来不及扩容 topic partition 个数、扩容后需要缩容的情况;
Kafka ETL 是否能自动水平扩展不强依赖于 kafka topic partition 的个数。如果某个 topic kafkaEvent 需要处理的数据过大,评估在合理时间范围单个 mapper 能消费的最大的条数,再将 kafkaEvent 水平拆分成多个子 kafkaEvent,并分配到各个 mapper 中处理,这样就避免单个 mapper 单次需要处理过大 kafkaEvent 而导致延迟,提高水平扩展能力。拆分的逻辑如下图所示:
graph TB 开始 --> 根据ETLSchema选择SplitStrategy-->IsRemaining("SplitStrategy是否配置") IsRemaining -->配置--> 生成对应的SplitStrategy-->根据SplitStrategy拆分KafkaEvent-->oneChosse("消息是否超过阈值") oneChosse -->no["否"]-->KafkaEvent不拆分-->结束 oneChosse -->yes["是"]-->twoChosse("消息是否超过阈值2倍")-->yes2["是"]-->根据倍数进行合理拆分-->结束 twoChosse-->no2["否"]-->KafkaEvent拆分为两个-->结束 IsRemaining -->未配置-->KafkaEvent不拆分-->结束后续我们将针对以下两点进行自动水平扩展的优化:
如果单个 mapper 处理的总消息数据比较大,将考虑扩容 mapper 个数并生成分片 split 进行负载均衡。
每种格式的消息处理速度不一样,分配时可能出现一些 mapper 负担比较重,将给每个格式配置一定的权重,根据消息条数、权重等结合一起分配 kafkaEvent。
参考链接
https://blog.csdn.net/javastart/article/details/113838240
美图离线ETL实践 - 掘金 (juejin.cn)
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复
使用道具
举报
0 个回复
倒序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
回帖后跳转到最后一页
发新帖
回复
张春
论坛元老
这个人很懒什么都没写!
楼主热帖
Eclipse连接SQLServer2008
【CSDN官方】开源又好用的国产SPL ...
Redis概述及基本数据结构
聊聊容灾演练-练什么|深度好文 ...
Velero系列文章(四):使用Velero进行 ...
2022十三届蓝桥杯国赛题解
彻底搞懂Docker容器与Kraft模式kafka集 ...
干货|APP自动化Android特殊控件Toast识 ...
4. 事务和锁
linux跟踪技术之ebpf
标签云
AI
运维
CIO
存储
服务器
浏览过的版块
前端开发
虚拟化与私有云
Mysql
移动端开发
分布式数据库
DevOps与敏捷开发
快速回复
返回顶部
返回列表