detach mode :使用-d or --detached参数设置。在这种模式下,当实行yarn-session.sh文件在Yarn上启动Flink集群后,客户端会直接返回。要制止 Flink 群集,需要再次调用客户端或 YARN 工具。
三种提交模式的对比:
由bin/flink.sh脚本可知,客户端提交过程同一由org.apache.flink.client.cli.CliFronted入口类触发。Per-Job模式和Session模式下Flink应用main方法都会在客户端实行。客户端解析天生JobGraph后会将依赖项和JobGraph序列化后的二进制数据一起发往集群上。当客户端机器上有大量作业提交时,需要大量的网络带宽下载依赖项并将二进制文件发送到集群,会造成客户端消耗大量的资源。尤其在大量用户共享客户端时,问题更加突出。为办理该问题,社区提出了Application模式将Flink应用main方法触发过程后置到了JobManager天生过程中,以此将带宽压力分散到集群各个节点上。
鉴于Application部署模式的优势,本文会以Application部署模式的源码来进行解析,探究Flink以Application模式提交使命到Yarn集群中所颠末的大抵流程,为我们理解Flink On Yarn的部署有一个更深入和清晰的认识。
二、Flink Application部署模式源码解析
(一)CliFronted入口类
本节以Application部署模式为例,介绍Flink On Yarn的客户端提交源码流程。正如上文说的,由bin/flink.sh脚本可知,客户端提交过程同一由org.apache.flink.client.cli.CliFronted入口类触发,为此,我们起首进入到该方法的源码中,来观察下该入口类的实现逻辑:
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// 用来获取配置目录,该目录通常包含Flink的配置文件,如flink-conf.yaml。
final String configurationDirectory = getConfigurationDirectoryFromEnv();
如果JobManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the JobManager available!
如果TaskManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the TaskManagers available!
如果TaskManager大小 > 当前YARN集群剩余资源单个使命容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the TaskManagers is more than the largest possible YARN container: freeClusterResources.containerLimit
如果JobManager大小 > 当前YARN集群剩余资源单个使命容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the JobManager is more than the largest possible YARN container: freeClusterResources.containerLimit
设置ApplicationMaster的环境变量,诸如_FLINK_CLASSPATH、_FLINK_DIST_JAR(Flink jar resource location (in HDFS))、KRB5_PATH、_YARN_SITE_XML_PATH等环境变量。末了调用amContainer.setEnvironment(appMasterEnv);方法进行设置。