Flink源码剖析之:如何根据JobGraph生成ExecutionGraph

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

Flink源码剖析之:如何根据JobGraph生成ExecutionGraph

在上一篇Flink源码剖析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的归并过程和JobGraph的构造流程。
对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会报告JobGraph到ExecutionGraph的生成过程,而这一过程会在Flink JobManager的服务端来完成。当JobGraph从客户端提交到JobManager后,JobManager会根据JobGraph生成对应的ExecutionGraph,而ExecutionGraph就是Flink作业调度时使用的焦点数据结构。 本篇将会详细介绍JobGraph转换为ExecutionGraph的流程。
主体流程梳理

Flink在将JobGraph转换成ExecutionGraph后,便可以开始实行真正的任务。这一转换流程主要在Flink源码中的DefaultExecutionGraphBuilder类中的buildGraph方法中实现的。在转换过程中,涉及到了一些新的根本概念,先来简朴介绍一下这些概念,对于明白ExecutionGraph有较大的资助:


  • ExecutionJobVertex: 在ExecutionGraph中表示实行顶点,与JobGraph中的JobVertex逐一对应。实际上,每个ExecutionJobVertex也是依赖JobVertex来创建的。
  • ExecutionVertex: 在ExecutionJobVertex类中创建,每个并发度都对应了一个ExecutionVertex对象,每个ExecutionVertex都代表JobVertex在某个特定并行子任务中的实行。在实际实行时,每个ExecutionVertex实际上就是一个Task,是ExecutionJobVertex并行实行的一个子任务。
  • Execution: Execution表示ExecutionVertex的一次实行。由于ExecutionVertex可以被实行多次(用于恢复、重新计算、重新分配),这个类用于跟踪该ExecutionVertex的单个实行状态和资源。
  • IntermediateResult: 在JobGraph中用IntermediateDataSet表示上游JobVertex的输出数据流,而在ExecutionGraph中,则用IntermediateResult来表示ExecutionJobVertex的输出数据流。
  • IntermediateResultPartition:这是IntermediateResult的一部分或一个分片。由于有多个并行任务(ExecutionVertex)实行雷同的操作,每个任务都会产生一部分IntermediateResult。这些结果在物理存储和计算过程中,大概会被进一步划分成多个分区,每个分区对应一个 IntermediateResultPartition对象。
从上面的根本概念也可以看出,在ExecutionGraph中:


  • 相比StreamGraph和JobGraph,ExecutionGraph是实际根据任务并行度来生成拓扑结构的,在ExecutionGraph中,每个并行子任务都对应一个ExecutionVertex顶点和IntermediateResultPartition输出数据流分区。
  • 在ExecutionGraph中,上下游节点之间的连接是通过ExecutionVertex -> IntermediateResultPartition -> ExecutionVertex 对象来完成的。
整体的实行流程图如下所示:

入口方法:DefaultExecutionGraphBuilder.buildGraph

ExecutionGraph的生成是在DefaultExecutionGraphBuilder类的buildGraph方法中实现的:
  1. public class DefaultExecutionGraphBuilder {
  2.     public static DefaultExecutionGraph buildGraph(
  3.             JobGraph jobGraph,
  4.             Configuration jobManagerConfig,
  5.             ScheduledExecutorService futureExecutor,
  6.             Executor ioExecutor,
  7.             ClassLoader classLoader,
  8.             CompletedCheckpointStore completedCheckpointStore,
  9.             CheckpointsCleaner checkpointsCleaner,
  10.             CheckpointIDCounter checkpointIdCounter,
  11.             Time rpcTimeout,
  12.             MetricGroup metrics,
  13.             BlobWriter blobWriter,
  14.             Logger log,
  15.             ShuffleMaster<?> shuffleMaster,
  16.             JobMasterPartitionTracker partitionTracker,
  17.             TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
  18.             ExecutionDeploymentListener executionDeploymentListener,
  19.             ExecutionStateUpdateListener executionStateUpdateListener,
  20.             long initializationTimestamp,
  21.             VertexAttemptNumberStore vertexAttemptNumberStore,
  22.             VertexParallelismStore vertexParallelismStore)
  23.             throws JobExecutionException, JobException {
  24.         checkNotNull(jobGraph, "job graph cannot be null");
  25.         final String jobName = jobGraph.getName();
  26.         final JobID jobId = jobGraph.getJobID();
  27.                 // 创建JobInformation
  28.         final JobInformation jobInformation =
  29.                 new JobInformation(
  30.                         jobId,
  31.                         jobName,
  32.                         jobGraph.getSerializedExecutionConfig(),
  33.                         jobGraph.getJobConfiguration(),
  34.                         jobGraph.getUserJarBlobKeys(),
  35.                         jobGraph.getClasspaths());
  36.                
  37.                 // Execution 保留的最大历史数
  38.         final int maxPriorAttemptsHistoryLength =
  39.                 jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
  40.                 // IntermediateResultPartitions的释放策略
  41.         final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =
  42.                 PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(
  43.                         jobManagerConfig);
  44.         // create a new execution graph, if none exists so far
  45.         final DefaultExecutionGraph executionGraph;
  46.         try {
  47.                         // 创建默认的ExecutionGraph执行图对象,最后会返回该创建好的执行图对象
  48.             executionGraph =
  49.                     new DefaultExecutionGraph(
  50.                             jobInformation,
  51.                             futureExecutor,
  52.                             ioExecutor,
  53.                             rpcTimeout,
  54.                             maxPriorAttemptsHistoryLength,
  55.                             classLoader,
  56.                             blobWriter,
  57.                             partitionGroupReleaseStrategyFactory,
  58.                             shuffleMaster,
  59.                             partitionTracker,
  60.                             partitionLocationConstraint,
  61.                             executionDeploymentListener,
  62.                             executionStateUpdateListener,
  63.                             initializationTimestamp,
  64.                             vertexAttemptNumberStore,
  65.                             vertexParallelismStore);
  66.         } catch (IOException e) {
  67.             throw new JobException("Could not create the ExecutionGraph.", e);
  68.         }
  69.         // set the basic properties
  70.         try {
  71.             executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
  72.         } catch (Throwable t) {
  73.             log.warn("Cannot create JSON plan for job", t);
  74.             // give the graph an empty plan
  75.             executionGraph.setJsonPlan("{}");
  76.         }
  77.         // initialize the vertices that have a master initialization hook
  78.         // file output formats create directories here, input formats create splits
  79.         final long initMasterStart = System.nanoTime();
  80.         log.info("Running initialization on master for job {} ({}).", jobName, jobId);
  81.         for (JobVertex vertex : jobGraph.getVertices()) {
  82.             String executableClass = vertex.getInvokableClassName();
  83.             if (executableClass == null || executableClass.isEmpty()) {
  84.                 throw new JobSubmissionException(
  85.                         jobId,
  86.                         "The vertex "
  87.                                 + vertex.getID()
  88.                                 + " ("
  89.                                 + vertex.getName()
  90.                                 + ") has no invokable class.");
  91.             }
  92.             try {
  93.                 vertex.initializeOnMaster(classLoader);
  94.             } catch (Throwable t) {
  95.                 throw new JobExecutionException(
  96.                         jobId,
  97.                         "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
  98.                         t);
  99.             }
  100.         }
  101.         log.info(
  102.                 "Successfully ran initialization on master in {} ms.",
  103.                 (System.nanoTime() - initMasterStart) / 1_000_000);
  104.         // topologically sort the job vertices and attach the graph to the existing one
  105.                 // 这里会先做一个排序,source源节点会放在最前面,接着开始遍历
  106.                 // 必须保证当前添加到集合的节点的前置节点都已经添加进去了
  107.         List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
  108.         if (log.isDebugEnabled()) {
  109.             log.debug(
  110.                     "Adding {} vertices from job graph {} ({}).",
  111.                     sortedTopology.size(),
  112.                     jobName,
  113.                     jobId);
  114.         }
  115.                 // 构建执行图的重点方法。生成具体的ExecutionGraph
  116.         executionGraph.attachJobGraph(sortedTopology);
  117.         if (log.isDebugEnabled()) {
  118.             log.debug(
  119.                     "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
  120.         }
  121.         // configure the state checkpointing
  122.                 // checkpoint的相关配置
  123.         if (isCheckpointingEnabled(jobGraph)) {
  124.             JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
  125.             // Maximum number of remembered checkpoints
  126.             int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
  127.             CheckpointStatsTracker checkpointStatsTracker =
  128.                     new CheckpointStatsTracker(
  129.                             historySize,
  130.                             snapshotSettings.getCheckpointCoordinatorConfiguration(),
  131.                             metrics);
  132.             // load the state backend from the application settings
  133.             final StateBackend applicationConfiguredBackend;
  134.             final SerializedValue<StateBackend> serializedAppConfigured =
  135.                     snapshotSettings.getDefaultStateBackend();
  136.             if (serializedAppConfigured == null) {
  137.                 applicationConfiguredBackend = null;
  138.             } else {
  139.                 try {
  140.                     applicationConfiguredBackend =
  141.                             serializedAppConfigured.deserializeValue(classLoader);
  142.                 } catch (IOException | ClassNotFoundException e) {
  143.                     throw new JobExecutionException(
  144.                             jobId, "Could not deserialize application-defined state backend.", e);
  145.                 }
  146.             }
  147.                         // StateBackend配置
  148.             final StateBackend rootBackend;
  149.             try {
  150.                 rootBackend =
  151.                         StateBackendLoader.fromApplicationOrConfigOrDefault(
  152.                                 applicationConfiguredBackend,
  153.                                 snapshotSettings.isChangelogStateBackendEnabled(),
  154.                                 jobManagerConfig,
  155.                                 classLoader,
  156.                                 log);
  157.             } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
  158.                 throw new JobExecutionException(
  159.                         jobId, "Could not instantiate configured state backend", e);
  160.             }
  161.             // load the checkpoint storage from the application settings
  162.             final CheckpointStorage applicationConfiguredStorage;
  163.             final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
  164.                     snapshotSettings.getDefaultCheckpointStorage();
  165.             if (serializedAppConfiguredStorage == null) {
  166.                 applicationConfiguredStorage = null;
  167.             } else {
  168.                 try {
  169.                     applicationConfiguredStorage =
  170.                             serializedAppConfiguredStorage.deserializeValue(classLoader);
  171.                 } catch (IOException | ClassNotFoundException e) {
  172.                     throw new JobExecutionException(
  173.                             jobId,
  174.                             "Could not deserialize application-defined checkpoint storage.",
  175.                             e);
  176.                 }
  177.             }
  178.             final CheckpointStorage rootStorage;
  179.             try {
  180.                 rootStorage =
  181.                         CheckpointStorageLoader.load(
  182.                                 applicationConfiguredStorage,
  183.                                 null,
  184.                                 rootBackend,
  185.                                 jobManagerConfig,
  186.                                 classLoader,
  187.                                 log);
  188.             } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
  189.                 throw new JobExecutionException(
  190.                         jobId, "Could not instantiate configured checkpoint storage", e);
  191.             }
  192.             // instantiate the user-defined checkpoint hooks
  193.                         // 示例化用户自定义的cp hook
  194.             final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
  195.                     snapshotSettings.getMasterHooks();
  196.             final List<MasterTriggerRestoreHook<?>> hooks;
  197.             if (serializedHooks == null) {
  198.                 hooks = Collections.emptyList();
  199.             } else {
  200.                 final MasterTriggerRestoreHook.Factory[] hookFactories;
  201.                 try {
  202.                     hookFactories = serializedHooks.deserializeValue(classLoader);
  203.                 } catch (IOException | ClassNotFoundException e) {
  204.                     throw new JobExecutionException(
  205.                             jobId, "Could not instantiate user-defined checkpoint hooks", e);
  206.                 }
  207.                 final Thread thread = Thread.currentThread();
  208.                 final ClassLoader originalClassLoader = thread.getContextClassLoader();
  209.                 thread.setContextClassLoader(classLoader);
  210.                 try {
  211.                     hooks = new ArrayList<>(hookFactories.length);
  212.                     for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
  213.                         hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
  214.                     }
  215.                 } finally {
  216.                     thread.setContextClassLoader(originalClassLoader);
  217.                 }
  218.             }
  219.             final CheckpointCoordinatorConfiguration chkConfig =
  220.                     snapshotSettings.getCheckpointCoordinatorConfiguration();
  221.                         // 创建CheckpointCoordinator对象
  222.             executionGraph.enableCheckpointing(
  223.                     chkConfig,
  224.                     hooks,
  225.                     checkpointIdCounter,
  226.                     completedCheckpointStore,
  227.                     rootBackend,
  228.                     rootStorage,
  229.                     checkpointStatsTracker,
  230.                     checkpointsCleaner);
  231.         }
  232.         // create all the metrics for the Execution Graph
  233.                 // 添加metrics指标
  234.         metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
  235.         metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
  236.         metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
  237.         return executionGraph;
  238.     }
复制代码
在这个方法里,会先创建一个 ExecutionGraph 对象,然后对 JobGraph 中的 JobVertex 列表做一下排序(先把有 source 节点的 JobVertex 放在最前面,然后开始遍历,只有当前 JobVertex 的前置节点都已经添加到集合后才华把当前 JobVertex 节点添加到集合中),最后通过过 attachJobGraph() 方法生成详细的ExecutionGraph。
在上面的代码中,最需要焦点关注的方法是:executionGraph.attachJobGraph(sortedTopology);。该方法是创建ExecutionGraph的焦点方法,包括了创建上面我们说的各种ExecutionGraph中涉及的对象,以及连接它们来形成ExecutionGraph拓扑结构。
接下来我们进入该方法来一探究竟。
生成ExecutionGraph:attachJobGraph

先来看下attachJobGraph方法的实现:
  1. public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
  2.     assertRunningInJobMasterMainThread();
  3.     LOG.debug(
  4.             "Attaching {} topologically sorted vertices to existing job graph with {} "
  5.                     + "vertices and {} intermediate results.",
  6.             topologicallySorted.size(),
  7.             tasks.size(),
  8.             intermediateResults.size());
  9.     final long createTimestamp = System.currentTimeMillis();
  10.         // 遍历排序好的拓扑JobVertex
  11.     for (JobVertex jobVertex : topologicallySorted) {
  12.         if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
  13.             this.isStoppable = false;
  14.         }
  15.                 // 获取节点并行度信息
  16.         VertexParallelismInformation parallelismInfo =
  17.                 parallelismStore.getParallelismInfo(jobVertex.getID());
  18.         // create the execution job vertex and attach it to the graph
  19.                 // 创建ExecutionJobVertex
  20.         ExecutionJobVertex ejv =
  21.                 new ExecutionJobVertex(
  22.                         this,
  23.                         jobVertex,
  24.                         maxPriorAttemptsHistoryLength,
  25.                         rpcTimeout,
  26.                         createTimestamp,
  27.                         parallelismInfo,
  28.                         initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
  29.                 // 重要方法!!!
  30.                 // 构建ExecutionGraph,连接上下游节点
  31.         ejv.connectToPredecessors(this.intermediateResults);
  32.         ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
  33.         if (previousTask != null) {
  34.             throw new JobException(
  35.                     String.format(
  36.                             "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
  37.                             jobVertex.getID(), ejv, previousTask));
  38.         }
  39.                
  40.                 // 遍历ExecutionJobVertex的输出IntermediateResult
  41.         for (IntermediateResult res : ejv.getProducedDataSets()) {
  42.             IntermediateResult previousDataSet =
  43.                     this.intermediateResults.putIfAbsent(res.getId(), res);
  44.             if (previousDataSet != null) {
  45.                 throw new JobException(
  46.                         String.format(
  47.                                 "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
  48.                                 res.getId(), res, previousDataSet));
  49.             }
  50.         }
  51.         this.verticesInCreationOrder.add(ejv);
  52.         this.numVerticesTotal += ejv.getParallelism();
  53.     }
  54.    
  55.         //将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
  56. registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);
  57.     // the topology assigning should happen before notifying new vertices to failoverStrategy
  58.         // 转换执行拓扑
  59.     executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
  60.     partitionGroupReleaseStrategy =
  61.             
  62. // 创建部分组释放策略的方法,依赖于当前的调度的拓扑结构,这决定了当何时释放特定的中间数据结果所需的策略。
  63. partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
  64. }
复制代码
在上面attchGraph方法中,起首遍历输入的排序后的JobVertex列表,对每一个JobVertex:


  • 判断是否停止: 对于单个 JobVertex,如果它是一个输入顶点且不可停止,则整个 Job 不可停止。这在流处理任务中是常见的,一些输入数据源大概无法停止(如Kafka)。
  • 获取并行信息并创建实行的顶点: 根据JobVertex的ID,从parallelismStore中获取并行信息。利用这些信息创建ExecutionJobVertex实例,它代表运行在特定TaskManager上的taskId,可以是待调度、运行或已完成的。
  • 判断新添加的顶点是否已经存在: 如果试图添加一个已经存在的顶点,这意味着存在程序错误,因为每个JobVertex应当有唯一的ID。这将抛出异常。
  • 判断数据集是否已经存在: 同样。如果试图添加一个已经存在的IntermediateResult,这将抛出异常。
  • 添加实行顶点到创建顺序列表和增加总的顶点数量: 记载创建顶点的顺序能够确保在实行时能够按照精确的依赖关系举行。并同时更新总的顶点数量。
遍历完成后, 注册实行顶点和结果分区,将所有的实行顶点和结果分区注册到分布式资源管理体系中,以便能够举行分布式调度。
利用DefaultExecutionTopology工具类将ExecutionGraph转换为SchedulingTopology,如许便于任务调度器举行处理。
最后,调用partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology())根据当前的调度的拓扑结构来创建组开释策略,这决定了当何时开释特定的中心数据结果所需的策略。
上面流程中,最需要关注的方法就是new ExecutionJobVertex和ejv.connectToPredecessors(this.intermediateResults);
接下来,我们分别对其举行探究。
创建 ExecutionJobVertex 对象

进入到该方法的源码中:
  1. @VisibleForTesting
  2. public ExecutionJobVertex(
  3.         InternalExecutionGraphAccessor graph,
  4.         JobVertex jobVertex,
  5.         int maxPriorAttemptsHistoryLength,
  6.         Time timeout,
  7.         long createTimestamp,
  8.         VertexParallelismInformation parallelismInfo,
  9.         SubtaskAttemptNumberStore initialAttemptCounts)
  10.         throws JobException {
  11.     if (graph == null || jobVertex == null) {
  12.         throw new NullPointerException();
  13.     }
  14.     this.graph = graph;
  15.     this.jobVertex = jobVertex;
  16.     this.parallelismInfo = parallelismInfo;
  17.     // verify that our parallelism is not higher than the maximum parallelism
  18.     if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
  19.         throw new JobException(
  20.                 String.format(
  21.                         "Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
  22.                         jobVertex.getName(),
  23.                         this.parallelismInfo.getParallelism(),
  24.                         this.parallelismInfo.getMaxParallelism()));
  25.     }
  26.     this.resourceProfile =
  27.             ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
  28.     this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];
  29.     this.inputs = new ArrayList<>(jobVertex.getInputs().size());
  30.     // take the sharing group
  31.     this.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());
  32.     this.coLocationGroup = jobVertex.getCoLocationGroup();
  33.     // create the intermediate results
  34.     this.producedDataSets =
  35.             new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
  36.     for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
  37.         final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
  38.         this.producedDataSets[i] =
  39.                 new IntermediateResult(
  40.                         result.getId(),
  41.                         this,
  42.                         this.parallelismInfo.getParallelism(),
  43.                         result.getResultType());
  44.     }
  45.     // create all task vertices
  46.     for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
  47.         ExecutionVertex vertex =
  48.                 new ExecutionVertex(
  49.                         this,
  50.                         i,
  51.                         producedDataSets,
  52.                         timeout,
  53.                         createTimestamp,
  54.                         maxPriorAttemptsHistoryLength,
  55.                         initialAttemptCounts.getAttemptCount(i));
  56.         this.taskVertices[i] = vertex;
  57.     }
  58.     // sanity check for the double referencing between intermediate result partitions and
  59.     // execution vertices
  60.     for (IntermediateResult ir : this.producedDataSets) {
  61.         if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
  62.             throw new RuntimeException(
  63.                     "The intermediate result's partitions were not correctly assigned.");
  64.         }
  65.     }
  66.     final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
  67.             getJobVertex().getOperatorCoordinators();
  68.     if (coordinatorProviders.isEmpty()) {
  69.         this.operatorCoordinators = Collections.emptyList();
  70.     } else {
  71.         final ArrayList<OperatorCoordinatorHolder> coordinators =
  72.                 new ArrayList<>(coordinatorProviders.size());
  73.         try {
  74.             for (final SerializedValue<OperatorCoordinator.Provider> provider :
  75.                     coordinatorProviders) {
  76.                 coordinators.add(
  77.                         OperatorCoordinatorHolder.create(
  78.                                 provider, this, graph.getUserClassLoader()));
  79.             }
  80.         } catch (Exception | LinkageError e) {
  81.             IOUtils.closeAllQuietly(coordinators);
  82.             throw new JobException(
  83.                     "Cannot instantiate the coordinator for operator " + getName(), e);
  84.         }
  85.         this.operatorCoordinators = Collections.unmodifiableList(coordinators);
  86.     }
  87.     // set up the input splits, if the vertex has any
  88.     try {
  89.         @SuppressWarnings("unchecked")
  90.         InputSplitSource<InputSplit> splitSource =
  91.                 (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
  92.         if (splitSource != null) {
  93.             Thread currentThread = Thread.currentThread();
  94.             ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
  95.             currentThread.setContextClassLoader(graph.getUserClassLoader());
  96.             try {
  97.                 inputSplits =
  98.                         splitSource.createInputSplits(this.parallelismInfo.getParallelism());
  99.                 if (inputSplits != null) {
  100.                     splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
  101.                 }
  102.             } finally {
  103.                 currentThread.setContextClassLoader(oldContextClassLoader);
  104.             }
  105.         } else {
  106.             inputSplits = null;
  107.         }
  108.     } catch (Throwable t) {
  109.         throw new JobException(
  110.                 "Creating the input splits caused an error: " + t.getMessage(), t);
  111.     }
  112. }
复制代码
在上面这段代码中,主要实现了ExecutionVertex的创建和IntermediateResult对象的创建:


  • 遍历当前JobVertex的输出IntermediateDataSet列表,并根据IntermediateDataSet来创建相应的IntermediateResult对象。每个IntermediateDataSet都会对应一个IntermediateResult。
  • 根据当前JobVertex的并发度,来创建雷同数量的ExecutionVertex对象,每个ExecutionVertex对象代表一个并行计算任务,在实际实行时就是一个Task任务。
创建ExecutionVertex对象

进一步地,我们观察创建ExecutionVertex对象的实现逻辑如下所示:
  1. public ExecutionVertex(
  2.         ExecutionJobVertex jobVertex,
  3.         int subTaskIndex,
  4.         IntermediateResult[] producedDataSets,
  5.         Time timeout,
  6.         long createTimestamp,
  7.         int maxPriorExecutionHistoryLength,
  8.         int initialAttemptCount) {
  9.     this.jobVertex = jobVertex;
  10.     this.subTaskIndex = subTaskIndex;
  11.     this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
  12.     this.taskNameWithSubtask =
  13.             String.format(
  14.                     "%s (%d/%d)",
  15.                     jobVertex.getJobVertex().getName(),
  16.                     subTaskIndex + 1,
  17.                     jobVertex.getParallelism());
  18.     this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
  19.         // 根据IntermediateResult创建当前subTaskIndex分区下的IntermediateResultPartiton
  20.     for (IntermediateResult result : producedDataSets) {
  21.         IntermediateResultPartition irp =
  22.                 new IntermediateResultPartition(
  23.                         result,
  24.                         this,
  25.                         subTaskIndex,
  26.                         getExecutionGraphAccessor().getEdgeManager());
  27.                 // 记录当前分区的irp到ir中
  28.         result.setPartition(subTaskIndex, irp);
  29.                
  30.                 // 记录分区ip与irp的对应关系
  31.         resultPartitions.put(irp.getPartitionId(), irp);
  32.     }
  33.     this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
  34.         // 创建对应的Execution对象,初始化时initialAttempCount为0,如果后面重新调度这个task,它会自增加1
  35.     this.currentExecution =
  36.             new Execution(
  37.                     getExecutionGraphAccessor().getFutureExecutor(),
  38.                     this,
  39.                     initialAttemptCount,
  40.                     createTimestamp,
  41.                     timeout);
  42.     getExecutionGraphAccessor().registerExecution(currentExecution);
  43.     this.timeout = timeout;
  44.     this.inputSplits = new ArrayList<>();
  45. }
复制代码
上述创建ExecutionVertex的过程主要实现了以下步骤:

  • 生成中心结果分区IntermediateResultPartition
    中心结果分区代表一个并行任务产生的输出,同一并行任务大概会有多个输出(对应多个后续任务),也就是多个中心结果分区。


  • 基于 result,在相应的索引 subTaskIndex 上创建一个 IntermediateResultPartition 并给它赋值。IntermediateResultPartition 提供了并行任务的输出数据,对应于某个特定实行顶点 ExecutionVertex 的并行子任务。
  • 在创建过程中,需要使用 getExecutionGraphAccessor().getEdgeManager() 获取边管理器,边管理器是用于维护这个分区与其它 ExecutionVertex 之间的连接关系。
  • 记载这个 IntermediateResultPartition 到 result 中的相应索引位置,并在 resultPartitions 映射表中保存 IntermediateResultPartition。

  • 创建实行(Execution)对象:
    这一过程是基于 Execution 的构造函数引发的。它用于代表该 ExecutionVertex 在某一特定点时间的一次尝试实行。创建 Execution 实例后,会将其注册到实行图(ExecutionGraph)中,以便于后续调度和实行任务。
通过以上流程,生成了中心结果分区,映射了每一个分区和其对应的任务关系,并且创建了 Execution 对象用于管理并跟踪任务的实行状态。
在创建好ExecutionVertex和IntermediateResultPartition后,根据上面的流程图,就是考虑如何将它们举行连接生成ExecutionGraph了。
这部分的实现逻辑就在attachJobGraph方法的ejv.connectToPredecessors(this.intermediateResults);方法中实现的。
生成ExecutionGraph

同样地,我们进入源码来深入观察一下实现逻辑:
  1. public void connectToPredecessors(
  2.         Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)
  3.         throws JobException {
  4.     List<JobEdge> inputs = jobVertex.getInputs();
  5.     if (LOG.isDebugEnabled()) {
  6.         LOG.debug(
  7.                 String.format(
  8.                         "Connecting ExecutionJobVertex %s (%s) to %d predecessors.",
  9.                         jobVertex.getID(), jobVertex.getName(), inputs.size()));
  10.     }
  11.     for (int num = 0; num < inputs.size(); num++) {
  12.         JobEdge edge = inputs.get(num);
  13.         if (LOG.isDebugEnabled()) {
  14.             if (edge.getSource() == null) {
  15.                 LOG.debug(
  16.                         String.format(
  17.                                 "Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
  18.                                 num,
  19.                                 jobVertex.getID(),
  20.                                 jobVertex.getName(),
  21.                                 edge.getSourceId()));
  22.             } else {
  23.                 LOG.debug(
  24.                         String.format(
  25.                                 "Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
  26.                                 num,
  27.                                 jobVertex.getID(),
  28.                                 jobVertex.getName(),
  29.                                 edge.getSource().getProducer().getID(),
  30.                                 edge.getSource().getProducer().getName()));
  31.             }
  32.         }
  33.         // fetch the intermediate result via ID. if it does not exist, then it either has not
  34.         // been created, or the order
  35.         // in which this method is called for the job vertices is not a topological order
  36.         IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
  37.         if (ires == null) {
  38.             throw new JobException(
  39.                     "Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
  40.                             + edge.getSourceId());
  41.         }
  42.         this.inputs.add(ires);
  43.         EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());
  44.     }
  45. }
复制代码
这段代码主要完成了将当前的ExecutionJobVertex与其前置任务(predecessors)连接的流程。传入的参数intermediateDatasets包罗了JobGraph中所有的中心计算结果,这些结果是由上游前置任务产生的。
需要注意的是,该过程要求连接操作的实行顺序应遵循任务的拓扑顺序。Flink的计算任务通常由多个阶段构成,每个阶段的输出是下一个阶段的输入,每个阶段(JobVertex)都处理一种类型的计算,例如map或reduce。
流程大致如下:


  • 获取输入: 起首获取jobVertex的输入,输入是JobEdge列表,每一条JobEdge都代表一个上游产生的中心数据集和连接上下游的方式(例如HASH, BROADCAST)。
  • 循环处理每个输入: 然后遍历这些inputs,对于每一条JobEdge:

    • 基于edge.getSourceId()从intermediateDatasets获取IntermediateResult,这是一个中心计算结果。
    • 查抄该中心结果是否存在,如果不存在,则表示这不是一个拓扑排序,因为预期的情况是当你尝试访问一个中心结果时,它应该已经被创建了。如果找不到,抛出一个异常。
    • 如果存在(没有异常被抛出),将找到的IntermediateResult添加到ExecutionJobVertex的inputs(List类型)中,如许当前任务就知道它的输入来自哪些中心结果。
    • 调用EdgeManagerBuildUtil.connectVertexToResult方法来建立当前ExecutionJobVertex与找到的IntermediateResult之间的连接。 EdgeManager是Flink中负责管理输入输出边的组件,它表现地记载了发送端的分区和吸收端的分区对应关系。

这个流程紧张的是建立了Job中每个任务的实行依赖关系,并明确了数据传输的方式,让任务在实行时清楚本身的输入来自哪里,当任务实行完成后,它产生的输出会通过何种方式被发送到哪些任务。
详细的连接方式,我们需要继承进入到EdgeManagerBuildUtil.connectVertexToResult方法中。其源码如下所示:
  1. /**
  2. * Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult} *
  3. * based on the {@link DistributionPattern}.
  4. *
  5. * @param vertex the downstream consumer {@link ExecutionJobVertex}
  6. * @param intermediateResult the upstream consumed {@link IntermediateResult}
  7. * @param distributionPattern the {@link DistributionPattern} of the edge that connects the
  8. *     upstream {@link IntermediateResult} and the downstream {@link IntermediateResult}
  9. */
  10. static void connectVertexToResult(
  11.         ExecutionJobVertex vertex,
  12.         IntermediateResult intermediateResult,
  13.         DistributionPattern distributionPattern) {
  14.     switch (distributionPattern) {
  15.                 // 点对点的连接方式
  16.         case POINTWISE:
  17.             connectPointwise(vertex.getTaskVertices(), intermediateResult);
  18.             break;
  19.                 // 全连接的方式
  20.         case ALL_TO_ALL:
  21.             connectAllToAll(vertex.getTaskVertices(), intermediateResult);
  22.             break;
  23.         default:
  24.             throw new IllegalArgumentException("Unrecognized distribution pattern.");
  25.     }
  26. }
复制代码
会根据DistributionPattern选择不同的连接方式,这里主要分两种情况,DistributionPattern是跟Partitioner的配置有关。
这里以POINTWISE的连接方式来举例,看一下其是如安在构造ExecutionGraph时连接上下游节点的。
  1. private static void connectPointwise(
  2.         ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {
  3.     final int sourceCount = intermediateResult.getPartitions().length;
  4.     final int targetCount = taskVertices.length;
  5.     if (sourceCount == targetCount) {
  6.         for (int i = 0; i < sourceCount; i++) {
  7.             ExecutionVertex executionVertex = taskVertices[i];
  8.             IntermediateResultPartition partition = intermediateResult.getPartitions()[i];
  9.             ConsumerVertexGroup consumerVertexGroup =
  10.                     ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
  11.             partition.addConsumers(consumerVertexGroup);
  12.             ConsumedPartitionGroup consumedPartitionGroup =
  13.                     createAndRegisterConsumedPartitionGroupToEdgeManager(
  14.                             partition.getPartitionId(), intermediateResult);
  15.             executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);
  16.         }
  17.     } else if (sourceCount > targetCount) {
  18.         for (int index = 0; index < targetCount; index++) {
  19.             ExecutionVertex executionVertex = taskVertices[index];
  20.             ConsumerVertexGroup consumerVertexGroup =
  21.                     ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
  22.             int start = index * sourceCount / targetCount;
  23.             int end = (index + 1) * sourceCount / targetCount;
  24.             List<IntermediateResultPartitionID> consumedPartitions =
  25.                     new ArrayList<>(end - start);
  26.             for (int i = start; i < end; i++) {
  27.                 IntermediateResultPartition partition = intermediateResult.getPartitions()[i];
  28.                 partition.addConsumers(consumerVertexGroup);
  29.                 consumedPartitions.add(partition.getPartitionId());
  30.             }
  31.             ConsumedPartitionGroup consumedPartitionGroup =
  32.                     createAndRegisterConsumedPartitionGroupToEdgeManager(
  33.                             consumedPartitions, intermediateResult);
  34.             executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);
  35.         }
  36.     } else {
  37.         for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
  38.             IntermediateResultPartition partition =
  39.                     intermediateResult.getPartitions()[partitionNum];
  40.             ConsumedPartitionGroup consumedPartitionGroup =
  41.                     createAndRegisterConsumedPartitionGroupToEdgeManager(
  42.                             partition.getPartitionId(), intermediateResult);
  43.             int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
  44.             int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
  45.             List<ExecutionVertexID> consumers = new ArrayList<>(end - start);
  46.             for (int i = start; i < end; i++) {
  47.                 ExecutionVertex executionVertex = taskVertices[i];
  48.                 executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);
  49.                 consumers.add(executionVertex.getID());
  50.             }
  51.             ConsumerVertexGroup consumerVertexGroup =
  52.                     ConsumerVertexGroup.fromMultipleVertices(consumers);
  53.             partition.addConsumers(consumerVertexGroup);
  54.         }
  55.     }
  56. }
复制代码
上面这段代码的目标是通过“点对点”的方式(即每个生产者产生的数据只被一个消费者消费)建立任务节点(ExecutionVertex)与中心结果集(IntermediateResultPartition)之间的连接关系。
这个方法的逻辑主要是根据上游任务产生的IntermediateResultPartition的数量(源)和下游ExecutionVertex节点数量(目标)的比例关系,做不同的操作:


  • 源和目标数量相等:方法会将每个源中心结果分区与对应的下游ExecutionVertex节点连接。这种情况下,每个任务都完全独立,只会消费一个特定的上游中心结果分区
  • 源数量大于目标数量:源中心结果分区会被尽大概平均地分配给下游ExecutionVertex节点,即每个ExecutionVertex大概会消费多个源中心结果分区数据。
  • 源数量小于目标数量:每个源中心结果分区大概会被分配给多个下游ExecutionVertex节点消费,即多个ExecutionVertex节点大概消费同一个源中心结果分区数据。
⠀在实行连接的过程中,会创建ConsumerVertexGroup和ConsumedPartitionGroup:


  • ConsumerVertexGroup包罗一组吸收同一个中心结果分区(IntermediateResultPartition)的顶点集合。
  • ConsumedPartitionGroup包罗顶点要消费的一组中心结果分区。
⠀注意,当源数量小于目标数量时,会有多个任务消费同一个源数据,以是需要使用ConsumerVertexGroup.fromMultipleVertices(consumers)来创建ConsumerVertexGroup。
几种连接情况的示例图如下所示:

到这里,这个作业的 ExecutionGraph 就创建完成了,有了 ExecutionGraph,JobManager 才华对这个作业做相应的调度。
总结

本文详细介绍了JobGraph生成ExecutionGraph的流程,介绍了ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition相干概念的原理和生成过程。最后我们介绍了Flink在生成ExecutionGraph时是如何实现IntermediateResultPartition和ExecutionVertex的连接的。
到这里,StreamGraph、JobGraph和Execution的生成过程,在近来的三篇文章中都已经详细解说完成了,当然除了我们介绍的内容外,另有更多的实现细节没有介绍,有兴趣的读者可以参考文本来阅读源码,以此加深本身的明白和对更多实现细节的挖掘。
最后,再对StreamGraph、JobGraph和ExecutionGraph做一个总结:


  • StreamGraph. StreamGraph 是表示 Flink 流计算的图模子,它是用户界说的计算逻辑的内部表示形式,是最原始的用户逻辑,一个没有做任何优化的DataFlow;
  • JobGraph. JobGraph 由一个或多个 JobVertex 对象和它们之间的 JobEdge 对象构成,包罗并行任务的信息。在JobGraph中对StreamGraph举行了优化,将能够归并在同个算子链中的操作符举行归并,以此减少任务实行时的上下文切换,提任务实行性能。
  • ExecutionGraph. ExecutionGraph是 JobGraph 的并发实行版本,由 ExecutionVertex 和 IntermediateResultPartition 构成。每个 JobVertex 会被转换为一个或多个 ExecutionVertex,ExecutorGraph 包罗了每个任务的全部实例,包罗它们的状态、位置、输入输出结果。ExecutionGraph 是 Flink 中最焦点的部分,它用于任务的调度、失败恢复等。
参考:
https://matt33.com/2019/12/20/flink-execution-graph-4/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表