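JobGraph Composition
A JobGraph is generated by optimizing the StreamGraph. The main optimization is chaining nodes that satisfy the chaining conditions, which reduces the serialization and network transfer needed as data flows between operators.
A JobGraph consists of three main parts.
- JobVertex: a vertex of the graph. Its inputs are JobEdges and its outputs are IntermediateDataSets. It can correspond to multiple StreamNodes, merging several operators into one vertex.
- IntermediateDataSet: an intermediate result set. It is the result produced by a JobVertex, kept as a separate entity so that downstream consumers can reuse it. Its producer is a JobVertex and its consumer is a JobEdge.
- JobEdge: an edge, the transport channel of the JobGraph. Its source is an IntermediateDataSet and its target is a JobVertex, so data passes through the JobEdge from the IntermediateDataSet to the target JobVertex.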
JobVertex
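- operatorIDs: the IDs of all operators contained in this job vertex, stored in depth-first order
- results: the intermediate results produced by this job vertex
- inputs: the list of edges that carry the input data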
IntermediateDataSet
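- producer: the producer, a JobVertex
- consumers: the consuming edges; there can be several, but they must all have the same partitioner and parallelism
- resultType: the partition type used at runtime
  - BLOCKING: blocking, batch mode
  - PIPELINED: pipelined and non-blocking, streaming mode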
JobEdge
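- target: the output end of the edge, a JobVertex
- source: the source of the edge, an IntermediateDataSet
- distributionPattern: determines how the subtasks of the upstream (producer) vertex are connected to the subtasks of the downstream (consumer) vertex
  - ALL_TO_ALL: every producer subtask is connected to every subtask of the consuming task
  - POINTWISE: every producer subtask is connected to one or more subtasks of the consuming task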
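The difference between the two patterns is easiest to see by listing which consumer subtasks a given producer subtask reaches. The sketch below is only an illustration: the class DistributionPatternSketch, the method consumersOf and the proportional POINTWISE mapping are assumptions made for this example and are not Flink's actual wiring code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink.
final class DistributionPatternSketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    // Returns the consumer subtask indexes that one producer subtask connects to.
    static List<Integer> consumersOf(
            Pattern pattern, int producerIndex, int producers, int consumers) {
        List<Integer> targets = new ArrayList<>();
        if (pattern == Pattern.ALL_TO_ALL) {
            // Every producer subtask reaches every consumer subtask.
            for (int c = 0; c < consumers; c++) {
                targets.add(c);
            }
            return targets;
        }
        // POINTWISE (assumed proportional mapping, for illustration only).
        if (producers >= consumers) {
            // Several producers share one consumer.
            targets.add(producerIndex * consumers / producers);
        } else {
            // One producer fans out to a contiguous range of consumers.
            int start = producerIndex * consumers / producers;
            int end = (producerIndex + 1) * consumers / producers;
            for (int c = start; c < end; c++) {
                targets.add(c);
            }
        }
        return targets;
    }
}
```

With 2 producers and 4 consumers, ALL_TO_ALL gives each producer four connections, while this POINTWISE mapping gives each producer only two.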
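JobGraph Generation
The entry point is the createJobGraph method of StreamingJobGraphGenerator.
createJobGraph does many things; the three key steps are:
- Generate a hash for each StreamNode, so that the node can be identified during failure recovery
- Generate the JobVertex instances
- Generate the JobEdge and IntermediateDataSet instances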
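The point of the first step is that the hash is deterministic: submitting the same topology again yields the same IDs, which is what lets restored state be matched to operators. A minimal sketch of that idea follows. It is not Flink's hashing algorithm; NodeHashSketch, hashNode and the choice of SHA-256 over "position plus input hashes" are assumptions for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical helper, not part of Flink.
final class NodeHashSketch {

    // Derives a node hash from its traversal position and the hashes of its inputs.
    static byte[] hashNode(int positionInTraversal, List<byte[]> inputHashes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(
                    Integer.toString(positionInTraversal).getBytes(StandardCharsets.UTF_8));
            // Mixing in the input hashes makes the result depend on the upstream topology.
            for (byte[] inputHash : inputHashes) {
                digest.update(inputHash);
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Calling hashNode twice with the same arguments returns equal byte arrays, while changing the position or any input hash changes the result.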
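Generating JobVertex (setChaining)
Task chains are set up starting from the source StreamNode instances, and the process recursively creates all JobVertex instances.
buildChainedInputsAndGetHeadInputs returns the set of chain entry points, and createChain is then called for each of them.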

buildChainedInputsAndGetHeadInputs
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {
    // Sources that can be chained; these nodes are handled separately
    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
    // Chain entry points (source nodes that cannot be chained, and the node that follows a chainable source)
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();
    // Iterate over all source nodes of the streamGraph
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);
        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
                && sourceNode.getOutEdges().size() == 1) {
            // The source node must have exactly one out edge; a source with several out edges cannot be chained
            // as long as only NAry ops support this chaining, we need to skip the other parts
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
            final ChainingStrategy targetChainingStrategy =
                    target.getOperatorFactory().getChainingStrategy();
            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                    && isChainableInput(sourceOutEdge, streamGraph)) {
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
                final StreamConfig.SourceInputConfig inputConfig =
                        new StreamConfig.SourceInputConfig(sourceOutEdge);
                final StreamConfig operatorConfig = new StreamConfig(new Configuration());
                setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());
                setOperatorChainedOutputsConfig(operatorConfig, Collections.emptyList());
                // we cache the non-chainable outputs here, and set the non-chained config later
                opNonChainableOutputsCache.put(sourceNodeId, Collections.emptyList());
                // The chain index of a source is always 0
                operatorConfig.setChainIndex(0); // sources are always first
                operatorConfig.setOperatorID(opId);
                operatorConfig.setOperatorName(sourceNode.getOperatorName());
                chainedSources.put(
                        sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));
                final SourceOperatorFactory<?> sourceOpFact =
                        (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
                final OperatorCoordinator.Provider coord =
                        sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
                // Add (targetNodeId, chainInfo) to chainEntryPoints
                final OperatorChainInfo chainInfo =
                        chainEntryPoints.computeIfAbsent(
                                sourceOutEdge.getTargetId(),
                                (k) ->
                                        new OperatorChainInfo(
                                                sourceOutEdge.getTargetId(),
                                                hashes,
                                                legacyHashes,
                                                chainedSources,
                                                streamGraph));
                chainInfo.addCoordinatorProvider(coord);
                chainInfo.recordChainedNode(sourceNodeId);
                continue;
            }
        }
        chainEntryPoints.put(
                sourceNodeId,
                new OperatorChainInfo(
                        sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
    }
    return chainEntryPoints;
}
createChain
While chains are being built, a JobVertex is created at the head node of each chain once that chain is complete.
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {
        // transitiveOutEdges: the boundary out edges, i.e. edges between two StreamNodes that cannot be chained; used to generate JobEdges
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // chainableOutputs: out edges across which two StreamNodes can be chained
        // nonChainableOutputs: out edges across which two StreamNodes cannot be chained
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        for (StreamEdge chainable : chainableOutputs) {
            // If a chainable edge exists, keep chaining into the target operator of that edge.
            // What transitiveOutEdges finally returns to the outermost call are the edges that cannot be chained any further
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }
        for (StreamEdge nonChainable : nonChainableOutputs) {
            // A non-chainable edge is a boundary edge between two chains of StreamNodes, so add it to transitiveOutEdges,
            // then call createChain on the target StreamNode of that edge, which starts a new chain
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }
        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));
        // Add the current StreamNode to the chain
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(
                        currentNodeId,
                        streamGraph.getStreamNode(currentNodeId).getOperatorName());
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }
        // Create the JobVertex at the head node of the chain
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());
        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());
        setOperatorChainedOutputsConfig(config, chainableOutputs);
        // we cache the non-chainable outputs here, and set the non-chained config later
        // Cache the non-chainable out edges
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
        if (currentNodeId.equals(startNodeId)) {
            // Head node
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            // Tail node of the chain
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}
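To see the recursion without the configuration bookkeeping, here is a stripped-down sketch on a toy graph. It keeps only the shape of createChain: follow chainable edges within the same chain, start a new chain across each non-chainable edge, and finalize a chain when the recursion returns to its head. The class ChainSketch, its Edge type, the precomputed chainable flag and the field names are assumptions for illustration; in Flink the decision comes from isChainable and the result is one JobVertex per head node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, not part of Flink.
final class ChainSketch {

    static final class Edge {
        final int source;
        final int target;
        final boolean chainable; // assumed to be precomputed

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    private final Set<Integer> built = new HashSet<>(); // plays the role of builtVertices
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>(); // head -> chained nodes
    final Map<Integer, List<Edge>> boundaryEdges = new HashMap<>();   // head -> transitive out edges

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>())
                .add(new Edge(source, target, chainable));
    }

    List<Edge> createChain(int current, int head) {
        if (built.contains(head)) {
            return new ArrayList<>(); // this chain was already finalized
        }
        List<Edge> transitive = new ArrayList<>();
        List<Edge> edges = outEdges.getOrDefault(current, Collections.emptyList());
        for (Edge e : edges) {
            if (e.chainable) {
                // Stay in the same chain and collect its boundary edges.
                transitive.addAll(createChain(e.target, head));
            }
        }
        for (Edge e : edges) {
            if (!e.chainable) {
                // Boundary edge: record it and start a new chain at its target.
                transitive.add(e);
                createChain(e.target, e.target);
            }
        }
        // Prepend after the recursion so that the head ends up first.
        chains.computeIfAbsent(head, k -> new ArrayList<>()).add(0, current);
        if (current == head) {
            built.add(head);
            boundaryEdges.put(head, transitive);
        }
        return transitive;
    }
}
```

For a graph 1 -> 2 -> 3 -> 4 where only the last edge is not chainable, calling createChain(1, 1) produces two chains, one headed by node 1 containing nodes 1, 2 and 3, and one containing only node 4, with the edge 3 -> 4 recorded as the boundary edge of the first chain.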
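Determining whether an edge is chainable
- The upstream and downstream nodes share the same slot sharing group
- The upstream and downstream operators can be chained
- The partitioner and exchange mode allow chaining (forward)
- The parallelism is the same
- Chaining is enabled
- The edge must not be part of a union operation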
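The conditions above combine into a single boolean check. The following is a simplified sketch on toy types: ChainableSketch, Node, Edge and their fields are assumptions made for this example, and the real check in Flink inspects the actual partitioner, exchange mode and operator factories rather than precomputed flags.

```java
// Hypothetical toy model, not part of Flink.
final class ChainableSketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    static final class Node {
        final String slotSharingGroup;
        final int parallelism;
        final Strategy strategy;

        Node(String slotSharingGroup, int parallelism, Strategy strategy) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.strategy = strategy;
        }
    }

    static final class Edge {
        final Node source;
        final Node target;
        final boolean forward;     // assumed flag: forward partitioner and a chain-friendly exchange mode
        final boolean partOfUnion; // assumed flag: the target receives this input through a union

        Edge(Node source, Node target, boolean forward, boolean partOfUnion) {
            this.source = source;
            this.target = target;
            this.forward = forward;
            this.partOfUnion = partOfUnion;
        }
    }

    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        return chainingEnabled
                && edge.source.slotSharingGroup.equals(edge.target.slotSharingGroup)
                // Simplified operator rule: upstream must not refuse chaining,
                // downstream must accept being chained to a predecessor.
                && edge.source.strategy != Strategy.NEVER
                && edge.target.strategy == Strategy.ALWAYS
                && edge.forward
                && edge.source.parallelism == edge.target.parallelism
                && !edge.partOfUnion;
    }
}
```

Failing any single condition, for example a parallelism mismatch, is enough to split the two operators into separate chains.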
createJobVertex
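- Create the corresponding collection of operators
- Create the JobVertex (InputOutputFormatVertex is a special kind of JobVertex used for tasks that involve input and output formats, for example reading and writing files or databases)
- Add the corresponding upstream data sets
- Cache the JobVertex-related information
Generating JobEdge and IntermediateDataSet (setAllVertexNonChainedOutputsConfigs)
Iterate over jobVertices and call connect to link them together.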
connect
Connects two JobVertex instances (headVertex and downStreamVertex). The key method is downStreamVertex.connectNewDataSetAsInput.
connectNewDataSetAsInput
Creates the IntermediateDataSet and the JobEdge, forming JobVertex -> IntermediateDataSet -> JobEdge -> JobVertex.
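The resulting object structure can be sketched with three small classes. This is a toy model: ConnectSketch and its nested Vertex, DataSet and Edge classes are assumptions for illustration, and the single-argument connectNewDataSetAsInput here is far simpler than the real method, which also takes the distribution pattern and the result partition type, among other parameters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not part of Flink.
final class ConnectSketch {

    static final class Vertex {
        final String name;
        final List<DataSet> results = new ArrayList<>(); // produced intermediate results
        final List<Edge> inputs = new ArrayList<>();     // incoming edges

        Vertex(String name) {
            this.name = name;
        }

        // Called on the downstream vertex, with the upstream vertex as argument.
        Edge connectNewDataSetAsInput(Vertex upstream) {
            DataSet dataSet = new DataSet(upstream); // producer is the upstream vertex
            upstream.results.add(dataSet);
            Edge edge = new Edge(dataSet, this);     // source is the data set, target is this vertex
            dataSet.consumers.add(edge);
            inputs.add(edge);
            return edge;
        }
    }

    static final class DataSet {
        final Vertex producer;
        final List<Edge> consumers = new ArrayList<>();

        DataSet(Vertex producer) {
            this.producer = producer;
        }
    }

    static final class Edge {
        final DataSet source;
        final Vertex target;

        Edge(DataSet source, Vertex target) {
            this.source = source;
            this.target = target;
        }
    }
}
```

After downStreamVertex.connectNewDataSetAsInput(headVertex), walking from headVertex.results to the data set's consumers and on to the edge's target arrives at downStreamVertex, which is the JobVertex -> IntermediateDataSet -> JobEdge -> JobVertex path described above.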