论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
ToB企服应用市场:ToB评测及商务社交产业平台
»
论坛
›
大数据
›
数据仓库与分析
›
【Flink】Flink Checkpoint 流程剖析
【Flink】Flink Checkpoint 流程剖析
王海鱼
金牌会员
|
2025-1-5 03:29:32
|
显示全部楼层
|
阅读模式
楼主
主题
898
|
帖子
898
|
积分
2694
Flink Checkpoint 流程剖析
Checkpint 流程概括
使命运行后 JobMaster 定时执行 Checkpoint,JobMaster 会通过调用 CheckpointCoordinator 对作业举行 Checkpoint。
CheckpointCoordinator 开始举行 Checkpoint,它首先会先创建 PendingCheckpoint,然后开始给 Checkpoint 计时,再关闭网关开始触发 OperatorCoordinator 的 Checkpoint。
假如是 SourceOperatorCoordinator,则这时会调用 Source 的 getSplitSerializer,获取分片序列化器,然后将 SplitAssignmentTracker 中使命运行时分配的分片序列化创建 Snapshot,再将 Snapshot 放入 PendingCheckpoint 中。
OperatorCoordinator 状态触发完后,开始触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFStreamOperator 内部的实现 WithMasterCheckpointHook 接口的 Function 创建,用于在 Master 触发 Checkpoint 时,Function 需要举行的操纵。
MasterHooks 调用完后,CheckpointCoordinator 将给子使命 TaskManager 发送哀求,通知它们开始 Checkpoint。
TaskExecutor 获取相应的使命 Task,Task 调用 StreamTask 开始举行 Checkpoint,StreamTask 调用 Mailbox 执行 Checkpoint 事件,Mailbox 执行 Checkpoint 事件时, Source 将不会从数据源读取数据。
Checkpoint 事件开始执行,假如 Checkpoint 需要强制对齐,那么需要异步创建 Channel 和结果分区的数据快照, 随后在执行流传 Barrier 前,SubtaskCheckpointCoordinatorImpl 会调用 OperatorChain 让 Operator 举行 Barrier 前的准备操纵,然后开始往下游流传 Barrier。
SubtaskCheckpointCoordinatorImpl 创建 CheckpointBarrier 并将 CheckpointBarrier 发送给 RecordWriterOutput 将 Barrier 传输给下游使命,然后注册 Barrier 对齐超时计时器。
Barrier 流传完后,假如之前创建了 Channel 状态快照 ,那么还需要异步完成 Channel Output 的数据快照。
末了 SubtaskCheckpointCoordinatorImpl 开始对当前子使命的全部算子举行 Checkpoint,这时会举行算子创建快照时的操纵,算子状态是存储在 OperatorStateBackend 和 KeyedStateBackend 中的, SubtaskCheckpointCoordinator 将会创建 OperatorStateBackend 和 KeyedStateBackend 的状态快照。
下游使命这时是正常处置处罚上游发送过来的数据的,但是上游正在举行 Checkpoint,数据也是被发送过来的 CheckpointBarrier 分割开了,处置处罚到背面会接收到上游的 CheckpointBarrier,也就表示着当前 Checkpoint 上游快照数据已经处置处罚完,下游也开始举行 Checkpoint 了,下游举行 Checkpoint 的过程也是和上面的一样,继续调用 SubtaskCheckpointCoordinatorImpl 开始举行 Checkpoint。
总的来说,Checkpoint 将创建 Coordinator 状态、托管键值状态、托管算子状态、未处置处罚的键值状态、未处置处罚的算子状态、输入通道状态和结果分区状态的快照。
Checkpoint 触发流程剖析 (Flink 1.20)
使命启动后 JobManager 开始定期对使命执行 Checkpoint
Task 使命恢复
Task#restoreAndInvoke
…
更新使命状态为 RUNNING 状态,TaskExecutor 通知 JobMaster 使命状态更新
TaskManagerActions#updateTaskExecutionState
TaskExecutor.TaskManagerActionsImpl#updateTaskExecutionState
JobMasterGateway#updateTaskExecutionState
…
JobMaster 调用 SchedulerBase、DefaultExecutionGraph 更新使命状态,定期触发 Checkpoint
JobMaster#updateTaskExecutionState
SchedulerBase#updateTaskExecutionState
DefaultExecutionGraph#updateState
DefaultExecutionGraph#updateStateInternal
[CheckpointCoordinator 开始定期执行 Checkpoint](#JobManager 利用 CheckpointCoordinator 触发 Checkpoint)
CheckpointCoordinator#startCheckpointScheduler
JobManager 利用 CheckpointCoordinator 触发 Checkpoint
JobMaster 触发 Checkpoint
JobMaster#triggerCheckpoint
调理器触发 Checkpoint
SchedulerNG#triggerCheckpoint
从 ExecutionGraph 中获取 CheckpointCoordinator,创建 CheckpointTriggerRequest,并利用 CheckpointCoordinator 通过 CheckpointRequestDecider 决定需要处置处罚的 Checkpoint 哀求触发 Checkpoint
CheckpointCoordinator#triggerCheckpoint
CheckpointRequestDecider#chooseRequestToExecute
CheckpointCoordinator#startTriggeringCheckpoint
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
触发和通知全部 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
触发 MasterHooks 状态快照
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 通知子使命开始 Checkpoint
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
计算 Checkpoint 执行操持
CheckpointPlanCalculator#calculateCheckpointPlan
校验全部使命是否已经初始化
假如有使命已经完成,那么创建全部使命完成后计算查抄点的操持
DefaultCheckpointPlanCalculator#calculateAfterTasksFinished
假如没有使命完成,那么创建当全部使命都在运行时计算查抄点的操持,该操持为全部使命都将标记为需要触发 Checkpoint,并将全部使命标记为需要期待和提交
DefaultCheckpointPlanCalculator#calculateWithAllTasksRunning
校验全部使命是否都在运行中
Checkpoint 计数加一
创建待处置处罚的的 Checkpoint
CheckpointCoordinator#createPendingCheckpoint
追溯待处置处罚的 Checkpoint 状态
CheckpointCoordinator#trackPendingCheckpointStats
创建一个新的挂起查抄点跟踪器
CheckpointStatsTracker#reportPendingCheckpoint
陈诉单个子使命的统计信息
CheckpointCoordinator#reportFinishedTasks
创建待处置处罚的的 Checkpoint(PendingCheckpoint)
开始 Checkpoint 计时,时间超时则取消 Checkpoint
返回待处置处罚的的 Checkpoint
初始化 Checkpoint 地址
CheckpointCoordinator#initializeCheckpointLocation
假如该 Checkpoint 类型为 Savepoint,则初始化 Savepoint 地址
CheckpointStorageCoordinatorView#initializeLocationForSavepoint
否则,先初始化 Checkpoint Base 地址,再开始初始化地址
CheckpointStorageCoordinatorView#initializeBaseLocationsForCheckpoint
CheckpointStorageCoordinatorView#initializeLocationForCheckpoint
返回 Checkpoint 地址
触发全部 OperatorCoordinator Checkpoint
触发和通知全部 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
触发 OperatorCoordinator Checkpoint
OperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints
关闭网关,获取并期待全部事件完成
OperatorCoordinatorHolder#closeGateways
IncompleteFuturesTracker#getCurrentIncompleteAndReset
网关标记当前的 Checkpoint
OperatorCoordinator 触发 Checkpoint
OperatorCoordinator#checkpointCoordinator
根据 Coordinator 的 Checkpoint 后的状态创建并返回 CoordinatorSnapshot
通知全部 CheckpointCoordinator Checkpoint 结果
OperatorCoordinatorCheckpoints#acknowledgeAllCoordinators
触发 MastersHooks 状态快照
触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFOperator 内部的实现 WithMasterCheckpointHook 接口的 UDF 创建,表示在 Master 触发 Checkpoint 时,UDF 可以做什么。
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 通知子使命开始 Checkpoint
CheckpointCoordinator 给子使命发送 Checkpoint 哀求
CheckpointCoordinator#triggerCheckpointRequest
发送使命 Checkpoint 哀求
CheckpointCoordinator#triggerTasks
向全部的 Exeuction 对应的 Taskmanager 网关发送 Checkpoint 哀求
,子使命接收到哀求后会开始触发 Checkpoint
Execution#triggerCheckpointHelper
TaskManagerGateway#triggerCheckpoint
使命 Checkpoint 哀求发送完后取消定时器
子使命开始触发 Checkpoint
TaskManager 触发指定子使命的 Checkpoint
TaskExecutor#triggerCheckpoint
Task#triggerCheckpointBarrier
创建 Checkpoint 元数据 CheckpointMetaData
算子 Mailbox 异步执行 Checkpoint,因为 Checkpoint 在 MailboxProcessor 执行,所以这时将不会有数据传入
CheckpointableTask#triggerCheckpointAsync
StreamTask#triggerCheckpointAsync
假如 InputGateway 分区数据未处置处罚完成,则触发未完成的数据通道 Checkpoint
StreamTask#triggerUnfinishedChannelsCheckpoint
这这情况是思量已完成使命的 Checkpoint ,假如非 Source 使命成为新的主使命,则可能会通过 RPC 触发查抄点。在这种情况下,他们将通知该查抄点的 CheckpointBarrierHandle。
创建一个 CheckpointBarrier,并通知全部未完成的 Channel 处置处罚该 Barrier,并尝试触发 Checkpoint
CheckpointBarrierHandler#processBarrier
假如 InputGateway 分区数据已经处置处罚完成,则直接开始触发 Checkpoint
StreamTask#triggerCheckpointAsyncInMailbox
初始化输入端 Channel 状态
SubtaskCheckpointCoordinator#initInputsCheckpoint
SubtaskCheckpointCoordinatorImpl 开始执行 Checkpoint
StreamTask#performCheckpoint
SubtaskCheckpointCoordinator#checkpointState
MailBoxProcessor 异步执行 Checkpoint 事件
算子调用 SubtaskCheckpointCoordinator 执行 Checkpoint
SubtaskCheckpointCoordinatorImpl#checkpointState
假如当前 Checkpint 被终止了,那么向下游发送 CancelCheckpointMarker事件,以防下游背压,并结束当前 Checkpoint。
假如 Checkpoint 之前没有对齐过,并且 Checkpoint 配置的对齐类型是强制对齐,那么首先将当前 Checkpoint 类型设置为不再需要对齐了,然后初始化输入端的状态,可见初始化输入端状态
CheckpointOptions#withUnalignedSupported
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
准备 Checkpoint,算子执行 Snapshot 和 发送 Barrier 前的操纵
OperatorChain#prepareSnapshotPreBarrier
创建 CheckpointBarrier,并往下游发送 CheckpointBarrier 事件,开始 Barrier 对齐操纵
OperatorChain#broadcastEvent
注册对齐计时器以在超时时对齐未对齐的 barrier
SubtaskCheckpointCoordinator#registerAlignmentTimer
假如前面举行了 Channel Checkpoint,那么在这里完成状态通道 Writer 快照
ChannelStateWriter#finishOutput
SubtaskCheckpointCoordinator 同步获取算子的全部的状态快照
SubtaskCheckpointCoordinator#takeSnapshotSync
假如 Checkpoint 是可超时和可不对齐的,则从 ChannelStateWriter 中获取通道状态写结果(ChannelStateWriteResult)
剖析 Checkpoint 存储地址
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#resolveCheckpointStorageLocation
触发 OpeartorChain 状态快照
OperatorChain#snapshotState
假如是 RegularOperatorChain,则获取全部算子,并触发全部算子的状态快照
RegularOperatorChain#buildOperatorSnapshotFutures
**构建 StreamOpeartor 算子状态快照 Future **
StreamOperator#snapshotState
假如算子是主算子或者是尾算子,那么将通道和结果分区的状态快照结果 Future 设置到AsyncCheckpointRunnable 中
假如是FinishedOperatorChain,则只将通道和结果分区的状态快照结果 Future 设置到 OperatorSnapshotFutures 中
向 CheckpointCoordinator 发送已接收 Checkpoint 事件
OperatorChain#sendAcknowledgeCheckpointEvent
清算 Checkpoint 缓存
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#clearCacheFor
设置 Checkpoint 持续时间的指标
CheckpointMetricsBuilder#setSyncDurationMillis
假如获取 SnapShot 乐成,则异步完成 Checkpoint
SubtaskCheckpointCoordinator#finishAndReportAsync
创建并异步执行 AsyncCheckpointRunnable
AsyncCheckpointRunnable#start
开始状态快照,并期待全部 SnapshotFuture 完成
AsyncCheckpointRunnable#finalizedFinishedSnapshots
AsyncCheckpointRunnable#finalizeNonFinishedSnapshots
计算 Channel 和分区对齐时的状态大小,并设置相干指标
否则,清算 SubtaskCheckpointCoordinator
SubtaskCheckpointCoordinator#cleanup
初始化输入端状态
子使命初始化 Checkpoint
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
假如 Checkpoint 可不需要对齐
初始化写状态通道
ChannelStateWriter#start
创建CheckpointStartRequest,并将哀求分发到 Writer
ChannelStateWriteRequestDispatcher#dispatch
分发器处置处罚 CheckpointStartRequest
ChannelStateWriteRequestDispatcherImpl#handleCheckpointStartRequest
为该子使命 Writer 注册 ChannelStateWriteResult,用于收集 Checkpoint 过程中传输过来的数据
ChannelStateCheckpointWriter#registerSubtaskResult
准备正在传输中的数据快照,期待输入端的数据到达 Barrier
SubtaskCheckpointCoordinatorImpl#prepareInflightDataSnapshot
准备输入端快照
StreamTask#prepareInputSnapshot
StreamTaskInput#prepareSnapshot
网络输入端准备快照
StreamTaskNetworkInput#prepareSnapshot
获取全部还未处置处罚的 Buffer,并添加到状态写状态通道中
ChannelStateWriter#addInputData
返回全部 Barriers 屏障继承 Future
等全部 Barriers 屏障继承后,完成对给定查抄点 id 的通道状态数据的写入
将 CheckpointInProgressRequest 哀求提交到通道状态写哀求执行器(ChannelStateWriteRequestExecutor)中
通道状态写哀求执行器执行对应哀求
ChannelStateCheckpointWriter#completeInput
完成状态写入,写入的状态存放在 ChannelStateWriteResult 中,内里存放着写入的状态柄 InputChannelStateHandle 和 ResultSubpartitionStateHandle
ChannelStateCheckpointWriter#finishWriteAndResult
假如 Checkpoint 是可超时的,那么除了上面准备输入端快照那一步骤外,其他步骤都需要举行
触发 StreamOperator 状态快照
触发 StreamOperator 的 Checkpoint
RegularOperatorChain#checkpointStreamOperator
StreamOperatorStateHandler 创建快照
StreamOperatorStateHandler#snapshotState
创建算子快照环境和算子快照 Futures
真正的触发算子快照,该步操纵可以通过算子自定义
StreamOperatorStateHandler.CheckpointedStreamOperator#snapshotState
算子和 Keyd 状态后端触发快照
Snapshotable#snapshot
下游算子接收到 CheckpointBarrier 后开始 Checkpoint
下游算子处置处罚上游发送过来的事件
CheckpointedInputGate#handleEvent
假如接收到的事件为 CheckpointBarrier 事件,则开始处置处罚 Barrier,尝试开始 Checkpoint
CheckpointBarrierHandler#processBarrier
CheckpointBarrierHandler 处置处罚 Barrier
CheckpointBarrierHandler 处置处罚 Barrier
CheckpointBarrierHandler#processBarrier
假如该 Barrier Id 大于上一次 PendingCheckpoint 的 Id 并且当前开启的 Channel 只有一个,标记对齐开始和结束,并通知开始 Checkpoint,然后结束该次处置处罚
CheckpointBarrierHandler#markAlignmentStartAndEnd
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
否则尝试从期待的 Checkpoint 队列中探求该 CheckpointBarrier
假如找到了,则阐明 Barrier 已经对齐,标记已经完成对齐,并开始触发 Checkpoint,可见[MailBoxProcessor 异步执行 Checkpoint 事件](#MailBoxProcessor 异步执行 Checkpoint 事件)
CheckpointBarrierTracker#triggerCheckpointOnAligned
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
否则将该 Barrier 添加到Checkpoint 队列中,开始对齐
参考:
Flink Stateful Stream Processing:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/stateful-stream-processing/
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
使用道具
举报
0 个回复
正序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
发新帖
回复
王海鱼
金牌会员
这个人很懒什么都没写!
楼主热帖
在Winform开发中,使用Async-Awati异步 ...
Vue 和 Django 前后端分离实践 (注册 ...
如何在 Vue 3 中使用<script lang=“t ...
一条SQL的执行原理
C#依赖注入(直白明了)讲解 一看就会系 ...
go中 for循环的坑
教务管理系统——数据库课程设计mysql+ ...
2023大数据面试总结
银河麒麟V10安装达梦数据库DM8 ...
clickhouse的稀疏索引
标签云
存储
挺好的
服务器
快速回复
返回顶部
返回列表