Flink 作业提交流程

打印 上一主题 下一主题

主题 671|帖子 671|积分 2013

大家好,我是小寒~
今天给大家带来一篇 flink 作业提交相关的文章。
我们都知道,在开发完一个 flink 应用程序后,打包成 jar 包,然后通过 FLink CLI 或者 Web UI 提交作业到 FLink 集群。其实,Flink 的 jar 文件并不是 FLink 集群的可执行文件,需要经过转换之后提交给集群。其转换过程分为两个大的步骤。

  • 在 FLink Client 中通过反射启动 Jar 中的 main 函数,生成 Flink StreamGraph、JobGraph,将 JobGraph 提交给 Flink 集群。
  • FLink 集群收到 JobGraph 之后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度执行,启动成功之后开始消费数据。
总的来说,对用户API的调用,可以转换为 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行拓扑(Task DAG)
提交流程

FLink 作业在开发完毕之后,需要提交到 FLink 集群执行。ClientFrontend 是入口,触发用户开发的 Flink 应用 jar 文件中的 main 方法,然后交给 PipelineExecutor#execue 方法,最终会触发一个具体的 PipelineExecutor 执行,如下图所示。

作业执行可以选择 Session 和 Per-Job 模式两种集群。

  • Session 模式的集群,一个集群中运行多个作业。
  • Per-Job 模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁。

流水线执行器 PipelineExecutor

流水线执行器在 FLink 中叫作 PipelineExecutor,是 FLink Client 生成 JobGraph 之后,将作业提交给集群的重要环节。
集群有 Session 和 Per-Job 两种模式。在这两种模式下,集群的启动时机、提交作业的方式不同,所以在生产环境中有两种 PipelineExecutor。Session 模式对应于 AbstractSessionClusterExecutor,Per-Job 模式对应于 AbstractJobClusterExecutor。

  • Session 模式
该模式下,作业共享集群资源,作业通过 Http 协议进行提交。
在 Flink 1.10 版本中提供了三种会话模式:Yarn 会话模式、K8s 会话模式、Standalone。Standalone 模式比较特别,Flink 安装在物理机上,不能像在资源集群上一样,可以随时启动一个新集群,所有的作业共享 Standalone 集群。
在 Session 模式下, Yarn 作业提交使用 yarn-session.sh 脚本, K8s 作业提交使用 kubernetes-session.sh 脚本。两者的具体实现不同 ,但逻辑是类似的 ,在启动脚本的时候就会检查是否存在已经启动好的 Flink Session 模式集群,如果没有,则启动一个 Flink Session 模式集群,然后在 PipelineExecutor 中,通过 Dispatcher 提供的 Rest 接口提交 JobGraph,Dispatcher 为每一个作业启动一个 JobMaster,然后进入作业执行阶段。

  • Per-Job 模式
该模式下,一个作业一个集群,作业之间相互隔离。
在 FLink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式。
Per-Job 模式下,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,可以创建集群并将 JobGraph 以及所需要的文件等一同交给 Yarn 集群,Yarn 集群在容器中启动 JobManager 进程,进行一系列的初始化动作,初始化完毕之后,从文件系统中获取 JobGraph ,交给 Dispatcher。 之后的执行流程与 Session 模式下的执行流程相同。
yarn session 的提交流程

从总体上来说,在 Yarn 集群上使用 Session 模式提交 Flink 作业的过程分为 3 个阶段。首先在 Yarn 上启动 Flink Session 模式的集群;其次通过 Flink Client 提交作业 ,最后进行作业的调度执行。


  • 启动集群
(1) 使用 yarn-session.sh 提交会话模式的作业
如果提交到已经存在的集群, 则获取 Yarn 集群信息、应用 ID,并准备提交作业。
如果是启动新的 Yarn Session 集群,则进入到步骤 (2)。
(2)Yarn 启动新的 Flink 集群
如果没有集群,则创建一个新的 Session 模式的集群。首先,将应用的配置文件(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink jar、用户 jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。
然后通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FLink JobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动的入口(不同的集群部署模式有不同的 ClusterEntrypoint 的实现),初始化 Dispatcher、ResourceManager。启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。
2、作业提交
Yarn 集群准备好后,开始作业提交。
(1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。
(2)Dispatcher 是 Rest  接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster(负责作业调度、管理作业和 Task 的生命周期 ),构建 ExecutionGraph(Job Graph的并行化版本)

  • 作业调度执行
(1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 的执行;初次提交作业,集群尚没有 TaskManager,此时资源不足,开始申请资源。
(2)YarnResourceManager 收到 JobMaster 的资源请求,如果当前有空闲的 Slot,则将 Slot 分配给 JobMaster,否则 YarnResourceManager 将向 Yarn Master(Yarn 集群的 ResourceManager) 请求创建 TaskManager。
(3)YarnResourceManager 将资源请求加入等待请求队列,并通过心跳向 YARN RM 申请新的 Container 资源来启动 TaskManager 进程;Yarn 分配新的 Container 给 TaskManager。
(4)YarnResourceManager 从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager。
(5)TaskManager 启动之后,向 YarnResourceManager 进行注册,并把自己的 Slot 资源情况汇报给 YarnResourceManager 。
(6)YarnResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给了哪个 JobMaster。
(7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。
至此,作业进入执行阶段。
Yarn Per-Job 提交流程

Yarn Per-Job 模式提交作业与 Yarn-Session 模式提交作业基本类似。Per-Job 模式下,JobGraph 和集群资源请求一起提交给 Yarn。


  • 启动集群
    (1)使用 flink run -m yarn-cluster 提交 Per-Job 模式的作业。
    (2)Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其它与 Yarn-Session 模式启动类似。
  • 作业提交
    该步骤与 Session 模式下的不同之处在于,Client 并不会通过 Rest 向 Dispacher 提交 JobGraph,由 Dispacher 从本地文件系统获取 JobGraph,其后的步骤与 Session 模式一样。
  • 作业调度执行
    与 Yarn-Session 模式下一致。
流处理的转换过程

StreamGraph

使用 DataStream API 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。
我们以熟知的 WordCount 程序为例,它的 StreamGraph 如下图所示。

从图中我们可以看到,StreamGraph 是由 StreamNode 和 StreamEdge 构成。

  • StreamNode
    StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示为一个算子;从逻辑上来说,StreamNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StreamNode 可以有多个输入,也可以有多个输出。
    实体的 StreamNode 会最终变为物理的算子。虚拟的 StreamNode 会附着在 StreamEdge 上。
  • StreamEdge
    StreamEdge 是 StreamGraph 中的边, 用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边。 StreamEdge 中包含了盘路输出、分区器、字段筛选输出等的信息。
作业图

JobGraph 可以由流计算的 StreamGraph 转换而来。
流计算中,在 StreamGraph 的基础上进行了一些优化,如通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网络的传递。

从 JobGraph 的图里可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDateSet),而下游作为消费者需要 JobEdge。JobEdge 是一个通信管道,连接了上游生产的中间数据集和 JobVertex 节点。
JobGraph 的核心对象是 JobVertex、JobEdge 和 IntermediateDateSet。

  • JobVertex
    经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDateSet。
  • JobEdge
    JobEdge 是 JobGraph 中连接 IntermediateDateSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDateSet,下游消费者是 JobVertex ,即数据通过 JobEdge 由 IntermediateDateSet 传递给目标 JobVertex 。
    JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。
  • IntermediateDateSet
    中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。
    IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。
执行图

ExecutionGraph 是调度 Flink 作业执行的核心数据结构,包含了作业中所有并行执行的 Task 的信息、Task 之间的关联关系、数据流转关系等。
StreamGraph、JobGraph 在 Flink 客户端中生成,然后提交给 Flink 集群。JobGraph 到 ExecutionGraph 的转换在 JobMaster 中完成。在转化过程中,有如下重要变化。

  • 加入了并行度的概念,成为真正可调度的图结构。
  • 生成了与 JobVertex 对应的 ExecutionJobVertex 和 ExecutionVertex,与IntermediateDataSet 对应的 IntermediateResult 和 IntermediateResultPartition 等。
生成的图如下图所示。

ExecutionGraph 的核心对象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。

  • ExecutionJobVertex
    该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含一组 ExecutionVertex,数量与该 JobVertex 中所包含的 StreamNode 的并行度一致,假设 StreamNode 的并行度为3,那么 ExecutionJobVertex 也会包含 3个 ExecutionVertex。
  • ExecutionVertex
    ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。
    构造 ExecutionVertex 的同时,也会构建 ExecutionVertex 的输出 IntermediateResult。
  • IntermediateResult
    IntermediateResult 又叫中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobVertex 中的 IntermediateDataSet 一一对应,同样,一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)
    一个中间结果包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度,或者叫作算子的并行度。
  • IntermediateResultPartition
    IntermediateResultPartition 又叫作中间结果分区,表示一个 ExecutionVertex 的输出结果,与 ExecutionEdge 相关联。
  • ExecutionEdge
    表示 ExecutionVertex  的输入,连接到上游产生的 IntermediateResultPartition 。
  • Execution
    ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 的执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在发生故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个ExecutionAttemptID 。一个 Execution 通过 ExecutionAttemptID  来唯一标识。
总结

Flink 作业执行前需要提交 Flink 集群, Flink 集群可以与不同的资源框架(Yarn、K8s、Mesos 等)进行集成,可以按照不同的模式(Session 模式和 Per-Job模式)运行,所以在 Flink 作业提交过程中,可能在资源框架上启动Flink集群。Flink 就绪之后,进入作业提交阶段,在Flink客户端进行StreamGraph、JobGraph的转换,提交 JobGraph 到 Flink 集群,然后 Flink 集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

愛在花開的季節

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表