Flink源码解析之四：ExecutionGraph的生成

本文介绍JobGraph被提交之后，JobManager如何接收该请求，以及如何由JobGraph生成ExecutionGraph。

JobManager的启动

JobSubmitHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
JobSubmitHandler负责将作业提交到Flink集群。
它处理提交请求：解析上传的文件、加载JobGraph、上传Jar和artifact文件，然后调用Dispatcher的submitJob，最终启动JobManagerRunner。
/**
 * Handles a job submission request: resolves the uploaded files by name, loads the JobGraph,
 * uploads the JobGraph together with its jar and artifact files (via uploadJobGraphFiles), and
 * submits the job through the given {@link DispatcherGateway}.
 *
 * @param request REST request carrying the {@link JobSubmitRequestBody} and the uploaded files
 * @param gateway dispatcher gateway used for the upload and the submission
 * @return future completing with the response body that contains {@code /jobs/<jobId>}
 * @throws RestHandlerException with {@code BAD_REQUEST} if two uploaded files share a name or
 *         if the request body does not name a JobGraph file
 */
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	// Files uploaded with the request.
	final Collection<File> uploadedFiles = request.getUploadedFiles();
	// File name -> path. Files are looked up by name below, so names must be unique.
	// Without a merge function Collectors.toMap throws IllegalStateException on a duplicate
	// name, which made the BAD_REQUEST check below unreachable. Keeping the first entry lets
	// the size check detect the clash and reject the request properly.
	final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
		File::getName,
		Path::fromLocalFile,
		(first, duplicate) -> first
	));

	// A size mismatch means at least two uploaded files had the same name.
	if (uploadedFiles.size() != nameToFile.size()) {
		throw new RestHandlerException(
			String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
				uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
				nameToFile.size(),
				uploadedFiles.size()),
			HttpResponseStatus.BAD_REQUEST
		);
	}

	// Request body describing which uploaded files are the JobGraph, jars and artifacts.
	final JobSubmitRequestBody requestBody = request.getRequestBody();

	if (requestBody.jobGraphFileName == null) {
		throw new RestHandlerException(
			String.format("The %s field must not be omitted or be null.",
				JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
			HttpResponseStatus.BAD_REQUEST);
	}

	// Load the JobGraph from its uploaded file.
	CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

	// Resolve the user jars to upload.
	Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

	// Resolve the (name, path) pairs of the artifacts to upload.
	Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

	// Upload the JobGraph's files (to the BLOB server, per the original article) and
	// obtain the finalized JobGraph.
	CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

	// Submit the finalized JobGraph to the dispatcher.
	CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

	return jobSubmissionFuture.thenCombine(jobGraphFuture,
		(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

// 接下来仍是Dispatcher中submitJob的调用逻辑，
// 详情可参考上一篇文章。

JobManagerRunnerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
下面看一下JobManagerRunnerImpl的构造函数中做了什么：
/**
 * Creates a JobManagerRunnerImpl for the given job.
 *
 * <p>Registers the job's user jars and classpaths with the library cache manager, obtains the
 * user-code class loader, the running-jobs registry and the job's leader election service,
 * and finally creates the JobMasterService through the given factory.
 *
 * <p>If any of these steps fails, resultFuture and terminationFuture are completed
 * exceptionally with the cause, and a JobExecutionException is thrown.
 * NOTE(review): if jobGraph is null, the jobGraph.getJobID() call in the catch block throws a
 * NullPointerException that replaces the intended JobExecutionException. The job is also not
 * unregistered from the library cache manager on failure, despite the "cleanly shut down"
 * comment below. Confirm whether that cleanup happens elsewhere.
 *
 * @throws Exception if the runner cannot be set up
 */
public JobManagerRunnerImpl(
final JobGraph jobGraph,
final JobMasterServiceFactory jobMasterFactory,
final HighAvailabilityServices haServices,
final LibraryCacheManager libraryCacheManager,
final Executor executor,
final FatalErrorHandler fatalErrorHandler) throws Exception {

this.resultFuture = new CompletableFuture<>();
this.terminationFuture = new CompletableFuture<>();
// Start with an already-completed leadership operation.
this.leadershipOperation = CompletableFuture.completedFuture(null);

// make sure we cleanly shut down our JobManager services if initialization fails
try {
this.jobGraph = checkNotNull(jobGraph);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.executor = checkNotNull(executor);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

// libraries and class loader first
try {
// Register the job's user jars and classpaths with the library cache manager
libraryCacheManager.registerJob(
jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
} catch (IOException e) {
throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
}

// Obtain the user-code class loader for this job
final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
if (userCodeLoader == null) {
throw new Exception("The user code class loader could not be initialized.");
}

// high availability services next
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
// Leader election service for this job's JobManager
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

this.leaderGatewayFuture = new CompletableFuture<>();

// now start the JobManager
// Create the JobMasterService for this job via the factory
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
}
catch (Throwable t) {
// Fail both futures with the cause before rethrowing
terminationFuture.completeExceptionally(t);
resultFuture.completeExceptionally(t);

throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
}
}

ExecutionGraph的生成

JobMaster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Entry point: DefaultJobMasterServiceFactory.createJobMasterService
/**
 * Creates the JobMaster for the given job.
 *
 * <p>Each call generates a fresh ResourceID for the JobMaster. The partition tracker is passed
 * as a factory: given a lookup, it builds a JobMasterPartitionTrackerImpl for this job's ID
 * using the shared shuffleMaster.
 *
 * @param jobGraph the job to run
 * @param jobCompletionActions completion actions handed to the JobMaster
 * @param userCodeClassloader class loader for the job's user code
 * @return the newly constructed JobMaster
 * @throws Exception if the JobMaster cannot be constructed
 */
@Override
public JobMaster createJobMasterService(
JobGraph jobGraph,
OnCompletionActions jobCompletionActions,
ClassLoader userCodeClassloader) throws Exception {

return new JobMaster(
rpcService,
jobMasterConfiguration,
ResourceID.generate(),
jobGraph,
haServices,
slotPoolFactory,
schedulerFactory,
jobManagerSharedServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
jobCompletionActions,
fatalErrorHandler,
userCodeClassloader,
schedulerNGFactory,
shuffleMaster,
lookup -> new JobMasterPartitionTrackerImpl(
jobGraph.getJobID(),
shuffleMaster,
lookup
));
}

// JobMaster.createScheduler
/**
 * Creates this JobMaster's SchedulerNG through the configured schedulerNGFactory.
 *
 * <p>Argument mapping onto SchedulerNGFactory.createInstance (compare with the
 * DefaultSchedulerFactory signature): scheduledExecutorService is passed both as the
 * ioExecutor and as the futureExecutor, and {@code scheduler} is passed as the slotProvider.
 *
 * @param jobManagerJobMetricGroup metric group handed to the scheduler
 * @return the created scheduler
 * @throws Exception if the factory fails to create the scheduler
 */
private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
// Delegate creation of the SchedulerNG to the factory
return schedulerNGFactory.createInstance(
log,
jobGraph,
backPressureStatsTracker,
scheduledExecutorService,
jobMasterConfiguration.getConfiguration(),
scheduler,
scheduledExecutorService,
userCodeLoader,
highAvailabilityServices.getCheckpointRecoveryFactory(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
jobMasterConfiguration.getSlotRequestTimeout(),
shuffleMaster,
partitionTracker);
}

SchedulerNGFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
SchedulerNGFactory的实现类：
- DefaultSchedulerFactory
- LegacySchedulerFactory

// Analysis: DefaultSchedulerFactory.createInstance
/**
 * Creates a DefaultScheduler for the given job.
 *
 * <p>Derives the scheduling strategy factory from the job's schedule mode, the restart back-off
 * strategy from the job's ExecutionConfig restart strategy (together with the master
 * configuration and whether checkpointing is enabled), and the slot provider strategy from the
 * schedule mode, slot provider and slot request timeout. All of these are passed to the new
 * DefaultScheduler.
 *
 * @return the new DefaultScheduler
 * @throws Exception e.g. if the ExecutionConfig cannot be deserialized with userCodeLoader
 */
@Override
public SchedulerNG createInstance(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker) throws Exception {

// Pick the SchedulingStrategyFactory according to the JobGraph's schedule mode.
// NOTE(review): the original article maps LAZY_FROM_SOURCES to batch jobs and EAGER to
// streaming jobs; confirm against ScheduleMode and createSchedulingStrategyFactory.
final SchedulingStrategyFactory schedulingStrategyFactory = createSchedulingStrategyFactory(jobGraph.getScheduleMode());
// Restart back-off strategy, built from the user's restart strategy in the ExecutionConfig
final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader
.createRestartBackoffTimeStrategyFactory(
jobGraph
.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy(),
jobMasterConfiguration,
jobGraph.isCheckpointingEnabled())
.create();
log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());

// Slot provider strategy used by the execution slot allocator below
final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
jobGraph.getScheduleMode(),
slotProvider,
slotRequestTimeout);

// Build the DefaultScheduler
return new DefaultScheduler(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker,
schedulingStrategyFactory,
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
}

DefaultScheduler -> SchedulerBase（DefaultScheduler继承自SchedulerBase）

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
这里主要分析其父类SchedulerBase，ExecutionGraph正是在SchedulerBase中创建的：

// In SchedulerBase: build the ExecutionGraph, restoring checkpoint/savepoint state when possible
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));

/**
 * Creates the ExecutionGraph and restores its state when the graph has a CheckpointCoordinator:
 * first from the latest completed checkpoint and, if none was restored, from the job's
 * savepoint restore settings.
 *
 * @return the created (and possibly restored) ExecutionGraph
 * @throws Exception if creation or restoration fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(
	JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
	ShuffleMaster<?> shuffleMaster,
	JobMasterPartitionTracker partitionTracker) throws Exception {

	final ExecutionGraph graph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

	final CheckpointCoordinator coordinator = graph.getCheckpointCoordinator();
	if (coordinator == null) {
		// No checkpoint coordinator: nothing to restore from.
		return graph;
	}

	// Try the latest checkpoint first (no error if none exists, no non-restored state allowed).
	final boolean restoredFromCheckpoint = coordinator.restoreLatestCheckpointedState(
		new HashSet<>(graph.getAllVertices().values()),
		false,
		false);

	if (!restoredFromCheckpoint) {
		// Fall back to the savepoint configured for the job, if any.
		tryRestoreExecutionGraphFromSavepoint(graph, jobGraph.getSavepointRestoreSettings());
	}

	return graph;
}

/**
 * Builds a fresh ExecutionGraph for this scheduler's JobGraph via ExecutionGraphBuilder.
 *
 * <p>With legacy scheduling, the failover strategy is loaded from the job master configuration;
 * otherwise NoOpFailoverStrategy.Factory is used.
 *
 * @return the newly built ExecutionGraph
 * @throws JobExecutionException if the graph cannot be set up for the job
 * @throws JobException if the ExecutionGraph cannot be created
 */
private ExecutionGraph createExecutionGraph(
	JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
	ShuffleMaster<?> shuffleMaster,
	final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {

	final FailoverStrategy.Factory failoverStrategy;
	if (legacyScheduling) {
		failoverStrategy = FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log);
	} else {
		failoverStrategy = new NoOpFailoverStrategy.Factory();
	}

	// No prior graph is passed, so buildGraph always creates a new ExecutionGraph.
	return ExecutionGraphBuilder.buildGraph(
		null,
		jobGraph,
		jobMasterConfiguration,
		futureExecutor,
		ioExecutor,
		slotProvider,
		userCodeLoader,
		checkpointRecoveryFactory,
		rpcTimeout,
		restartStrategy,
		currentJobManagerJobMetricGroup,
		blobWriter,
		slotRequestTimeout,
		log,
		shuffleMaster,
		partitionTracker,
		failoverStrategy);
}

ExecutionGraphBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
// Builds the ExecutionGraph for a JobGraph. If `prior` is non-null it is reused and the
// JobGraph's vertices are attached to it; otherwise a new ExecutionGraph is created.
// Steps: create/reuse the graph, set the JSON plan, run master-side initialization of every
// vertex, attach the topologically sorted vertices, configure checkpointing (only if the
// JobGraph has checkpointing settings), and register metrics.
// Throws JobSubmissionException if a vertex has no invokable class, and
// JobExecutionException / JobException if graph creation, vertex initialization or
// checkpoint setup fails.
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
JobGraph jobGraph,
Configuration jobManagerConfig,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
SlotProvider slotProvider,
ClassLoader classLoader,
CheckpointRecoveryFactory recoveryFactory,
Time rpcTimeout,
RestartStrategy restartStrategy,
MetricGroup metrics,
BlobWriter blobWriter,
Time allocationTimeout,
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");

final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();

// Job-level information (id, name, execution config, job configuration, user jars, classpaths)
final JobInformation jobInformation = new JobInformation(
jobId,
jobName,
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());

// Maximum number of prior execution attempts kept in the history.
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

// Factory for the strategy that decides when intermediate result partitions are released
final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);

// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
// Reuse the prior graph if given, otherwise create a new ExecutionGraph
executionGraph = (prior != null) ? prior :
new ExecutionGraph(
jobInformation,
futureExecutor,
ioExecutor,
rpcTimeout,
restartStrategy,
maxPriorAttemptsHistoryLength,
failoverStrategyFactory,
slotProvider,
classLoader,
blobWriter,
allocationTimeout,
partitionReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
jobGraph.getScheduleMode());
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}

// set the basic properties

try {
// Generate the JSON execution plan from the JobGraph (best effort)
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
catch (Throwable t) {
log.warn("Cannot create JSON plan for job", t);
// give the graph an empty plan
executionGraph.setJsonPlan("{}");
}

// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits

final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);

// Every JobVertex must have an invokable class; then run its master-side initialization
for (JobVertex vertex : jobGraph.getVertices()) {
String executableClass = vertex.getInvokableClassName();
if (executableClass == null || executableClass.isEmpty()) {
throw new JobSubmissionException(jobId,
"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
}

try {
vertex.initializeOnMaster(classLoader);
}
catch (Throwable t) {
throw new JobExecutionException(jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
}
}

log.info("Successfully ran initialization on master in {} ms.",
(System.nanoTime() - initMasterStart) / 1_000_000);

// topologically sort the job vertices and attach the graph to the existing one
// Job vertices sorted topologically, starting from the sources
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
// Convert the sorted JobVertices into ExecutionJobVertices of this ExecutionGraph
executionGraph.attachJobGraph(sortedTopology);

if (log.isDebugEnabled()) {
log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
}

// configure the state checkpointing
// Checkpointing is only configured when the JobGraph carries checkpointing settings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
if (snapshotSettings != null) {
List<ExecutionJobVertex> triggerVertices =
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

List<ExecutionJobVertex> ackVertices =
idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

List<ExecutionJobVertex> confirmVertices =
idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

CompletedCheckpointStore completedCheckpoints;
CheckpointIDCounter checkpointIdCounter;
try {
int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

if (maxNumberOfCheckpointsToRetain <= 0) {
// warning and use 1 as the default value if the setting in
// state.checkpoints.max-retained-checkpoints is not greater than 0.
log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
maxNumberOfCheckpointsToRetain,
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
}

completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
}
catch (Exception e) {
throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
}

// Maximum number of remembered checkpoints
int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
historySize,
ackVertices,
snapshotSettings.getCheckpointCoordinatorConfiguration(),
metrics);

// load the state backend from the application settings
final StateBackend applicationConfiguredBackend;
final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();

if (serializedAppConfigured == null) {
applicationConfiguredBackend = null;
}
else {
try {
applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(jobId,
"Could not deserialize application-defined state backend.", e);
}
}

// Resolve the state backend: application-defined, else configured, else default
final StateBackend rootBackend;
try {
rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
applicationConfiguredBackend, jobManagerConfig, classLoader, log);
}
catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
}

// instantiate the user-defined checkpoint hooks

final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
final List<MasterTriggerRestoreHook<?>> hooks;

if (serializedHooks == null) {
hooks = Collections.emptyList();
}
else {
final MasterTriggerRestoreHook.Factory[] hookFactories;
try {
hookFactories = serializedHooks.deserializeValue(classLoader);
}
catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
}

// Create the hooks with the user-code class loader as the thread context class loader,
// restoring the original context class loader afterwards
final Thread thread = Thread.currentThread();
final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(classLoader);

try {
hooks = new ArrayList<>(hookFactories.length);
for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
}
}
finally {
thread.setContextClassLoader(originalClassLoader);
}
}

final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

executionGraph.enableCheckpointing(
chkConfig,
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
}

// create all the metrics for the Execution Graph
// Restart-time, down-time and up-time gauges
metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
// Let the failover strategy register its own metrics
executionGraph.getFailoverStrategy().registerMetrics(metrics);

return executionGraph;
}

/**
 * Converts the given JobVertices into ExecutionJobVertices and attaches them to this graph.
 *
 * <p>For each vertex, an ExecutionJobVertex is created, connected to the intermediate results it
 * consumes (looked up in {@code intermediateResults}, which earlier vertices have populated), and
 * registered together with the intermediate results it produces. Afterwards the execution
 * topology is rebuilt, the failover strategy is notified of the new vertices, and the partition
 * release strategy is recreated.
 *
 * @param topologiallySorted job vertices in topological order (sources first)
 * @throws JobException if a job vertex ID or an intermediate data set ID is already registered
 */
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

	assertRunningInJobMasterMainThread();

	LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
			"vertices and {} intermediate results.",
		topologiallySorted.size(),
		tasks.size(),
		intermediateResults.size());

	final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
	final long createTimestamp = System.currentTimeMillis();

	for (JobVertex jobVertex : topologiallySorted) {

		// A single non-stoppable input vertex makes the whole graph non-stoppable.
		if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
			this.isStoppable = false;
		}

		// create the execution job vertex and attach it to the graph
		ExecutionJobVertex ejv = new ExecutionJobVertex(
			this,
			jobVertex,
			// NOTE(review): presumably the default parallelism used when the JobVertex has none
			// configured; the effective value is ejv.getParallelism() (see numVerticesTotal below).
			1,
			maxPriorAttemptsHistoryLength, // max prior execution attempts kept in history
			rpcTimeout,
			globalModVersion, // NOTE(review): per the article, incremented on every global recovery
			createTimestamp);

		// Connect this vertex to the intermediate results produced by its predecessors.
		ejv.connectToPredecessors(this.intermediateResults);

		ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
		if (previousTask != null) {
			// previousTask is the vertex already registered under this ID; ejv is the new one.
			throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
				jobVertex.getID(), previousTask, ejv));
		}

		for (IntermediateResult res : ejv.getProducedDataSets()) {
			IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
			if (previousDataSet != null) {
				// previousDataSet is the result already registered under this ID; res is the new one.
				throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
					res.getId(), previousDataSet, res));
			}
		}

		this.verticesInCreationOrder.add(ejv);
		this.numVerticesTotal += ejv.getParallelism();
		newExecJobVertices.add(ejv);
	}

	// the topology assigning should happen before notifying new vertices to failoverStrategy
	executionTopology = new DefaultExecutionTopology(this);

	failoverStrategy.notifyNewVertices(newExecJobVertices);

	partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}