ToB企服应用市场:ToB评测及商务社交产业平台
标题:
flink源码跟读:on yarn application模式作业提交
[打印本页]
作者:
梦见你的名字
时间:
2024-10-12 14:07
标题:
flink源码跟读:on yarn application模式作业提交
目前在家待着,想着就看看flink源码吧,熟悉一下,学习一下输出一些博客。此文章是on yarn application模式代码提交过程。研究源码过程发现自己很多不敷之处,这篇文章也是履历了好多天才看了个大概,如果有问题希望大佬们指出,还请多多包涵!
一、Flink on yarn application模式终端命令提交
./bin/flink run-application -t yarn-application -Dparallelism.default=3 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=name -Dtaskmanager.numberOfTaskSlots=1 -c className xxx.jar
我们一般在客户端服务器上,直接通过flink的命令来提交如上的脚本。提交上去以后,代码是怎么走的?今天我们去看一下过程。
我们找到flink脚本,打开以后我们找到最后一条命令。通过flink的安装目录我们探求到bin目录下的flink或者我们直接打开源码在
flink-dist
模块可以找到 flink脚本。关键我们是要找到执行了哪个class,这个想必大家都知道就是
org.apache.flink.client.cli.CliFrontend
。
二、 App模式提交过程剖析
2.1 CliFrontend::main() -> runApplication(String[] args)
2.2 ApplicationClusterDeployer::run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception { checkNotNull(configuration); checkNotNull(applicationConfiguration);
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
checkNotNull(configuration);
checkNotNull(applicationConfiguration);
LOG.info("Submitting application in 'Application Mode'.");
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (
//这一步进行了yarn的客户端初始化 yar信息初始化以及客户端启动,后期启动jm rpc要用到
final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration) ) {
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
//2、yarn直接部署作业 进去
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
复制代码
2.3 YarnClusterDescriptor :: deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) ->
deployInternal( ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached)
startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification)
在 startAppMaster 代码运行这里就算是把作业提交交给yarn处理了,yarnClient.submitApplication(appContext); 从这里代码跟进去我们发现源代码的路径已经不是flink的项目路径了。
三、提交rm完毕
以上就完成了提交到rm容器中了,但是flink集群还没有启动,也就是JobManager进程还没有启动,很多细节没有说到,只是看了关键的代码方法,也就是大体的一个轮廓。这样提交完了就可以运行了吗?我们的代码怎么办?其实在yarn初始化话,返回YarnClusterDescriptor 类的过程中,有很多初始化的信息,一个关键的部分就是提供运行我们flnk集群入口的地方。这里的 yarnClusterEntrypoint 类名就是我们的入口 org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint 留着我们下一篇文章再分析。
ContainerLaunchContext setupApplicationMasterContainer(
String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
}
// krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
javaOpts += " -Djava.security.krb5.conf=krb5.conf";
}
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
final Map<String, String> startCommandValues = new HashMap<>();
startCommandValues.put("java", "$JAVA_HOME/bin/java");
String jvmHeapMem =
JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
startCommandValues.put("jvmmem", jvmHeapMem);
startCommandValues.put("jvmopts", javaOpts);
startCommandValues.put(
"logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
startCommandValues.put("class", yarnClusterEntrypoint);
startCommandValues.put(
"redirects",
"1> "
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/jobmanager.out "
+ "2> "
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/jobmanager.err");
String dynamicParameterListStr =
JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
startCommandValues.put("args", dynamicParameterListStr);
final String commandTemplate =
flinkConfiguration.getString(
ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
final String amCommand =
BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
amContainer.setCommands(Collections.singletonList(amCommand));
LOG.debug("Application Master start command: " + amCommand);
return amContainer;
}
复制代码
四、JobManager启动过程
从官网上我们看到了jm的由三个紧张部分组成,我们从官网截图如下:
YarnApplicationClusterEntryPoint 就是我们的flink集群入口类,我们跟进 main 方法
ClusterEntrypoint:: runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint)
->
ClusterEntrypoint:: startCluster()
->
ClusterEntrypoint:: runCluster(Configuration configuration, PluginManager pluginManager)
->
initializeServices(configuration, pluginManager);
初始化服务,jm的rpc服务,jmx服务,io线程,选举服务,blob服务共享jar使用,心跳服务,指标注册服务 看这里的方法从前不知道akka在哪儿使用,这下就明白了。
->
dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, executionGraphInfoStore, new RpcMetricQueryServiceRetriever( metricRegistry.getMetricQueryServiceRpcService()), this);
从这里代码不绝跟下去直到
SessionDispatcherLeaderProcess::onStart()
复制代码
接着跟进createDispatcher方法中的create方法
到这里dispatcher启动告一段,dispatcher现实内里onstart方法还有更具体的代码,感爱好的可以跟进去看具体的详情。rm,webMonitor服务与此类似。
五、flink业务代码怎样被调用
上一节我们说到在创建 createDispatcher 中用到了 ApplicationDispatcherBootstrap 就是程序启动项。dispatcherGatewayServiceFactory.create(...)方法中新建ApplicationDispatcherBootstrap实例时构造函数,通过以下方法调用链
ApplicationDispatcherBootstrap::fixJobIdAndRunApplicationAsync(...)
->
runApplicationAsync(...)
->
runApplicationEntryPoint(...)
->
runApplicationEntryPoint(...)
->
ClientUtils.executeProgram(...)
->
->
PackagedProgram::invokeInteractiveModeForExecution()
->
callMainMethod(mainClass, args)
->
Method::invoke(null, (Object) args)
颠末这一些步骤,最后通过反射调用我们自己写的flink业务程序的main方法从而开始执行代码。
总结
很多细节自己还是没有把握,以是后期还是会把细节的东西研究清楚一下,比方说:选举过程细节。后期也会针对选举这一块整理一篇文章出来。。
研究良好的源代码也是发现自己不敷的地方,很多时候写着写着发现了自己那边不懂,就去查资料研究,一层一层的,感觉太多东西需要学习了。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4