【Flink】Flink Checkpoint 流程剖析

打印 上一主题 下一主题

主题 899|帖子 899|积分 2697

Flink Checkpoint 流程剖析


  
Checkpint 流程概括

使命运行后 JobMaster 定时执行 Checkpoint,JobMaster 会通过调用 CheckpointCoordinator 对作业举行 Checkpoint。

CheckpointCoordinator 开始举行 Checkpoint,它首先会先创建 PendingCheckpoint,然后开始给 Checkpoint 计时,再关闭网关开始触发 OperatorCoordinator 的 Checkpoint。
假如是 SourceOperatorCoordinator,则这时会调用 Source 的 getSplitSerializer,获取分片序列化器,然后将 SplitAssignmentTracker 中使命运行时分配的分片序列化创建 Snapshot,再将 Snapshot 放入 PendingCheckpoint 中。
OperatorCoordinator 状态触发完后,开始触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFStreamOperator 内部的实现 WithMasterCheckpointHook 接口的 Function 创建,用于在 Master 触发 Checkpoint 时,Function 需要举行的操纵。
MasterHooks 调用完后,CheckpointCoordinator 将给子使命 TaskManager 发送哀求,通知它们开始 Checkpoint。
TaskExecutor 获取相应的使命 Task,Task 调用 StreamTask 开始举行 Checkpoint,StreamTask 调用 Mailbox 执行 Checkpoint 事件,Mailbox 执行 Checkpoint 事件时, Source 将不会从数据源读取数据。
Checkpoint 事件开始执行,假如 Checkpoint 需要强制对齐,那么需要异步创建 Channel 和结果分区的数据快照, 随后在执行流传 Barrier 前,SubtaskCheckpointCoordinatorImpl 会调用 OperatorChain 让 Operator 举行 Barrier 前的准备操纵,然后开始往下游流传 Barrier。
SubtaskCheckpointCoordinatorImpl 创建 CheckpointBarrier 并将 CheckpointBarrier 发送给 RecordWriterOutput 将 Barrier 传输给下游使命,然后注册 Barrier 对齐超时计时器。


Barrier 流传完后,假如之前创建了 Channel 状态快照 ,那么还需要异步完成 Channel Output 的数据快照。
末了 SubtaskCheckpointCoordinatorImpl 开始对当前子使命的全部算子举行 Checkpoint,这时会举行算子创建快照时的操纵,算子状态是存储在 OperatorStateBackend 和 KeyedStateBackend 中的, SubtaskCheckpointCoordinator 将会创建 OperatorStateBackend 和 KeyedStateBackend 的状态快照。
下游使命这时是正常处置处罚上游发送过来的数据的,但是上游正在举行 Checkpoint,数据也是被发送过来的 CheckpointBarrier 分割开了,处置处罚到背面会接收到上游的 CheckpointBarrier,也就表示着当前 Checkpoint 上游快照数据已经处置处罚完,下游也开始举行 Checkpoint 了,下游举行 Checkpoint 的过程也是和上面的一样,继续调用 SubtaskCheckpointCoordinatorImpl 开始举行 Checkpoint。
总的来说,Checkpoint 将创建 Coordinator 状态、托管键值状态、托管算子状态、未处置处罚的键值状态、未处置处罚的算子状态、输入通道状态和结果分区状态的快照。


Checkpoint 触发流程剖析 (Flink 1.20)

使命启动后 JobManager 开始定期对使命执行 Checkpoint



  • Task 使命恢复
    Task#restoreAndInvoke


    • 更新使命状态为 RUNNING 状态,TaskExecutor 通知 JobMaster 使命状态更新
      TaskManagerActions#updateTaskExecutionState
      TaskExecutor.TaskManagerActionsImpl#updateTaskExecutionState
      JobMasterGateway#updateTaskExecutionState


  • JobMaster 调用 SchedulerBase、DefaultExecutionGraph 更新使命状态,定期触发 Checkpoint
    JobMaster#updateTaskExecutionState
    SchedulerBase#updateTaskExecutionState
    DefaultExecutionGraph#updateState
    DefaultExecutionGraph#updateStateInternal

    • [CheckpointCoordinator 开始定期执行 Checkpoint](#JobManager 利用 CheckpointCoordinator 触发 Checkpoint)
      CheckpointCoordinator#startCheckpointScheduler

JobManager 利用 CheckpointCoordinator 触发 Checkpoint



  • JobMaster 触发 Checkpoint
    JobMaster#triggerCheckpoint

    • 调理器触发 Checkpoint
      SchedulerNG#triggerCheckpoint

      • 从 ExecutionGraph 中获取 CheckpointCoordinator,创建 CheckpointTriggerRequest,并利用 CheckpointCoordinator 通过 CheckpointRequestDecider 决定需要处置处罚的 Checkpoint 哀求触发 Checkpoint
        CheckpointCoordinator#triggerCheckpoint
        CheckpointRequestDecider#chooseRequestToExecute
        CheckpointCoordinator#startTriggeringCheckpoint

        • CheckpointCoordinator 初始化 Checkpoint 所需要的信息
        • 触发和通知全部 OperatorCoordinator 开始 Checkpoint
          OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
        • 触发 MasterHooks 状态快照
          CheckpointCoordinator#snapshotMasterState
          MasterTriggerRestoreHook#triggerCheckpoint
        • CheckpointCoordinator 通知子使命开始 Checkpoint



CheckpointCoordinator 初始化 Checkpoint 所需要的信息



  • CheckpointCoordinator 初始化 Checkpoint 所需要的信息

    • 计算 Checkpoint 执行操持
    CheckpointPlanCalculator#calculateCheckpointPlan
       

    • 校验全部使命是否已经初始化
    • 假如有使命已经完成,那么创建全部使命完成后计算查抄点的操持
      DefaultCheckpointPlanCalculator#calculateAfterTasksFinished
    • 假如没有使命完成,那么创建当全部使命都在运行时计算查抄点的操持,该操持为全部使命都将标记为需要触发 Checkpoint,并将全部使命标记为需要期待和提交
      DefaultCheckpointPlanCalculator#calculateWithAllTasksRunning
    • 校验全部使命是否都在运行中
    • Checkpoint 计数加一
    • 创建待处置处罚的的 Checkpoint
      CheckpointCoordinator#createPendingCheckpoint

      • 追溯待处置处罚的 Checkpoint 状态
        CheckpointCoordinator#trackPendingCheckpointStats

        • 创建一个新的挂起查抄点跟踪器
          CheckpointStatsTracker#reportPendingCheckpoint
        • 陈诉单个子使命的统计信息
          CheckpointCoordinator#reportFinishedTasks

      • 创建待处置处罚的的 Checkpoint(PendingCheckpoint)
      • 开始 Checkpoint 计时,时间超时则取消 Checkpoint
      • 返回待处置处罚的的 Checkpoint

    • 初始化 Checkpoint 地址
      CheckpointCoordinator#initializeCheckpointLocation

      • 假如该 Checkpoint 类型为 Savepoint,则初始化 Savepoint 地址
        CheckpointStorageCoordinatorView#initializeLocationForSavepoint
      • 否则,先初始化 Checkpoint Base 地址,再开始初始化地址
        CheckpointStorageCoordinatorView#initializeBaseLocationsForCheckpoint
        CheckpointStorageCoordinatorView#initializeLocationForCheckpoint
      • 返回 Checkpoint 地址


触发全部 OperatorCoordinator Checkpoint



  • 触发和通知全部 OperatorCoordinator 开始 Checkpoint
    OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion

    • 触发 OperatorCoordinator Checkpoint
      OperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints

      • 关闭网关,获取并期待全部事件完成
        OperatorCoordinatorHolder#closeGateways
        IncompleteFuturesTracker#getCurrentIncompleteAndReset
      • 网关标记当前的 Checkpoint
      • OperatorCoordinator 触发 Checkpoint
        OperatorCoordinator#checkpointCoordinator
      • 根据 Coordinator 的 Checkpoint 后的状态创建并返回 CoordinatorSnapshot

    • 通知全部 CheckpointCoordinator Checkpoint 结果
      OperatorCoordinatorCheckpoints#acknowledgeAllCoordinators

触发 MastersHooks 状态快照



  • 触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFOperator 内部的实现 WithMasterCheckpointHook 接口的 UDF 创建,表示在 Master 触发 Checkpoint 时,UDF 可以做什么。
    CheckpointCoordinator#snapshotMasterState
    MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 通知子使命开始 Checkpoint



  • CheckpointCoordinator 给子使命发送 Checkpoint 哀求
    CheckpointCoordinator#triggerCheckpointRequest

    • 发送使命 Checkpoint 哀求
      CheckpointCoordinator#triggerTasks

      • 向全部的 Exeuction 对应的 Taskmanager 网关发送 Checkpoint 哀求,子使命接收到哀求后会开始触发 Checkpoint
        Execution#triggerCheckpointHelper
        TaskManagerGateway#triggerCheckpoint

    • 使命 Checkpoint 哀求发送完后取消定时器

子使命开始触发 Checkpoint



  • TaskManager 触发指定子使命的 Checkpoint
    TaskExecutor#triggerCheckpoint
    Task#triggerCheckpointBarrier

    • 创建 Checkpoint 元数据 CheckpointMetaData
    • 算子 Mailbox 异步执行 Checkpoint,因为 Checkpoint 在 MailboxProcessor 执行,所以这时将不会有数据传入
      CheckpointableTask#triggerCheckpointAsync
      StreamTask#triggerCheckpointAsync

      • 假如 InputGateway 分区数据未处置处罚完成,则触发未完成的数据通道 Checkpoint
        StreamTask#triggerUnfinishedChannelsCheckpoint
        这这情况是思量已完成使命的 Checkpoint ,假如非 Source 使命成为新的主使命,则可能会通过 RPC 触发查抄点。在这种情况下,他们将通知该查抄点的 CheckpointBarrierHandle。

        • 创建一个 CheckpointBarrier,并通知全部未完成的 Channel 处置处罚该 Barrier,并尝试触发 Checkpoint
          CheckpointBarrierHandler#processBarrier

      • 假如 InputGateway 分区数据已经处置处罚完成,则直接开始触发 Checkpoint
        StreamTask#triggerCheckpointAsyncInMailbox

        • 初始化输入端 Channel 状态
          SubtaskCheckpointCoordinator#initInputsCheckpoint
        • SubtaskCheckpointCoordinatorImpl 开始执行 Checkpoint
          StreamTask#performCheckpoint
          SubtaskCheckpointCoordinator#checkpointState



MailBoxProcessor 异步执行 Checkpoint 事件



  • 算子调用 SubtaskCheckpointCoordinator 执行 Checkpoint
    SubtaskCheckpointCoordinatorImpl#checkpointState

    • 假如当前 Checkpint 被终止了,那么向下游发送 CancelCheckpointMarker事件,以防下游背压,并结束当前 Checkpoint。
    • 假如 Checkpoint 之前没有对齐过,并且 Checkpoint 配置的对齐类型是强制对齐,那么首先将当前 Checkpoint 类型设置为不再需要对齐了,然后初始化输入端的状态,可见初始化输入端状态
      CheckpointOptions#withUnalignedSupported
      SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
    • 准备 Checkpoint,算子执行 Snapshot 和 发送 Barrier 前的操纵
      OperatorChain#prepareSnapshotPreBarrier
    • 创建 CheckpointBarrier,并往下游发送 CheckpointBarrier 事件,开始 Barrier 对齐操纵
      OperatorChain#broadcastEvent
    • 注册对齐计时器以在超时时对齐未对齐的 barrier
      SubtaskCheckpointCoordinator#registerAlignmentTimer
    • 假如前面举行了 Channel Checkpoint,那么在这里完成状态通道 Writer 快照
      ChannelStateWriter#finishOutput
    • SubtaskCheckpointCoordinator 同步获取算子的全部的状态快照
      SubtaskCheckpointCoordinator#takeSnapshotSync

      • 假如 Checkpoint 是可超时和可不对齐的,则从 ChannelStateWriter 中获取通道状态写结果(ChannelStateWriteResult)
      • 剖析 Checkpoint 存储地址
        SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#resolveCheckpointStorageLocation
      • 触发 OpeartorChain 状态快照
        OperatorChain#snapshotState

        • 假如是 RegularOperatorChain,则获取全部算子,并触发全部算子的状态快照
          RegularOperatorChain#buildOperatorSnapshotFutures

          • **构建 StreamOpeartor 算子状态快照 Future **
            StreamOperator#snapshotState
          • 假如算子是主算子或者是尾算子,那么将通道和结果分区的状态快照结果 Future 设置到AsyncCheckpointRunnable 中

        • 假如是FinishedOperatorChain,则只将通道和结果分区的状态快照结果 Future 设置到 OperatorSnapshotFutures 中
        • 向 CheckpointCoordinator 发送已接收 Checkpoint 事件
          OperatorChain#sendAcknowledgeCheckpointEvent

      • 清算 Checkpoint 缓存
        SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#clearCacheFor
      • 设置 Checkpoint 持续时间的指标
        CheckpointMetricsBuilder#setSyncDurationMillis

    • 假如获取 SnapShot 乐成,则异步完成 Checkpoint
      SubtaskCheckpointCoordinator#finishAndReportAsync

      • 创建并异步执行 AsyncCheckpointRunnable
        AsyncCheckpointRunnable#start

        • 开始状态快照,并期待全部 SnapshotFuture 完成
          AsyncCheckpointRunnable#finalizedFinishedSnapshots
          AsyncCheckpointRunnable#finalizeNonFinishedSnapshots
        • 计算 Channel 和分区对齐时的状态大小,并设置相干指标


    • 否则,清算 SubtaskCheckpointCoordinator
      SubtaskCheckpointCoordinator#cleanup

初始化输入端状态



  • 子使命初始化 Checkpoint
    SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint

    • 假如 Checkpoint 可不需要对齐

      • 初始化写状态通道
        ChannelStateWriter#start

        • 创建CheckpointStartRequest,并将哀求分发到 Writer
          ChannelStateWriteRequestDispatcher#dispatch

          • 分发器处置处罚 CheckpointStartRequest
            ChannelStateWriteRequestDispatcherImpl#handleCheckpointStartRequest

            • 为该子使命 Writer 注册 ChannelStateWriteResult,用于收集 Checkpoint 过程中传输过来的数据
              ChannelStateCheckpointWriter#registerSubtaskResult



      • 准备正在传输中的数据快照,期待输入端的数据到达 Barrier
        SubtaskCheckpointCoordinatorImpl#prepareInflightDataSnapshot

        • 准备输入端快照
          StreamTask#prepareInputSnapshot
          StreamTaskInput#prepareSnapshot

          • 网络输入端准备快照
            StreamTaskNetworkInput#prepareSnapshot

            • 获取全部还未处置处罚的 Buffer,并添加到状态写状态通道中
              ChannelStateWriter#addInputData
            • 返回全部 Barriers 屏障继承 Future


        • 等全部 Barriers 屏障继承后,完成对给定查抄点 id 的通道状态数据的写入

          • 将 CheckpointInProgressRequest 哀求提交到通道状态写哀求执行器(ChannelStateWriteRequestExecutor)中
          • 通道状态写哀求执行器执行对应哀求
            ChannelStateCheckpointWriter#completeInput

            • 完成状态写入,写入的状态存放在 ChannelStateWriteResult 中,内里存放着写入的状态柄 InputChannelStateHandle 和 ResultSubpartitionStateHandle
              ChannelStateCheckpointWriter#finishWriteAndResult




    • 假如 Checkpoint 是可超时的,那么除了上面准备输入端快照那一步骤外,其他步骤都需要举行

触发 StreamOperator 状态快照



  • 触发 StreamOperator 的 Checkpoint
    RegularOperatorChain#checkpointStreamOperator

    • StreamOperatorStateHandler 创建快照
      StreamOperatorStateHandler#snapshotState

      • 创建算子快照环境和算子快照 Futures
      • 真正的触发算子快照,该步操纵可以通过算子自定义
        StreamOperatorStateHandler.CheckpointedStreamOperator#snapshotState
      • 算子和 Keyd 状态后端触发快照
        Snapshotable#snapshot


下游算子接收到 CheckpointBarrier 后开始 Checkpoint



  • 下游算子处置处罚上游发送过来的事件
    CheckpointedInputGate#handleEvent

    • 假如接收到的事件为 CheckpointBarrier 事件,则开始处置处罚 Barrier,尝试开始 Checkpoint
      CheckpointBarrierHandler#processBarrier

CheckpointBarrierHandler 处置处罚 Barrier



  • CheckpointBarrierHandler 处置处罚 Barrier
    CheckpointBarrierHandler#processBarrier

    • 假如该 Barrier Id 大于上一次 PendingCheckpoint 的 Id 并且当前开启的 Channel 只有一个,标记对齐开始和结束,并通知开始 Checkpoint,然后结束该次处置处罚
      CheckpointBarrierHandler#markAlignmentStartAndEnd
      CheckpointBarrierHandler#notifyCheckpoint
      StreamTask#triggerCheckpointOnBarrier
      SubtaskCheckpointCoordinator#checkpointState
    • 否则尝试从期待的 Checkpoint 队列中探求该 CheckpointBarrier

      • 假如找到了,则阐明 Barrier 已经对齐,标记已经完成对齐,并开始触发 Checkpoint,可见[MailBoxProcessor 异步执行 Checkpoint 事件](#MailBoxProcessor 异步执行 Checkpoint 事件)
        CheckpointBarrierTracker#triggerCheckpointOnAligned
        CheckpointBarrierHandler#notifyCheckpoint
        StreamTask#triggerCheckpointOnBarrier
        SubtaskCheckpointCoordinator#checkpointState
      • 否则将该 Barrier 添加到Checkpoint 队列中,开始对齐


参考:
Flink Stateful Stream Processing:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/stateful-stream-processing/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表