flink JobGraph解析

打印 上一主题 下一主题

主题 1010|帖子 1010|积分 3030

JobGraph组成


JobGraph主要是StreamGraph经过优化后天生的,主要优化的就是对符合条件节点举行chain,这样可以淘汰数据流动的序列化和传输。
JobGraph主要由三部分组成。


  • JobVertex:图的顶点。输入是一个JobEdge,输出是IntermediateDataSet。它可以对应多个StreamNode,将多个operator归并到一起。
  • IntermediateDataSet:中间效果集。是JobVertex处置惩罚后天生的效果集,为了方便卑鄙复用,producer 是 JobVertex ,consumer 是 JobEdge。
  • JobEdge:边。JobGraph的传输管道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex 。
JobVertex



  • operatorIDs:该 job 节点包罗的全部 operator ids,以深度优先方式存储 ids
  • results:job 节点盘算出的中间效果
  • inputs:输入数据的边列表

IntermediateDataSet



  • producer:生产者,JobVertex
  • consumers:消费边,可以对应多个,但是必须具有相同的分区器和并行性
  • resultType:运行时使用的分区类型

    • BLOCKING 阻塞,批处置惩罚模式
    • PIPELINED 管道非阻塞,流处置惩罚模式


JobEdge



  • target:edge的输出,JobVertex
  • source:edge的源,IntermediateDataSet
  • distributionPattern:决定了在上游节点(生产者)的子任务和卑鄙节点(消费者)之间的连接模式

    • ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
    • POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务


JobGraph天生

入口是在StreamingJobGraphGenerator的createJobGraph方法

createJobGraph过程比较多,重点是三步:

  • 为各个StreamNode天生hash值,这样在故障规复的时候可以识别
  • 天生JobVertex
  • 天生JobEdge、IntermediateDataSet

天生JobVertex(setChaining

从 Source StreamNode 实例开始设置 task chain,它将会递归地创建全部的 JobVertex 实例。
buildChainedInputsAndGetHeadInputs会得到chain的出发点集合,然后遍历举行createChain

buildChainedInputsAndGetHeadInputs
  1. private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
  2.         final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {
  3.     // 可以chain的source,单独处理这种节点
  4.     final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
  5.     // chain的起点(不能chain的souce节点、可以chain的souce节点的下一个节点)
  6.     final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();
  7.     // 遍历streamGraph的所有source node
  8.     for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
  9.         final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);
  10.         if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
  11.                 && sourceNode.getOutEdges().size() == 1) {
  12.             // 要求source node的outEdge只有一个。有多个出边的source不能chain
  13.             // as long as only NAry ops support this chaining, we need to skip the other parts
  14.             final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
  15.             final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
  16.             final ChainingStrategy targetChainingStrategy =
  17.                     target.getOperatorFactory().getChainingStrategy();
  18.             if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
  19.                     && isChainableInput(sourceOutEdge, streamGraph)) {
  20.                 final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
  21.                 final StreamConfig.SourceInputConfig inputConfig =
  22.                         new StreamConfig.SourceInputConfig(sourceOutEdge);
  23.                 final StreamConfig operatorConfig = new StreamConfig(new Configuration());
  24.                 setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());
  25.                 setOperatorChainedOutputsConfig(operatorConfig, Collections.emptyList());
  26.                 // we cache the non-chainable outputs here, and set the non-chained config later
  27.                 opNonChainableOutputsCache.put(sourceNodeId, Collections.emptyList());
  28.                 // sources的index都是0
  29.                 operatorConfig.setChainIndex(0); // sources are always first
  30.                 operatorConfig.setOperatorID(opId);
  31.                 operatorConfig.setOperatorName(sourceNode.getOperatorName());
  32.                 chainedSources.put(
  33.                         sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));
  34.                 final SourceOperatorFactory<?> sourceOpFact =
  35.                         (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
  36.                 final OperatorCoordinator.Provider coord =
  37.                         sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
  38.                 // chainEntryPoints中添加(targetNodeId, chainInfo)
  39.                 final OperatorChainInfo chainInfo =
  40.                         chainEntryPoints.computeIfAbsent(
  41.                                 sourceOutEdge.getTargetId(),
  42.                                 (k) ->
  43.                                         new OperatorChainInfo(
  44.                                                 sourceOutEdge.getTargetId(),
  45.                                                 hashes,
  46.                                                 legacyHashes,
  47.                                                 chainedSources,
  48.                                                 streamGraph));
  49.                 chainInfo.addCoordinatorProvider(coord);
  50.                 chainInfo.recordChainedNode(sourceNodeId);
  51.                 continue;
  52.             }
  53.         }
  54.         chainEntryPoints.put(
  55.                 sourceNodeId,
  56.                 new OperatorChainInfo(
  57.                         sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
  58.     }
  59.     return chainEntryPoints;
  60. }
复制代码

createChain
在创建chain的过程中,一个chain完成后,在头结点创建一个JobVertex。
  1. private List<StreamEdge> createChain(
  2.         final Integer currentNodeId,
  3.         final int chainIndex,
  4.         final OperatorChainInfo chainInfo,
  5.         final Map<Integer, OperatorChainInfo> chainEntryPoints) {
  6.     Integer startNodeId = chainInfo.getStartNodeId();
  7.     if (!builtVertices.contains(startNodeId)) {
  8.         // transitiveOutEdges 过渡的出边集合,就是两个StreamNode不能再进行chain的那条边,用于生成JobEdge
  9.         List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
  10.         // chainableOutputs 两个StreamNode可以进行chain的出边集合
  11.         // nonChainableOutputs 两个StreamNode不能进行chain的出边
  12.         List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
  13.         List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
  14.         StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
  15.         for (StreamEdge outEdge : currentNode.getOutEdges()) {
  16.             if (isChainable(outEdge, streamGraph)) {
  17.                 chainableOutputs.add(outEdge);
  18.             } else {
  19.                 nonChainableOutputs.add(outEdge);
  20.             }
  21.         }
  22.         for (StreamEdge chainable : chainableOutputs) {
  23.             // 如果存在可以chain的边,那么就继续往这条边的target operator进行chain。
  24.             // transitiveOutEdges最终返回给首次调用栈的是不能再继续chain的那条边
  25.             transitiveOutEdges.addAll(
  26.                     createChain(
  27.                             chainable.getTargetId(),
  28.                             chainIndex + 1,
  29.                             chainInfo,
  30.                             chainEntryPoints));
  31.         }
  32.         for (StreamEdge nonChainable : nonChainableOutputs) {
  33.             //如果存在了不可chain的边,说明该边就是StreamNode chain之间的过渡边,添加到transitiveOutEdges中,
  34.             //继续对该边的target StreamNode进行新的createChain操作,意味着一个新的chain
  35.             transitiveOutEdges.add(nonChainable);
  36.             createChain(
  37.                     nonChainable.getTargetId(),
  38.                     1, // operators start at position 1 because 0 is for chained source inputs
  39.                     chainEntryPoints.computeIfAbsent(
  40.                             nonChainable.getTargetId(),
  41.                             (k) -> chainInfo.newChain(nonChainable.getTargetId())),
  42.                     chainEntryPoints);
  43.         }
  44.         chainedNames.put(
  45.                 currentNodeId,
  46.                 createChainedName(
  47.                         currentNodeId,
  48.                         chainableOutputs,
  49.                         Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
  50.         chainedMinResources.put(
  51.                 currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
  52.         chainedPreferredResources.put(
  53.                 currentNodeId,
  54.                 createChainedPreferredResources(currentNodeId, chainableOutputs));
  55.         // 添加当前的StreamNode到chain中
  56.         OperatorID currentOperatorId =
  57.                 chainInfo.addNodeToChain(
  58.                         currentNodeId,
  59.                         streamGraph.getStreamNode(currentNodeId).getOperatorName());
  60.         if (currentNode.getInputFormat() != null) {
  61.             getOrCreateFormatContainer(startNodeId)
  62.                     .addInputFormat(currentOperatorId, currentNode.getInputFormat());
  63.         }
  64.         if (currentNode.getOutputFormat() != null) {
  65.             getOrCreateFormatContainer(startNodeId)
  66.                     .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
  67.         }
  68.         // chain的头结点创建JobVertex
  69.         StreamConfig config =
  70.                 currentNodeId.equals(startNodeId)
  71.                         ? createJobVertex(startNodeId, chainInfo)
  72.                         : new StreamConfig(new Configuration());
  73.         tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
  74.         setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());
  75.         setOperatorChainedOutputsConfig(config, chainableOutputs);
  76.         // we cache the non-chainable outputs here, and set the non-chained config later
  77.         // 缓存不能chain的出边集合
  78.         opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
  79.         if (currentNodeId.equals(startNodeId)) {
  80.             // 头结点
  81.             chainInfo.setTransitiveOutEdges(transitiveOutEdges);
  82.             chainInfos.put(startNodeId, chainInfo);
  83.             config.setChainStart();
  84.             config.setChainIndex(chainIndex);
  85.             config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
  86.             config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
  87.         } else {
  88.             chainedConfigs.computeIfAbsent(
  89.                     startNodeId, k -> new HashMap<Integer, StreamConfig>());
  90.             config.setChainIndex(chainIndex);
  91.             StreamNode node = streamGraph.getStreamNode(currentNodeId);
  92.             config.setOperatorName(node.getOperatorName());
  93.             chainedConfigs.get(startNodeId).put(currentNodeId, config);
  94.         }
  95.         config.setOperatorID(currentOperatorId);
  96.         if (chainableOutputs.isEmpty()) {
  97.             // chain尾节点
  98.             config.setChainEnd();
  99.         }
  100.         return transitiveOutEdges;
  101.     } else {
  102.         return new ArrayList<>();
  103.     }
  104. }
复制代码

判定是否chainable


  • 公用一个slotGroup
  • 上卑鄙operator可以chain
  • partitioner和exchangeMode可以chain(forward)
  • 并行度一样
  • 允许chain
  • 不能是联合操作

createJobVertex

  • 创建对应的operator集合
  • 创建JobVertex(InputOutputFormatVertex是一种特殊的 JobVertex,它用于处置惩罚输入输出格式相干的任务,比方读取和写入文件、数据库等)
  • 添加对应的上游数据集
  • 缓存JobVertex相干信息


天生JobEdge、IntermediateDataSet(setAllVertexNonChainedOutputsConfigs)

遍历jobVertices,调用connect连接起来。


connect
将两个JobVertex(headVertex、downStreamVertex)连接起来。关键方法是downStreamVertex.connectNewDataSetAsInput

connectNewDataSetAsInput
创建IntermediateDataSet和JobEdge,形成JobVertex->IntermediateDataSet->JobEdge->JobVertex


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表