写过一篇 发表于 2024-8-5 19:36:46

Flink Job 执行流程

Flink On Yarn 模式

https://i-blog.csdnimg.cn/blog_migrate/d4ef9f3ce4b89e5a9769b2f15dfe754a.png

基于Yarn层面的架构雷同 Spark on Yarn模式,都是由Client提交App到RM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。必要分析的是,Flink的Yarn模式更加雷同Spark on Yarn的cluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver雷同的使命调度和分配,Yarn AM与Flink JobManager在同一个Container中,如许AM可以知道Flink JobManager的地点,从而AM可以申请Container去启动Flink TaskManager。待Flink乐成运行在Yarn集群上,Flink Yarn Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处置惩罚。
Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业必要在启动时获取所需的资源而且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调解,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调解container的布局。譬如CPU密集的作业或必要更多的核,但不必要太多内存,固定布局的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,必要两个步骤来启动Flink作业:1.启动Flink守护历程;2.提交作业。如果作业被容器化而且将作业部署作为容器部署的一部分,那么将不再必要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink保举 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人迷惑。
在Flink版本1.5中引入了Dispatcher,Dispatcher是在新筹划里引入的一个新概念。Dispatcher会从Client端担当作业提交哀求并代表它在集群管理器上启动作业。引入Dispatcher的原因重要有两点:
【1】一些集群管理器必要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等候作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes。
资源调度模子重构下的 Flink On Yarn 模式

https://i-blog.csdnimg.cn/blog_migrate/e4310a12449222ce9386200250a8f57e.png
客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMaster,Application Master中会启动一个FlinkResourceManager以及JobManager,JobManager会根据JobGraph生成的ExecutionGraph以及物理执行筹划向FlinkResourceManager申请slot,FlinkResoourceManager会管理这些slot以及哀求,如果没有可用slot就向Yarn的ResourceManager申请container,container启动以后会注册到FlinkResourceManager,末了JobManager会将subTask deploy到对应container的 slot中去。
https://i-blog.csdnimg.cn/blog_migrate/629ecd28d169d738cfa4ea5af17e3ba2.png
在有Dispatcher的模式下:会增长一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个使命提交到Yarn ResourceManager中。
新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不必要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在必要时才哀求,不再使用时会被开释。
【4】“必要时申请”的container分配方式允许差别算子使用差别profile (CPU和内存布局)的container。
新的资源调度框架下 single cluster job on Yarn 流程先容

https://i-blog.csdnimg.cn/blog_migrate/6152ebdc82a23f90a0fe70ede968fabf.png
single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend: Invoke App code;生成StreamGraph,然后转化为JobGraph;
【2】YarnJobClusterEntrypoint(Master): 依次启动YarnResourceManager、MinDispatcher、JobManagerRunner三者都服从分布式协同同等的策略;JobManagerRunner将JobGraph转化为ExecutionGraph,然后转化为物理执行使命Execution,然后进行deploy,deploy过程会向 YarnResourceManager哀求slot,如果有直接deploy到对应的YarnTaskExecutiontor的slot里面,没有则向Yarn的ResourceManager申请,带container启动以后deploy。
【3】YarnTaskExecutorRunner (slave): 负责吸收subTask,并运行。
整个使命运行代码调用流程如下图
https://i-blog.csdnimg.cn/blog_migrate/c652ddec5ddb977e7116788ed3d4147f.png
subTask在执行时是怎么运行的?
调用StreamTask的invoke方法,执行步骤如下:
【1】initializeState()即operator的initializeState();
【2】openAllOperators()即operator的open()方法;
【3】末了调用run方法来进行真正的使命处置惩罚;
我们来看下flatMap对应的OneInputStreamTask的run方法具体是怎么处置惩罚的。
@Override
protected void run() throws Exception {
    // 在堆栈上缓存处理器引用,使代码更易于JIT
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
      // 所有的工作都发生在“processInput”方法中
    }
}
最终是调用StreamInputProcessor的processInput()做数据的处置惩罚,这里面包含用户的处置惩罚逻辑。
public boolean processInput() throws Exception {
    if (isFinished) {
      return false;
    }
    if (numRecordsIn == null) {
      try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
      } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
         numRecordsIn = new SimpleCounter();
       }
   }
   while (true) {
       if (currentRecordDeserializer != null) {
         DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
         if (result.isBufferConsumed()) {
               currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
               currentRecordDeserializer = null;
         }
         if (result.isFullRecord()) {
               StreamElement recordOrMark = deserializationDelegate.getInstance();
               //处理watermark
               if (recordOrMark.isWatermark()) {
                   // handle watermark
                   //watermark处理逻辑,这里可能引起timer的trigger
                   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                   continue;
               } else if (recordOrMark.isStreamStatus()) {
                   // handle stream status
                   statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                   continue;
                   //处理latency watermark
               } else if (recordOrMark.isLatencyMarker()) {
                   // handle latency marker
                   synchronized (lock) {
                     streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                   }
                   continue;
               } else {
                   //用户的真正的代码逻辑
                   // now we can do the actual processing
                   StreamRecord<IN> record = recordOrMark.asRecord();
                   synchronized (lock) {
                     numRecordsIn.inc();
                     streamOperator.setKeyContextElement1(record);
                     //处理数据
                     streamOperator.processElement(record);
                   }
                   return true;
               }
         }
       }
            
       //这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,
       final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
       if (bufferOrEvent != null) {
         if (bufferOrEvent.isBuffer()) {
               currentChannel = bufferOrEvent.getChannelIndex();
               currentRecordDeserializer = recordDeserializers;
               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
         }
         else {
               // Event received
               final AbstractEvent event = bufferOrEvent.getEvent();
               if (event.getClass() != EndOfPartitionEvent.class) {
                   throw new IOException("Unexpected event: " + event);
               }
         }
       }
       else {
         isFinished = true;
         if (!barrierHandler.isEmpty()) {
               throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
         }
         return false;
       }
   }
}
streamOperator.processElement(record)最终会调用用户的代码处置惩罚逻辑,假如operator是StreamFlatMap的话。
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);//用户代码
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink Job 执行流程