flink源码跟读:on yarn application模式作业提交

打印 上一主题 下一主题

主题 834|帖子 834|积分 2502

目前在家待着,想着就看看flink源码吧,熟悉一下,学习一下输出一些博客。此文章是on yarn application模式代码提交过程。研究源码过程发现自己很多不敷之处,这篇文章也是履历了好多天才看了个大概,如果有问题希望大佬们指出,还请多多包涵!
一、Flink on yarn application模式终端命令提交

./bin/flink run-application -t yarn-application -Dparallelism.default=3 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=name  -Dtaskmanager.numberOfTaskSlots=1  -c className xxx.jar
我们一般在客户端服务器上,直接通过flink的命令来提交如上的脚本。提交上去以后,代码是怎么走的?今天我们去看一下过程。
我们找到flink脚本,打开以后我们找到最后一条命令。通过flink的安装目录我们探求到bin目录下的flink或者我们直接打开源码在 flink-dist 模块可以找到 flink脚本。关键我们是要找到执行了哪个class,这个想必大家都知道就是  org.apache.flink.client.cli.CliFrontend

 二、 App模式提交过程剖析

2.1 CliFrontend::main() ->  runApplication(String[] args) 
2.2 ApplicationClusterDeployer::run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception { checkNotNull(configuration); checkNotNull(applicationConfiguration);
  1.     public <ClusterID> void run(
  2.             final Configuration configuration,
  3.             final ApplicationConfiguration applicationConfiguration)
  4.             throws Exception {
  5.         checkNotNull(configuration);
  6.         checkNotNull(applicationConfiguration);
  7.         LOG.info("Submitting application in 'Application Mode'.");
  8.         final ClusterClientFactory<ClusterID> clientFactory =
  9.                 clientServiceLoader.getClusterClientFactory(configuration);
  10.         try (
  11.                 //这一步进行了yarn的客户端初始化 yar信息初始化以及客户端启动,后期启动jm rpc要用到
  12.                 final ClusterDescriptor<ClusterID> clusterDescriptor =
  13.                 clientFactory.createClusterDescriptor(configuration) ) {
  14.             final ClusterSpecification clusterSpecification =
  15.                     clientFactory.getClusterSpecification(configuration);
  16.             //2、yarn直接部署作业 进去
  17.             clusterDescriptor.deployApplicationCluster(
  18.                     clusterSpecification, applicationConfiguration);
  19.         }
  20.     }
复制代码
2.3  YarnClusterDescriptor :: deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) ->
deployInternal( ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached)
startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification)
在 startAppMaster 代码运行这里就算是把作业提交交给yarn处理了,yarnClient.submitApplication(appContext); 从这里代码跟进去我们发现源代码的路径已经不是flink的项目路径了。

     
三、提交rm完毕

以上就完成了提交到rm容器中了,但是flink集群还没有启动,也就是JobManager进程还没有启动,很多细节没有说到,只是看了关键的代码方法,也就是大体的一个轮廓。这样提交完了就可以运行了吗?我们的代码怎么办?其实在yarn初始化话,返回YarnClusterDescriptor 类的过程中,有很多初始化的信息,一个关键的部分就是提供运行我们flnk集群入口的地方。这里的  yarnClusterEntrypoint 类名就是我们的入口 org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint  留着我们下一篇文章再分析。
  1. ContainerLaunchContext setupApplicationMasterContainer(
  2.             String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
  3.         // ------------------ Prepare Application Master Container  ------------------------------
  4.         // respect custom JVM options in the YAML file
  5.         String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
  6.         if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
  7.             javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
  8.         }
  9.         // krb5.conf file will be available as local resource in JM/TM container
  10.         if (hasKrb5) {
  11.             javaOpts += " -Djava.security.krb5.conf=krb5.conf";
  12.         }
  13.         // Set up the container launch context for the application master
  14.         ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
  15.         final Map<String, String> startCommandValues = new HashMap<>();
  16.         startCommandValues.put("java", "$JAVA_HOME/bin/java");
  17.         String jvmHeapMem =
  18.                 JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
  19.         startCommandValues.put("jvmmem", jvmHeapMem);
  20.         startCommandValues.put("jvmopts", javaOpts);
  21.         startCommandValues.put(
  22.                 "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
  23.         startCommandValues.put("class", yarnClusterEntrypoint);
  24.         startCommandValues.put(
  25.                 "redirects",
  26.                 "1> "
  27.                         + ApplicationConstants.LOG_DIR_EXPANSION_VAR
  28.                         + "/jobmanager.out "
  29.                         + "2> "
  30.                         + ApplicationConstants.LOG_DIR_EXPANSION_VAR
  31.                         + "/jobmanager.err");
  32.         String dynamicParameterListStr =
  33.                 JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
  34.         startCommandValues.put("args", dynamicParameterListStr);
  35.         final String commandTemplate =
  36.                 flinkConfiguration.getString(
  37.                         ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
  38.                         ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
  39.         final String amCommand =
  40.                 BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
  41.         amContainer.setCommands(Collections.singletonList(amCommand));
  42.         LOG.debug("Application Master start command: " + amCommand);
  43.         return amContainer;
  44.     }
复制代码

四、JobManager启动过程

从官网上我们看到了jm的由三个紧张部分组成,我们从官网截图如下:

YarnApplicationClusterEntryPoint 就是我们的flink集群入口类,我们跟进 main 方法
ClusterEntrypoint:: runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) ->
ClusterEntrypoint:: startCluster() ->
ClusterEntrypoint:: runCluster(Configuration configuration, PluginManager pluginManager)
-> initializeServices(configuration, pluginManager);
初始化服务,jm的rpc服务,jmx服务,io线程,选举服务,blob服务共享jar使用,心跳服务,指标注册服务 看这里的方法从前不知道akka在哪儿使用,这下就明白了。
-> dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, executionGraphInfoStore, new RpcMetricQueryServiceRetriever( metricRegistry.getMetricQueryServiceRpcService()), this);
从这里代码不绝跟下去直到 
  1. SessionDispatcherLeaderProcess::onStart()
复制代码

接着跟进createDispatcher方法中的create方法

到这里dispatcher启动告一段,dispatcher现实内里onstart方法还有更具体的代码,感爱好的可以跟进去看具体的详情。rm,webMonitor服务与此类似。

五、flink业务代码怎样被调用

上一节我们说到在创建 createDispatcher 中用到了 ApplicationDispatcherBootstrap  就是程序启动项。dispatcherGatewayServiceFactory.create(...)方法中新建ApplicationDispatcherBootstrap实例时构造函数,通过以下方法调用链
ApplicationDispatcherBootstrap::fixJobIdAndRunApplicationAsync(...)
-> runApplicationAsync(...)
-> runApplicationEntryPoint(...)
-> runApplicationEntryPoint(...)
->
ClientUtils.executeProgram(...)
->
->PackagedProgram::invokeInteractiveModeForExecution()
-> callMainMethod(mainClass, args)
->
Method::invoke(null, (Object) args)
颠末这一些步骤,最后通过反射调用我们自己写的flink业务程序的main方法从而开始执行代码。
总结

很多细节自己还是没有把握,以是后期还是会把细节的东西研究清楚一下,比方说:选举过程细节。后期也会针对选举这一块整理一篇文章出来。。
研究良好的源代码也是发现自己不敷的地方,很多时候写着写着发现了自己那边不懂,就去查资料研究,一层一层的,感觉太多东西需要学习了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表