本地环境提交flink on yarn作业

打印 上一主题 下一主题

主题 860|帖子 860|积分 2580

本地环境提交flink on yarn作业

在使用云厂商提供的flink job管理平台时,通过界面操作提交flink任务到yarn上非常方便,那么开发调试时能否在本地环境直接提交flink任务到yarn呢?
开源的flink管理平台 streampark 有提交flink on yarn作业的代码实现,可以参照 streampark 里对应模块的代码实现本地环境下的flink on yarn作业的提交。
其中 streampark-flink-client-core 作为提交flink job的焦点模块,这里我们只关心flink on yarn作业的提交。
flink job提交流程分析:

1、flink启动脚本

flink 1.14.4版本为例,flink安装目录的bin/目录下的 flink脚本 有详细的任务提交步调,其中最后一行为:
  1. # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
  2. exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
复制代码
可知,flink任务仍以java命令方式提交,程序入口为:org.apache.flink.client.cli.CliFrontend
2、启动程序main方法

根据参数提交作业,CliFrontend#main方法执行流程如下:
  1. // 1. find the configuration directory - 获取配置文件目录
  2. final String configurationDirectory = getConfigurationDirectoryFromEnv();
  3. // 2. load the global configuration - 加载配置参数
  4. final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
  5. // 3. load the custom command lines - 加载命令行参数
  6. final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory);
  7. // 4. 创建CliFrontend的对象并调用CliFrontend#parseAndRun方法
  8. final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
  9. SecurityUtils.install(new SecurityConfiguration(cli.configuration));
  10. retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
复制代码
3、CliFrontend#parseAndRun方法

CliFrontend#parseAndRun方法主要代码如下:
  1. // check for action
  2. ...
  3. // get action
  4. String action = args[0];
  5. // remove action from parameters
  6. final String[] params = Arrays.copyOfRange(args, 1, args.length);
  7. try {
  8.     // do action
  9.     switch (action) {
  10.         case ACTION_RUN: // String ACTION_RUN = "run";
  11.             run(params);
  12.             return 0;
  13.         case ACTION_RUN_APPLICATION: // String ACTION_RUN_APPLICATION = "run-application";
  14.             runApplication(params);
  15.             return 0;
  16.         case ACTION_LIST: // String ACTION_INFO = "list";
  17.             list(params);
  18.             return 0;
  19.         case ACTION_INFO: // String ACTION_LIST = "info";
  20.             info(params);
  21.             return 0;
  22.         ... 省略后续步骤
复制代码
可以看到CliFrontend#parseAndRun方法通过获取命令行的第一个参数,匹配并运行指定方法。
假设我们以run-application命令启动程序,则调用CliFrontend#runApplication方法,进入该方法。
4、CliFrontend#runApplication方法

直接展示关键代码,CliFrontend#runApplication方法通过传入clusterClientServiceLoader参数来创建一个ApplicationDeployer对象,然后调用该对象的ApplicationDeployer#run方法,执行完成结束调用。
  1. final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader);
  2. ...
  3. deployer.run(effectiveConfiguration, applicationConfiguration);
复制代码
那么application命令提交任务到yarn的详细实现就在这里了,点击run方法并进入详细实现方法ApplicationClusterDeployer#run。
  1. public <ClusterID> void run(
  2.     final Configuration configuration,
  3.     final ApplicationConfiguration applicationConfiguration)
  4.     throws Exception {
  5.     checkNotNull(configuration);
  6.     checkNotNull(applicationConfiguration);
  7.     LOG.info("Submitting application in 'Application Mode'.");
  8.    
  9.     final ClusterClientFactory<ClusterID> clientFactory =
  10.         clientServiceLoader.getClusterClientFactory(configuration);
  11.     try (final ClusterDescriptor<ClusterID> clusterDescriptor =
  12.          clientFactory.createClusterDescriptor(configuration)) {
  13.         final ClusterSpecification clusterSpecification =
  14.             clientFactory.getClusterSpecification(configuration);
  15.         // 提交flink on yarn任务
  16.         clusterDescriptor.deployApplicationCluster(
  17.             clusterSpecification, applicationConfiguration);
  18.     }
  19. }
复制代码
代码实现流程:

  • 通过clientServiceLoader生成ClusterClientFactory客户端工厂;
  • 通过ClusterClientFactory对象创建 ClusterDescriptor 集群描述符 及 ClusterSpecification 集群规格对象;
  • ClusterDescriptor#deployApplicationCluster为提交任务方法,跳转到详细实现方法YarnClusterDescriptor#deployApplicationCluster;
  • 进入后我们看到deployApplicationCluster方法及下面的deployJobCluster方法,二者都调用了YarnClusterDescriptor#deployInternal方法,以完成flink on yran任务提交;
  • 通过参数描述也可以看出deployApplicationCluster对应的是application提交模式,deployJobCluster对应的是per-job提交模式;

**总结:**通过对run方法的梳理,可以确定step2是我们提交任务所需要创建的对象,YarnClusterDescriptor#deployInternal方法是实现提交需要调用方法;
5、YarnClusterDescriptor#deployInternal方法

进入该方法,前面是一些参数校检及认证操作,然后通过 yarnClient 创建一个YarnClientApplication(这里的yarnClient在哪里生成?先不管,背面再看),背面进入startAppMaster方法,传入flinkConfiguration、yarnClient、yarnApplication等参数,这里应该会进行yarn任务的提交。
  1. // Create application via yarnClient
  2. final YarnClientApplication yarnApplication = yarnClient.createApplication();
  3. ...
  4.    
  5. ApplicationReport report =
  6.                 startAppMaster(
  7.                         flinkConfiguration,
  8.                         applicationName,
  9.                         yarnClusterEntrypoint,
  10.                         jobGraph,
  11.                         yarnClient,
  12.                         yarnApplication,
  13.                         validClusterSpecification);
复制代码
进入YarnClusterDescriptor#startAppMaster方法,方法实现较长,这里含糊搜索’submit’关键词,定位到 yarnClient.submitApplication 方法执行任务的提交,就此flink on yarn任务正式开始提交到集群中。
  1. LOG.info("Submitting application master " + appId);
  2. yarnClient.submitApplication(appContext);
复制代码
6、yarnClient对象在哪里生成?

梳理提交流程后,已知入口程序CliFrontend#main方法会加载flink的FLINK_CONF_DIR设置文件目录并装载设置参数,而我在FLINK_CONF_DIR目录的设置文件中并没有找到对应的yarn参数设置,那么flink如何和yarn建立起联系呢?
仍需要从ApplicationClusterDeployer#run方法中创建的对象入手:


  • clientServiceLoader是由DefaultClusterClientServiceLoader生成,使用SPI机制动态加载其ClusterClientFactory客户端工厂;
  • clientFactory.createClusterDescriptor(configuration) 生成ClusterDescriptor集群描述符对象,调用的方法为 YarnClusterClientFactory#createClusterDescriptor,方法代码如下:
    1. public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
    2.     checkNotNull(configuration);
    3.     // 读取flink configuration的CONF_DIR获取配置文件目录
    4.     final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
    5.     // 设置日志相关参数,如未设置日志参数,默认配置文件目录下的"log4j.properties"或"logback.xml"文件路径为该参数值
    6.     YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
    7.     // 获取YarnClient
    8.     return getClusterDescriptor(configuration);
    9. }
    10. private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    11.     // 创建YarnClient对象
    12.     final YarnClient yarnClient = YarnClient.createYarnClient();
    13.     // 获取yarn集群配置
    14.     final YarnConfiguration yarnConfiguration = Utils.getYarnAndHadoopConfiguration(configuration);
    15.     // 根据yarnConfiguration配置,初始化yarn客户端并启动建立连接
    16.     yarnClient.init(yarnConfiguration);
    17.     yarnClient.start();
    18.     // 创建并返回ClusterDescriptor集群描述符对象
    19.     return new YarnClusterDescriptor(
    20.         configuration,
    21.         yarnConfiguration,
    22.         yarnClient,
    23.         YarnClientYarnClusterInformationRetriever.create(yarnClient),
    24.         false);
    25. }
    复制代码
    点击进入Utils#getYarnAndHadoopConfiguration方法获取yarn集群设置,代码逻辑如下:
    1. public static YarnConfiguration getYarnAndHadoopConfiguration(
    2.     org.apache.flink.configuration.Configuration flinkConfig) {
    3.     // 从flink configuration配置中获取对应的yarn配置参数(如未设置相关yarn参数,则不添加)
    4.     final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
    5.     // 获取系统环境变量"HADOOP_HOME"、"HADOOP_CONF_DIR",读取环境变量下的配置文件目录并添加到yarnConfig中
    6.     yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
    7.     return yarnConfig;
    8. }
    复制代码
    HadoopUtils#getHadoopConfiguration方法部分截图:

由此确定flink与yarn集群建立毗连,需要在提交flink任务的体系上设置hadoop环境变量,这与安装flink时需要设置环境变量完成与YARN集群的对接操作描述一致。
java实现flink on yarn作业的提交

实现思路:

由上分析可知,提交flink job需要flink设置文件、hadoop环境变量,在本地环境下需要在项目中添加 flink-conf.yaml 设置文件,没有设置hadoop环境变量的话,可以自行添加 core-site.xml、hdfs-site.xml、yarn-site.xml 设置文件到项目指定路径中并创建YarnClient对象,或手动设置参数创建YarnClient对象。
剩下的就是将 streampark 的streampark-flink-client-core模块下的flink on yarn提交任务代码提取出来,通过阅读代码发现提交flink任务还需要flink-dist_*.jar文件,这是flink任务提交到yarn的条件条件之一。
实现流程:

创建自定义的任务提交客户端,通过HdfsUtils将任务jar包及依赖lib/路径上传至指定hdfs文件目录中,调用提交客户端的 doSubmit方法 提交任务到yarn集群,doCancel方法 取消正在运行的flink任务。
Windows体系提交任务失败的问题办理

由于windows体系和linux体系下是差别的路径分隔符,导致windows下的本地环境提交flink on yarn作业失败(windows下找不到主类:YarnJobClusterEntrypoint)。
需在项目下创建 org.apache.flink.yarn 包路径,复制并修改 Utils 和 YarnClusterDescriptor 类文件,在启动时覆盖源码类加载执行,可以办理windows下提交任务失败的问题。

————————————————
[FLINK-17858] Yarn mode, windows and linux environment should be interlinked - ASF JIRA
   项目仓库地址:Cyanty/flink-job-publish-demo

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金歌

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表