Flink源码解析之:Flink On Yarn模式使命提交部署过程解析 ...

打印 上一主题 下一主题

主题 1291|帖子 1291|积分 3873

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Flink源码解析之:Flink On Yarn模式使命提交部署过程解析

一、Flink on Yarn部署模式概述

Apache Hadoop YARN 在许多数据处理框架中都很盛行。 Flink 服务提交给 YARN 的 ResourceManager,后者会在 YARN NodeManagers 管理的机器上天生容器。 Flink 将其 JobManager 和 TaskManager 实例部署到这些容器中。
Flink 可根据在 JobManager 上运行的作业所需的处理插槽数目,动态分配和取消分配 TaskManager 资源。
Flink on Yarn的部署模式包括三种方式,Application Mode、Per-Job Mode、Session Mode。对于天生环境来说,更推荐使用Application Mode或Per-Job Mode,因为这两种模式能够提供更好的应用隔离性。

  • Application Mode
    Application Mode模式将在 YARN 上启动一个 Flink 集群,应用程序 jar 的 main() 方法将在 YARN 的 JobManager 上实行。 应用程序一旦完成,群集就会关闭。 该种方式相比Per-Job模式来说,将应用main()方法的实行,StreamGraph、JobGraph的天生放在了Flink集群侧来实现。
  • Per-Job Mode
    Per-job 模式将在 YARN 上启动一个 Flink 集群,在客户端天生StreamGraph、JobGraph,并上传依赖项。末了将 JobGraph 提交给 YARN 上的 JobManager。 如果通过—detached参数配置了分离模式,则客户端将在提交被接受后立刻制止。
  • Session Mode
    Session部署模式会在YARN上部署一个恒久运行的Flink集群会话,该会话可以接受并实行多个Flink作业。
    Session部署模式包罗两种操纵模式:

    • attach mode(default):实行yarn-session.sh文件在Yarn上启动Flink集群,启动后客户端会一致运行,来追踪/监听集群状态。一旦集群异常,客户端会获取异常信息并展示。如果客户端异常制止了,则会发送signal到Flink集群,此时Flink集群同样也会制止。
    • detach mode :使用-d or --detached参数设置。在这种模式下,当实行yarn-session.sh文件在Yarn上启动Flink集群后,客户端会直接返回。要制止 Flink 群集,需要再次调用客户端或 YARN 工具。

三种提交模式的对比:
由bin/flink.sh脚本可知,客户端提交过程同一由org.apache.flink.client.cli.CliFronted入口类触发。Per-Job模式和Session模式下Flink应用main方法都会在客户端实行。客户端解析天生JobGraph后会将依赖项和JobGraph序列化后的二进制数据一起发往集群上。当客户端机器上有大量作业提交时,需要大量的网络带宽下载依赖项并将二进制文件发送到集群,会造成客户端消耗大量的资源。尤其在大量用户共享客户端时,问题更加突出。为办理该问题,社区提出了Application模式将Flink应用main方法触发过程后置到了JobManager天生过程中,以此将带宽压力分散到集群各个节点上。
鉴于Application部署模式的优势,本文会以Application部署模式的源码来进行解析,探究Flink以Application模式提交使命到Yarn集群中所颠末的大抵流程,为我们理解Flink On Yarn的部署有一个更深入和清晰的认识。
二、Flink Application部署模式源码解析

(一)CliFronted入口类

本节以Application部署模式为例,介绍Flink On Yarn的客户端提交源码流程。正如上文说的,由bin/flink.sh脚本可知,客户端提交过程同一由org.apache.flink.client.cli.CliFronted入口类触发,为此,我们起首进入到该方法的源码中,来观察下该入口类的实现逻辑:
  1. /** Submits the job based on the arguments. */
  2. public static void main(final String[] args) {
  3.     EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
  4.     // 1. find the configuration directory
  5.     // 用来获取配置目录,该目录通常包含Flink的配置文件,如flink-conf.yaml。
  6.     final String configurationDirectory = getConfigurationDirectoryFromEnv();
  7.     // 2. load the global configuration
  8.     // 加载flink的全局配置
  9.     final Configuration configuration =
  10.             GlobalConfiguration.loadConfiguration(configurationDirectory);
  11.     // 3. load the custom command lines
  12.     // 加载自定义的命令行配置
  13.     final List<CustomCommandLine> customCommandLines =
  14.             loadCustomCommandLines(configuration, configurationDirectory);
  15.     int retCode = 31;
  16.     try {
  17.                 // 实例化了CliFronted对象,CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操作,例如作业的提交,取消,以及打印job的状态等。
  18.         final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
  19.         SecurityUtils.install(new SecurityConfiguration(cli.configuration));
  20.         // 启动Flink作业的入口,parseAndRun方法会解析命令行参数,并启动Flink作业。
  21.                 retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
  22.     } catch (Throwable t) {
  23.         final Throwable strippedThrowable =
  24.                 ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
  25.         LOG.error("Fatal error while running command line interface.", strippedThrowable);
  26.         strippedThrowable.printStackTrace();
  27.     } finally {
  28.         System.exit(retCode);
  29.     }
  30. }
复制代码
上述代码是CliFronted的入口main方法,该方法起首根据Flink的配置路径加载全局配置,比如flink-conf.xml配置文件,接着加载自界说命令行配置,并实例化了CliFronted对象。CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操纵,例如作业的提交,取消,以及打印job的状态等。末了,调用cli.parseAndRun(args)方法,该方法会解析命令行参数,并启动Flink作业。
在parseAndRun方法中,会根据传入参数的第一个参数值来决定Flink集群的部署模式:


  • run-application:则会进入到CliFronted类的runApplication方法中,实行Application部署流程。
  • run:则会进入到CliFronted类的run方法中,在客户端实行作业的main方法(利用反射来实行)
这也是为什么我在使用命令行以Application模式部署Flink集群时,命令的开始要用以下形式:
  1. /bin/flink run-application -t yarn-application...
复制代码
(二)runApplication

接下来,我们继续进入到runApplicaiton方法来看看它的实现逻辑:
  1. protected void runApplication(String[] args) throws Exception {
  2.     LOG.info("Running 'run-application' command.");
  3.         // 解析传入的命令行参数
  4.     final Options commandOptions = CliFrontendParser.getRunCommandOptions();
  5.     final CommandLine commandLine = getCommandLine(commandOptions, args, true);
  6.         // 如果命令行参数中包含帮助选项(-h/--help),则调用下述方法打印帮助信息并返回
  7.     if (commandLine.hasOption(HELP_OPTION.getOpt())) {
  8.         CliFrontendParser.printHelpForRunApplication(customCommandLines);
  9.         return;
  10.     }
  11.         // 验证并获取激活的自定义命令行, CustonCommandLine是Flink用来处理不同部署模式的工具(例如Yarn,Standlone等),以便针对不同模式解析对应的特定设置和参数
  12.     final CustomCommandLine activeCommandLine =
  13.             validateAndGetActiveCommandLine(checkNotNull(commandLine));
  14.         // 初始化ApplicationClusterDeployer实例, 这是Flink用来启动Application的工具
  15.     final ApplicationDeployer deployer =
  16.             new ApplicationClusterDeployer(clusterClientServiceLoader);
  17.     final ProgramOptions programOptions;
  18.     final Configuration effectiveConfiguration;
  19.     // No need to set a jarFile path for Pyflink job.
  20.     if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
  21.         programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
  22.         effectiveConfiguration =
  23.                 getEffectiveConfiguration(
  24.                         activeCommandLine,
  25.                         commandLine,
  26.                         programOptions,
  27.                         Collections.emptyList());
  28.     } else {
  29.         programOptions = new ProgramOptions(commandLine);
  30.         programOptions.validate();
  31.         final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
  32.         effectiveConfiguration =
  33.                 getEffectiveConfiguration(
  34.                         activeCommandLine,
  35.                         commandLine,
  36.                         programOptions,
  37.                         Collections.singletonList(uri.toString()));
  38.     }
  39.         // 根据programOptions获取程序参数和入口类名来创建ApplicationConfiguration实例
  40.     final ApplicationConfiguration applicationConfiguration =
  41.             new ApplicationConfiguration(
  42.                     programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
  43.         // 最后调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中执行。
  44.     deployer.run(effectiveConfiguration, applicationConfiguration);
  45. }
复制代码
上述代码的实现流程与原理如下所示:


  • 解析命令行参数:起首,调用getCommandLine函数解析传入的命令行参数args。
  • 处理资助选项:如果命令行参数中包罗资助选项(-h/–help),则调用CliFrontendParser.printHelpForRunApplication打印资助信息并返回。
  • 获取激活的CustomCommandLine:通过validateAndGetActiveCommandLine函数获取激活的自界说命令行(CustomCommandLine)。CustomCommandLine是Flink用来处理差别部署模式的工具(例如Yarn,Standalone等),以便于针对差别模式解析对应的特定设置和参数。
  • 部署器配置:初始化ApplicationClusterDeployer实例,这是Flink用来启动Application的工具。
  • 提取程序选项和盘算有用配置:区分Python作业和其他作业,天生对应的ProgramOptions并验证其有用性。此外,根据激活的命令行、解析得到的命令行参数和程序选项盘算出有用的配置(effectiveConfiguration)。
  • 构造应用配置:使用从ProgramOptions中获取的程序参数和入口点类名创建ApplicationConfiguration实例。
  • 运行应用:末了,调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中实行。
ProgramOptions.entryPointClass的成员值是flink命令行 -c 选项指定的Flink应用入口类com.xxx.xxx.FlinkApplicationDemo,后续会以反射的形式触发main()方法的实行。
(三)ClusterDescriptor.deployApplicationCluster

上面代码中deployer.run(...)方法负责加载Yarn Application模式客户端信息等。
起首代码会根据configuration配置信息来获取ClusterClientFactory对象,获取的逻辑过程是根据configuration配置中的execution.target参数来决定的。
当实行命令行bin/flink run时, execution.target参数对应的枚举值可以如下:


  • remote
  • local
  • yarn-per-job
  • yarn-session
  • kubernetes-session
    当实行命令行bin/flink run-application时,execution.target参数对应的枚举值可以如下:
  • yarn-application
  • kubernetes-application
当execution.target参数为yarn-application时,Flink便会天生相应的YarnClusterClientFactory客户端工厂类,然后调用该工厂类的createClusterDescriptor方法,该方法中会新建YarnClient实例,YarnClient实例负责在客户端提交Flink应用程序,并终极天生ClusterDescriptor实例,该实例包罗用于在Yarn上部署Flink集群的部署信息Descriptor。
  1. @Override
  2. public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
  3.     checkNotNull(configuration);
  4.     final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
  5.     YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
  6.     return getClusterDescriptor(configuration);
  7. }
  8. private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
  9.     final YarnClient yarnClient = YarnClient.createYarnClient();
  10.     final YarnConfiguration yarnConfiguration =
  11.             Utils.getYarnAndHadoopConfiguration(configuration);
  12.     yarnClient.init(yarnConfiguration);
  13.     yarnClient.start();
  14.     return new YarnClusterDescriptor(
  15.             configuration,
  16.             yarnConfiguration,
  17.             yarnClient,
  18.             YarnClientYarnClusterInformationRetriever.create(yarnClient),
  19.             false);
  20. }
复制代码
有了该实例后,会调用deployApplicationCluster方法来部署Application模式的Flink集群。集群将在提交应用程序时创建,并在应用程序制止时拆除。此外,应用程序用户代码的{@code main()}将在集群上实行,而不是在客户端上实行。
YarnClusterDescriptor.deployApplicationCluster(…)方法调用过程如下:
(1)、YarnClusterDescriptor.deployApplicationCluster(…);进行一些配置和查抄,并调用deployInternal(…)方法。
(2)、YarnClusterDescriptor.deployInternal(…);
此中,最紧张的方法是deployInternal方法
(四)YarnClusterDescriptor.deployInternal

在该方法中,起首会判断Hadoop集群是否启用了Kerberos安全认证,如果开启了,则Flink会起首确认当前用户是否拥有有用的kerberos凭证。如果无效,则会抛出异常,部署作业失败。
紧接着,进行资源查抄和部署模式判断。
在validateClusterResources方法中,会根据配置的JobManager和TaskManager的资源大小与集群资源进行比对。

  • 如果JobManager的配置内存大小 < Yarn配置的最小调治分配内存(yarn.scheduler.minimum-allocation-mb参数,默认1024MB),则JobManager的内存大小会设置为该配置值。
  • 如果JobManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the JobManager available!
  • 如果TaskManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the TaskManagers available!
  • 如果TaskManager大小 > 当前YARN集群剩余资源单个使命容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the TaskManagers is more than the largest possible YARN container: freeClusterResources.containerLimit
  • 如果JobManager大小 > 当前YARN集群剩余资源单个使命容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the JobManager is more than the largest possible YARN container: freeClusterResources.containerLimit
颠末资源查抄后,会将末了确定的JobManager和TaskManager资源保存在ClusterSpecification对象中。
在部署模式决定中,Flink 提供了两种部署模式:Detach模式和Non-Detach模式。如果是 Detach模式,Flink 作业提交到YARN后,客户端可以直接退出,而作业将继续在YARN集群上运行。而在 Non-Detach模式下,客户端将持续等候作业实行完成。
然后就到了一个非常紧张的方法中:startAppMaster。
会根据上面决定的ClusterSpecification资源实例,启动用于管理Flink作业的Application Master。
startAppMaster方法比力长。
这段代码重要是用于启动Flink在YARN集群上的Application Master的过程,代码中包罗了几个重要部分:

  • 起首,核心的首步调是初始化文件系统并获取对应的 FileSystem 实例。代码查抄了文件系统的范例,如果是本地文件系统范例(file://开头),会抛出警告,因为Flink在YARN上运行需要分布式文件系统来存储文件。
  • 然后,获取了用于提交应用程序的 ApplicationSubmissionContext,并将 Flink 应用所需的各种文件如jar包、配置文件等上传到HDFS,并将这些文件的HDFS路径作为本地资源 (LocalResources)添加到ApplicationSubmissionContext里。
  • 在文件上传阶段,包括了一系列复杂的步调,起首是将 flink 配置、job graph、用户 jar、依赖库等上传到HDFS,并将这些文件的路径添加到应用的classpath;其次,如果设置了 security options(例如,Kerberos认证信息),会将相关文件也上传到HDFS;并且,对配置了Kerberos认证的 flink 应用,会从 YARN 获取 HDFS delegation tokens。
  • 在收集完上述一系列依赖文件后,final ContainerLaunchContext amContainer = setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec) 负责设置启动ApplicationMaster的命令操纵。
  • 设置ApplicationMaster的环境变量,诸如_FLINK_CLASSPATH、_FLINK_DIST_JAR(Flink jar resource location (in HDFS))、KRB5_PATH、_YARN_SITE_XML_PATH等环境变量。末了调用amContainer.setEnvironment(appMasterEnv);方法进行设置。
  • 接着,会将上述配置好的amContainer实例放入ApplicationSubmissionContext对象中,以及ApplicationName和所需的资源大小,终极交给交给YarnClient去提交,并随后通过周期性地获取应用状态,来等候应用处于RUNNING或FINISHED状态,完成应用的提交过程。
  • 如果在这一系列操纵中有任何异常或错误发生,会触发失败掩护钩子 DeploymentFailureHook,进行必要的清算工作。
上面这段代码表现了 Flink on YARN 的工作原理,Flink 通过 YARN Client 提交应用,启动 Application Master 来进行资源申请和使命调治,这是典范的 YARN 应用程序模型。各种文件(包括 flink 自己、用户 jar、配置文件等)都被上传到HDFS,然后再从HDFS分发到运行使命的 YARN 容器中,这样做是为了实现文件的分布式共享,并且利用了 YARN 的 LocalResource 机制来进行文件的分发。
对于第四点中的setupApplicationMasterContainer方法,该方法构造了ApplicationMaster的命令行启动命令,如下所示:
  1. ContainerLaunchContext setupApplicationMasterContainer(
  2.         String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
  3.     // ------------------ Prepare Application Master Container  ------------------------------
  4.     // respect custom JVM options in the YAML file
  5.     String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
  6.     if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
  7.         javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
  8.     }
  9.     // krb5.conf file will be available as local resource in JM/TM container
  10.     if (hasKrb5) {
  11.         javaOpts += " -Djava.security.krb5.conf=krb5.conf";
  12.     }
  13.     // Set up the container launch context for the application master
  14.     ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
  15.     final Map<String, String> startCommandValues = new HashMap<>();
  16.     startCommandValues.put("java", "$JAVA_HOME/bin/java");
  17.     String jvmHeapMem =
  18.             JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
  19.     startCommandValues.put("jvmmem", jvmHeapMem);
  20.     startCommandValues.put("jvmopts", javaOpts);
  21.     startCommandValues.put(
  22.             "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
  23.     startCommandValues.put("class", yarnClusterEntrypoint);
  24.     startCommandValues.put(
  25.             "redirects",
  26.             "1> "
  27.                     + ApplicationConstants.LOG_DIR_EXPANSION_VAR
  28.                     + "/jobmanager.out "
  29.                     + "2> "
  30.                     + ApplicationConstants.LOG_DIR_EXPANSION_VAR
  31.                     + "/jobmanager.err");
  32.     String dynamicParameterListStr =
  33.             JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
  34.     startCommandValues.put("args", dynamicParameterListStr);
  35.     final String commandTemplate =
  36.             flinkConfiguration.getString(
  37.                     ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
  38.                     ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
  39.     final String amCommand =
  40.             BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
  41.     amContainer.setCommands(Collections.singletonList(amCommand));
  42.     LOG.debug("Application Master start command: " + amCommand);
  43.     return amContainer;
  44. }
复制代码
启动命令的参数包括以下部分:


  • “java”:Java二进制文件的路径。一般来说,在YARN容器中,Java的路径会被设置为$JAVA_HOME/bin/java。
  • “jvmmem”:JVM参数,重要是内存参数,比如最大堆内存、最小堆内存等。这些参数会基于Flink配置以及JobManager的内存配置来天生。
  • “jvmopts”:JVM选项。这些选项来自Flink配置文件中设置的JVM选项,以及若存在Kerberos krb5.conf文件,还会添加-Djava.security.krb5.conf=krb5.conf。
  • “logging”:日志配置项,用于配置Flink的日志选项。
  • “class”:启动类,即YARN集群入口点类名(yarnClusterEntrypoint)。
  • “redirects”:输出重定向的参数,将stdout(输出流)和stderr(错误流)重定向到日志文件中。
  • “args”:通报给启动类的参数,重要是JobManager的动态配置参数。
⠀这些参数末了会填入一个启动命令模板(通常为"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"),来天生实际启动Flink应用的命令。
启动后的ApplicationMaster,在YARN集群上起着以下的关键作用:


  • 作为应用程序的主控制器,管理和监视应用程序的实行。
  • 负责请求YARN ResourceManager分配所需的资源(例如容器)。
  • 启动和监视使命实行器(TaskExecutor),它们在分配的容器中运行。
  • 与Flink的client(例如命令行界面或Web界面)以及ResourceManager进行交互,提供应用程序的状态和进度信息。
  • 在应用程序出现异常或失败时,它可以选择重新请求资源并重启失败的使命,提供了一定水平的错误规复能力。
⠀因此,Application Master是Flink在YARN上运行的关键组件,它负责管理Flink应用程序的生命周期和资源。
(五)YarnApplicationClusterEntryPoint

在上面的setupApplicationMasterContainer方法中,我们说该方法构建了ApplicationMaster的启动命令。从该命令行中可以看到,命令行的启动入口类为yarnClusterEntrypoint参数,对于Yarn Application部署模式来说,参数对应的入口类即为YarnApplicationClusterEntryPoint。在第四部分的分析中,当通过yarnClient将ApplicationMaster提交到Yarn集群后,便会申请Container来实行ApplicationMaster,实行该入口类。
为此,接下来我们来分析一下,YarnApplicationClusterEntryPoint入口类的实行逻辑。
  1. public static void main(final String[] args) {
  2.     // startup checks and logging
  3.     EnvironmentInformation.logEnvironmentInfo(
  4.             LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);
  5.     SignalHandler.register(LOG);
  6.     JvmShutdownSafeguard.installAsShutdownHook(LOG);
  7.     Map<String, String> env = System.getenv();
  8.        
  9.         // 获取工作路径
  10.     final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
  11.     Preconditions.checkArgument(
  12.             workingDirectory != null,
  13.             "Working directory variable (%s) not set",
  14.             ApplicationConstants.Environment.PWD.key());
  15.     try {
  16.         YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
  17.     } catch (IOException e) {
  18.         LOG.warn("Could not log YARN environment information.", e);
  19.     }
  20.     final Configuration dynamicParameters =
  21.             ClusterEntrypointUtils.parseParametersOrExit(
  22.                     args,
  23.                     new DynamicParametersConfigurationParserFactory(),
  24.                     YarnApplicationClusterEntryPoint.class);
  25.     final Configuration configuration =
  26.             YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
  27.     PackagedProgram program = null;
  28.     try {
  29.                 // 获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例
  30.         program = getPackagedProgram(configuration);
  31.     } catch (Exception e) {
  32.         LOG.error("Could not create application program.", e);
  33.         System.exit(1);
  34.     }
  35.     try {
  36.         configureExecution(configuration, program);
  37.     } catch (Exception e) {
  38.         LOG.error("Could not apply application configuration.", e);
  39.         System.exit(1);
  40.     }
  41.     YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =
  42.             new YarnApplicationClusterEntryPoint(configuration, program);
  43.     //         执行Application Cluster
  44. ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
  45. }
复制代码
上面这段代码中,使用getPackagedProgram(configuration)方法获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例,便于后续调用。
末了,调用runClusterEntrypoint方法,启动实行Application Cluster集群。
ClusterEntrypoint.runClusterEntrypoint(...)方法的调用链路如下:


  • ClusterEntrypoint.runClusterEntrypoint(...)
  • ClusterEntrypoint.startCluster(...)
  • ClusterEntrypoint.runCluster(...)
  • DispatcherResourceManagerComponentFactory.create(…)
在DispatcherResourceManagerComponentFactory.create方法中,启动了一系列服务,比如:


  • LeaderRetrievalService
  • WebMonitorEndpoint
  • ResourceManagerService
  • DispatcherRunner
本流程中重要需要关注的服务是DispatcherRunner,该方法中,会调用dispatcherRunnerFactory.createDispatcherRunner来初始化dispatchRunner实例,dispatcherRunner实例负责dispatcher组件的高可用leader推举操纵,同时dispatcher组件负责触发Flink用户应用main(…)方法实行。
在创建DispatchRunner的过程中,包罗高可用Leader推举过程,颠末一系列的方法链调用,会推举出一个Leader DispatchRunner服务来负责后续的处理流程。


  • DispatcherResourceManagerComponentFactory.createDispatcherRunner
  • DefaultDispatcherRunner.create()
  • DispatcherRunnerLeaderElectionLifecycleManager.createFor()
  • DefaultLeaderElectionService.start()
  • LeaderElectionDriverFactory.createLeaderElectionDriver()
  • new ZooKeeperLeaderElectionDriver
  • LeaderLatch.start()
  • LeaderLatch.internalStart()
  • LeaderLatch.reset()
  • LeaderLatch.setLeadership()
  • ZooKeeperLeaderElectionDriver.isLeader()
  • DefaultLeaderElectionService.onGrantLeadership()
  • DefaultDispatcherRunner.grantLeadership()
  • DefaultDispatcherRunner.startNewDispatcherLeaderProcess()
推举为leader的DefaultDispatcherRunner实例候选者在回变更作过程中会不停调用到上面的grantLeadership(…)方法,并在startNewDispatcherLeaderProcess(…)方法中天生dispatcherLeaderProcess,表现一个Ledaer Dispatcher历程来提供服务,并通过newDispatcherLeaderProcess::start方法来启动实行该服务的后续处理流程。Leader候选者回变更作触发过程会另起篇幅详细讲解,此处先这样理解。
在后续的处理流程中,我们需要关注的点是在何时触发用户应用程序的main方法实行,为此,继续深入以下调用链:


  • AbstractDispatcherLeaderProcess.startInternal()
  • SessionDispatcherLeaderProcess.onStart()
  • SessionDispatcherLeaderProcess.createDispatcherIfRunning()
  • SessionDispatcherLeaderProcess.createDispatcher()
  • ApplicationDispatcherGatewayServiceFactory.create()
  • new ApplicationDispatcherBootstrap(...)
上述调用链中,createDispatcher(…)方法会调用dispatcherGatewayServiceFactory.create(…)方法,dispatcherGatewayServiceFactory实际范例是ApplicationDispatcherGatewayServiceFactory。在dispatcherGatewayServiceFactory.create(…)方法中新建ApplicationDispatcherBootstrap实例。
在ApplicationDispatcherBootstrap实例中,继续通过以下方法调用链fixJobIdAndRunApplicationAsync(…) -> runApplicationAsync(…) -> runApplicationEntryPoint(…) -> ClientUtils.executeProgram(…) -> program.invokeInteractiveModeForExecution() -> callMainMethod(mainClass, args) -> mainMethod.invoke(null, (Object) args)触发Flink应用main(…)方法的实行。


  • ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationEntryPoint()
  • ClientUtils.executeProgram()
  • PackagedProgram.invokeInteractiveModeForExecution()
  • PackagedProgram.callMainMethod()
  • mainMethod.invoke(null, (Object) args);
终极,在ApplicationDispatcherBootstrap类的实现中,我们找到了用户应用程序的main方法实行入口。
三、回顾与总结

回顾一下上面的整体流程,起首,我们通过ApplicationMaster的启动命令,找到AM组建实行的入口类为YarnApplicationClusterEntryPoint,接着,在启动集群时,我们发现Flink会初始化一些诸如LeaderRetrievalService、WebMonitorEndpoint、ResourceManagerService、DispatcherRunner的服务,这些服务分别发挥差别的用途,与Yarn和Flink集群进行交互。在本次分析过程中,我们偏重探究了DispatcherRunner服务的创建流程。
起首,会实行高可用的推举流程,终极推举出一个Leader DispatcherRunner来实行服务。推举完成后,该Leader DispatchRunner会调用ClientUtils.executeProgram方法,从封装好的PackagedProgram实例中,获取用户应用程序的入口类mainClass以及程序入参,并终极利用反射触发mainClass的main方法的实行,完成用户自界说Flink应用的实行。
以上就是重要的Flink On Yarn客户端作业的提交过程解析。这个提交过程相对来说照旧比力复杂的,包罗着许多部署配置参数,资源以及权限的校验和分配,ApplicationMaster的提交启动,并陪同AM启动后实行的一系列Flink服务初始化,以及我们关心的用户应用程序的调用入口,发现了在Application的部署模式下,用户应用程序的调用是在集群侧,也就是Leader DispatchRunner服务中完成的。
当然,DispatchRunner服务负责的使命远不止于此,上述流程中还有更多的细节等候我们去挖掘和学习,这篇文章可能只是让我们对提交流程有了一个初步的大体认识,对于更多深入的部分,需要我们不断思考不断挖掘,也接待各人交流观点和看法,感谢!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表