| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 
 | private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
 
 for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
 createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
 }
 }
 
 
 
 /**
  * Recursively processes {@code currentNodeId} as a member of the chain that starts at
  * {@code startNodeId}, and returns the out edges that leave that chain.
  *
  * <p>If {@code startNodeId} is already contained in {@code builtVertices}, nothing is
  * done and an empty list is returned. Otherwise the method:
  * <ol>
  *   <li>splits the current node's out edges into chainable and non-chainable ones
  *       (as decided by {@code isChainable});</li>
  *   <li>recurses along every chainable edge with the same start node and
  *       {@code chainIndex + 1}, collecting the edges those calls return;</li>
  *   <li>adds every non-chainable edge to the result and starts a new chain
  *       (index 0) at that edge's target;</li>
  *   <li>records one (primary hash, legacy hash) pair per legacy hash map under
  *       {@code startNodeId} in {@code chainedOperatorHashes};</li>
  *   <li>registers the chained name, resources and any input/output format of the
  *       current node;</li>
  *   <li>creates and fills the node's {@code StreamConfig} - via
  *       {@code createJobVertex} when the current node is the chain start, otherwise
  *       as a fresh config stored in {@code chainedConfigs} under the start node.</li>
  * </ol>
  *
  * <p>NOTE(review): {@code builtVertices} is not modified in this method; presumably
  * {@code createJobVertex} adds the start node to it - confirm against that method.
  *
  * @param startNodeId id of the node at which the current chain starts
  * @param currentNodeId id of the node being processed in this call
  * @param hashes per-node hash bytes; the entry for {@code currentNodeId} becomes its
  *     {@code OperatorID}
  * @param legacyHashes list of per-node legacy hash maps
  * @param chainIndex index assigned to the current node's config when it is not the
  *     chain start (the chain start always gets index 0)
  * @param chainedOperatorHashes accumulator of (primary, legacy) hash pairs keyed by
  *     chain start node id
  * @return the edges leaving the chain as seen from the current node; an empty list if
  *     the start node was already built
  */
 private List<StreamEdge> createChain(
         Integer startNodeId,
         Integer currentNodeId,
         Map<Integer, byte[]> hashes,
         List<Map<Integer, byte[]>> legacyHashes,
         int chainIndex,
         Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

     // Nothing to do for a chain whose start node has already been built.
     if (builtVertices.contains(startNodeId)) {
         return new ArrayList<>();
     }

     List<StreamEdge> leavingEdges = new ArrayList<>();
     List<StreamEdge> chainedEdges = new ArrayList<>();
     List<StreamEdge> boundaryEdges = new ArrayList<>();

     StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

     // Partition the out edges by whether their target can join this chain.
     for (StreamEdge candidate : currentNode.getOutEdges()) {
         List<StreamEdge> bucket = isChainable(candidate, streamGraph) ? chainedEdges : boundaryEdges;
         bucket.add(candidate);
     }

     // Walk the rest of this chain first; the edges leaving it bubble up to the caller.
     for (StreamEdge chained : chainedEdges) {
         List<StreamEdge> downstream = createChain(
                 startNodeId, chained.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes);
         leavingEdges.addAll(downstream);
     }

     // A non-chainable edge leaves this chain and its target opens a new one.
     for (StreamEdge boundary : boundaryEdges) {
         leavingEdges.add(boundary);
         createChain(
                 boundary.getTargetId(), boundary.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
     }

     List<Tuple2<byte[], byte[]>> hashPairs =
             chainedOperatorHashes.computeIfAbsent(startNodeId, key -> new ArrayList<>());

     byte[] primaryHash = hashes.get(currentNodeId);
     OperatorID operatorId = new OperatorID(primaryHash);

     // One (primary, legacy) pair per legacy hash map, all filed under the chain start.
     for (Map<Integer, byte[]> legacy : legacyHashes) {
         hashPairs.add(new Tuple2<>(primaryHash, legacy.get(currentNodeId)));
     }

     chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainedEdges));
     chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainedEdges));
     chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainedEdges));

     // Formats are registered on the container of the chain start, keyed by operator id.
     if (currentNode.getInputFormat() != null) {
         getOrCreateFormatContainer(startNodeId).addInputFormat(operatorId, currentNode.getInputFormat());
     }

     if (currentNode.getOutputFormat() != null) {
         getOrCreateFormatContainer(startNodeId).addOutputFormat(operatorId, currentNode.getOutputFormat());
     }

     final boolean isChainStart = currentNodeId.equals(startNodeId);

     // Only the chain start gets a job vertex; every other member gets a standalone config.
     StreamConfig config;
     if (isChainStart) {
         config = createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes);
     } else {
         config = new StreamConfig(new Configuration());
     }

     setVertexConfig(currentNodeId, config, chainedEdges, boundaryEdges);

     if (isChainStart) {
         config.setChainStart();
         config.setChainIndex(0);

         config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
         config.setOutEdgesInOrder(leavingEdges);
         config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

         for (StreamEdge leaving : leavingEdges) {
             connect(startNodeId, leaving);
         }

         // By now the recursion above has stored the configs of all chained members
         // (if any) under the start node.
         config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
     } else {
         Map<Integer, StreamConfig> chainMembers =
                 chainedConfigs.computeIfAbsent(startNodeId, key -> new HashMap<Integer, StreamConfig>());

         config.setChainIndex(chainIndex);
         StreamNode chainedNode = streamGraph.getStreamNode(currentNodeId);
         config.setOperatorName(chainedNode.getOperatorName());

         chainMembers.put(currentNodeId, config);
     }

     config.setOperatorID(operatorId);

     // A node with no chainable successor terminates its chain.
     if (chainedEdges.isEmpty()) {
         config.setChainEnd();
     }

     return leavingEdges;
 }
 
 |