1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| 在StreamGraphGenerator的generate方法中,对所有的Transformation进行转换
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) { String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
streamGraph.addSource(source.getId(), slotSharingGroup, source.getCoLocationGroupKey(), source.getOperatorFactory(), null, source.getOutputType(), "Source: " + source.getName()); if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) { streamGraph.setInputFormat(source.getId(), ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat()); } int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? source.getParallelism() : executionConfig.getParallelism(); streamGraph.setParallelism(source.getId(), parallelism); streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism()); return Collections.singleton(source.getId()); }
public <IN, OUT> void addSource(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName); sources.add(vertexID); }
public <IN, OUT> void addOperator( Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
if (operatorFactory.isStreamSource()) { addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName); } else { addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName); }
TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;
setSerializers(vertexID, inSerializer, null, outSerializer);
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) { operatorFactory.setOutputType(outTypeInfo, executionConfig); }
if (operatorFactory.isInputTypeConfigurable()) { operatorFactory.setInputType(inTypeInfo, executionConfig); }
if (LOG.isDebugEnabled()) { LOG.debug("Vertex: {}", vertexID); } }
protected StreamNode addNode(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends AbstractInvokable> vertexClass, StreamOperatorFactory<?> operatorFactory, String operatorName) { if (streamNodes.containsKey(vertexID)) { throw new RuntimeException("Duplicate vertexID " + vertexID); } StreamNode vertex = new StreamNode( vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(), vertexClass); streamNodes.put(vertexID, vertex); return vertex; }
|