目前在家待着,想着就看看flink源码吧,熟悉一下,学习一下输出一些博客。此文章是on yarn application模式代码提交过程。研究源码过程发现自己很多不敷之处,这篇文章也是履历了好多天才看了个大概,如果有问题希望大佬们指出,还请多多包涵!
一、Flink on yarn application模式终端命令提交
./bin/flink run-application -t yarn-application -Dparallelism.default=3 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=name -Dtaskmanager.numberOfTaskSlots=1 -c className xxx.jar
我们一般在客户端服务器上,直接通过flink的命令来提交如上的脚本。提交上去以后,代码是怎么走的?今天我们去看一下过程。
我们找到flink脚本,打开以后我们找到最后一条命令。通过flink的安装目录我们探求到bin目录下的flink或者我们直接打开源码在 flink-dist 模块可以找到 flink脚本。关键我们是要找到执行了哪个class,这个想必大家都知道就是 org.apache.flink.client.cli.CliFrontend 。
二、 App模式提交过程剖析
2.1 CliFrontend::main() -> runApplication(String[] args)
2.2 ApplicationClusterDeployer::run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception { checkNotNull(configuration); checkNotNull(applicationConfiguration);
- public <ClusterID> void run(
- final Configuration configuration,
- final ApplicationConfiguration applicationConfiguration)
- throws Exception {
- checkNotNull(configuration);
- checkNotNull(applicationConfiguration);
- LOG.info("Submitting application in 'Application Mode'.");
- final ClusterClientFactory<ClusterID> clientFactory =
- clientServiceLoader.getClusterClientFactory(configuration);
- try (
- //这一步进行了yarn的客户端初始化 yar信息初始化以及客户端启动,后期启动jm rpc要用到
- final ClusterDescriptor<ClusterID> clusterDescriptor =
- clientFactory.createClusterDescriptor(configuration) ) {
- final ClusterSpecification clusterSpecification =
- clientFactory.getClusterSpecification(configuration);
- //2、yarn直接部署作业 进去
- clusterDescriptor.deployApplicationCluster(
- clusterSpecification, applicationConfiguration);
- }
- }
复制代码 2.3 YarnClusterDescriptor :: deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) ->
deployInternal( ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached)
startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification)
在 startAppMaster 代码运行这里就算是把作业提交交给yarn处理了,yarnClient.submitApplication(appContext); 从这里代码跟进去我们发现源代码的路径已经不是flink的项目路径了。
三、提交rm完毕
以上就完成了提交到rm容器中了,但是flink集群还没有启动,也就是JobManager进程还没有启动,很多细节没有说到,只是看了关键的代码方法,也就是大体的一个轮廓。这样提交完了就可以运行了吗?我们的代码怎么办?其实在yarn初始化话,返回YarnClusterDescriptor 类的过程中,有很多初始化的信息,一个关键的部分就是提供运行我们flnk集群入口的地方。这里的 yarnClusterEntrypoint 类名就是我们的入口 org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint 留着我们下一篇文章再分析。
- ContainerLaunchContext setupApplicationMasterContainer(
- String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
- // ------------------ Prepare Application Master Container ------------------------------
- // respect custom JVM options in the YAML file
- String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
- if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
- javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
- }
- // krb5.conf file will be available as local resource in JM/TM container
- if (hasKrb5) {
- javaOpts += " -Djava.security.krb5.conf=krb5.conf";
- }
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- final Map<String, String> startCommandValues = new HashMap<>();
- startCommandValues.put("java", "$JAVA_HOME/bin/java");
- String jvmHeapMem =
- JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
- startCommandValues.put("jvmmem", jvmHeapMem);
- startCommandValues.put("jvmopts", javaOpts);
- startCommandValues.put(
- "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
- startCommandValues.put("class", yarnClusterEntrypoint);
- startCommandValues.put(
- "redirects",
- "1> "
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/jobmanager.out "
- + "2> "
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/jobmanager.err");
- String dynamicParameterListStr =
- JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
- startCommandValues.put("args", dynamicParameterListStr);
- final String commandTemplate =
- flinkConfiguration.getString(
- ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
- ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
- final String amCommand =
- BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
- amContainer.setCommands(Collections.singletonList(amCommand));
- LOG.debug("Application Master start command: " + amCommand);
- return amContainer;
- }
复制代码
四、JobManager启动过程
从官网上我们看到了jm的由三个紧张部分组成,我们从官网截图如下:
YarnApplicationClusterEntryPoint 就是我们的flink集群入口类,我们跟进 main 方法
ClusterEntrypoint:: runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) ->
ClusterEntrypoint:: startCluster() ->
ClusterEntrypoint:: runCluster(Configuration configuration, PluginManager pluginManager)
-> initializeServices(configuration, pluginManager);
初始化服务,jm的rpc服务,jmx服务,io线程,选举服务,blob服务共享jar使用,心跳服务,指标注册服务 看这里的方法从前不知道akka在哪儿使用,这下就明白了。
-> dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, executionGraphInfoStore, new RpcMetricQueryServiceRetriever( metricRegistry.getMetricQueryServiceRpcService()), this);
从这里代码不绝跟下去直到
- SessionDispatcherLeaderProcess::onStart()
复制代码
接着跟进createDispatcher方法中的create方法
到这里dispatcher启动告一段,dispatcher现实内里onstart方法还有更具体的代码,感爱好的可以跟进去看具体的详情。rm,webMonitor服务与此类似。
五、flink业务代码怎样被调用
上一节我们说到在创建 createDispatcher 中用到了 ApplicationDispatcherBootstrap 就是程序启动项。dispatcherGatewayServiceFactory.create(...)方法中新建ApplicationDispatcherBootstrap实例时构造函数,通过以下方法调用链
ApplicationDispatcherBootstrap::fixJobIdAndRunApplicationAsync(...)
-> runApplicationAsync(...)
-> runApplicationEntryPoint(...)
-> runApplicationEntryPoint(...)
->
ClientUtils.executeProgram(...)
->
->PackagedProgram::invokeInteractiveModeForExecution()
-> callMainMethod(mainClass, args)
->
Method::invoke(null, (Object) args)
颠末这一些步骤,最后通过反射调用我们自己写的flink业务程序的main方法从而开始执行代码。
总结
很多细节自己还是没有把握,以是后期还是会把细节的东西研究清楚一下,比方说:选举过程细节。后期也会针对选举这一块整理一篇文章出来。。
研究良好的源代码也是发现自己不敷的地方,很多时候写着写着发现了自己那边不懂,就去查资料研究,一层一层的,感觉太多东西需要学习了。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |