ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink 作业提交流程 [打印本页]

作者: 愛在花開的季節    时间: 2022-8-25 19:19
标题: Flink 作业提交流程
大家好,我是小寒~
今天给大家带来一篇 flink 作业提交相关的文章。
我们都知道,在开发完一个 flink 应用程序后,打包成 jar 包,然后通过 FLink CLI 或者 Web UI 提交作业到 FLink 集群。其实,Flink 的 jar 文件并不是 FLink 集群的可执行文件,需要经过转换之后提交给集群。其转换过程分为两个大的步骤。
总的来说,对用户API的调用,可以转换为 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行拓扑(Task DAG)
提交流程

FLink 作业在开发完毕之后,需要提交到 FLink 集群执行。ClientFrontend 是入口,触发用户开发的 Flink 应用 jar 文件中的 main 方法,然后交给 PipelineExecutor#execue 方法,最终会触发一个具体的 PipelineExecutor 执行,如下图所示。

作业执行可以选择 Session 和 Per-Job 模式两种集群。

流水线执行器 PipelineExecutor

流水线执行器在 FLink 中叫作 PipelineExecutor,是 FLink Client 生成 JobGraph 之后,将作业提交给集群的重要环节。
集群有 Session 和 Per-Job 两种模式。在这两种模式下,集群的启动时机、提交作业的方式不同,所以在生产环境中有两种 PipelineExecutor。Session 模式对应于 AbstractSessionClusterExecutor,Per-Job 模式对应于 AbstractJobClusterExecutor。
该模式下,作业共享集群资源,作业通过 Http 协议进行提交。
在 Flink 1.10 版本中提供了三种会话模式:Yarn 会话模式、K8s 会话模式、Standalone。Standalone 模式比较特别,Flink 安装在物理机上,不能像在资源集群上一样,可以随时启动一个新集群,所有的作业共享 Standalone 集群。
在 Session 模式下, Yarn 作业提交使用 yarn-session.sh 脚本, K8s 作业提交使用 kubernetes-session.sh 脚本。两者的具体实现不同 ,但逻辑是类似的 ,在启动脚本的时候就会检查是否存在已经启动好的 Flink Session 模式集群,如果没有,则启动一个 Flink Session 模式集群,然后在 PipelineExecutor 中,通过 Dispatcher 提供的 Rest 接口提交 JobGraph,Dispatcher 为每一个作业启动一个 JobMaster,然后进入作业执行阶段。
该模式下,一个作业一个集群,作业之间相互隔离。
在 FLink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式。
Per-Job 模式下,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,可以创建集群并将 JobGraph 以及所需要的文件等一同交给 Yarn 集群,Yarn 集群在容器中启动 JobManager 进程,进行一系列的初始化动作,初始化完毕之后,从文件系统中获取 JobGraph ,交给 Dispatcher。 之后的执行流程与 Session 模式下的执行流程相同。
yarn session 的提交流程

从总体上来说,在 Yarn 集群上使用 Session 模式提交 Flink 作业的过程分为 3 个阶段。首先在 Yarn 上启动 Flink Session 模式的集群;其次通过 Flink Client 提交作业 ,最后进行作业的调度执行。

(1) 使用 yarn-session.sh 提交会话模式的作业
如果提交到已经存在的集群, 则获取 Yarn 集群信息、应用 ID,并准备提交作业。
如果是启动新的 Yarn Session 集群,则进入到步骤 (2)。
(2)Yarn 启动新的 Flink 集群
如果没有集群,则创建一个新的 Session 模式的集群。首先,将应用的配置文件(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink jar、用户 jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。
然后通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FLink JobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动的入口(不同的集群部署模式有不同的 ClusterEntrypoint 的实现),初始化 Dispatcher、ResourceManager。启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。
2、作业提交
Yarn 集群准备好后,开始作业提交。
(1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。
(2)Dispatcher 是 Rest  接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster(负责作业调度、管理作业和 Task 的生命周期 ),构建 ExecutionGraph(Job Graph的并行化版本)
(1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 的执行;初次提交作业,集群尚没有 TaskManager,此时资源不足,开始申请资源。
(2)YarnResourceManager 收到 JobMaster 的资源请求,如果当前有空闲的 Slot,则将 Slot 分配给 JobMaster,否则 YarnResourceManager 将向 Yarn Master(Yarn 集群的 ResourceManager) 请求创建 TaskManager。
(3)YarnResourceManager 将资源请求加入等待请求队列,并通过心跳向 YARN RM 申请新的 Container 资源来启动 TaskManager 进程;Yarn 分配新的 Container 给 TaskManager。
(4)YarnResourceManager 从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager。
(5)TaskManager 启动之后,向 YarnResourceManager 进行注册,并把自己的 Slot 资源情况汇报给 YarnResourceManager 。
(6)YarnResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给了哪个 JobMaster。
(7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。
至此,作业进入执行阶段。
Yarn Per-Job 提交流程

Yarn Per-Job 模式提交作业与 Yarn-Session 模式提交作业基本类似。Per-Job 模式下,JobGraph 和集群资源请求一起提交给 Yarn。

流处理的转换过程

StreamGraph

使用 DataStream API 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。
我们以熟知的 WordCount 程序为例,它的 StreamGraph 如下图所示。

从图中我们可以看到,StreamGraph 是由 StreamNode 和 StreamEdge 构成。
作业图

JobGraph 可以由流计算的 StreamGraph 转换而来。
流计算中,在 StreamGraph 的基础上进行了一些优化,如通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网络的传递。

从 JobGraph 的图里可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDateSet),而下游作为消费者需要 JobEdge。JobEdge 是一个通信管道,连接了上游生产的中间数据集和 JobVertex 节点。
JobGraph 的核心对象是 JobVertex、JobEdge 和 IntermediateDateSet。
执行图

ExecutionGraph 是调度 Flink 作业执行的核心数据结构,包含了作业中所有并行执行的 Task 的信息、Task 之间的关联关系、数据流转关系等。
StreamGraph、JobGraph 在 Flink 客户端中生成,然后提交给 Flink 集群。JobGraph 到 ExecutionGraph 的转换在 JobMaster 中完成。在转化过程中,有如下重要变化。
生成的图如下图所示。

ExecutionGraph 的核心对象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。
总结

Flink 作业执行前需要提交 Flink 集群, Flink 集群可以与不同的资源框架(Yarn、K8s、Mesos 等)进行集成,可以按照不同的模式(Session 模式和 Per-Job模式)运行,所以在 Flink 作业提交过程中,可能在资源框架上启动Flink集群。Flink 就绪之后,进入作业提交阶段,在Flink客户端进行StreamGraph、JobGraph的转换,提交 JobGraph 到 Flink 集群,然后 Flink 集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4