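| Local mini cluster: based on the information in the JobGraph, extract the jars, generate the job files (JobFile), and upload them to the cluster through BlobClient.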
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    return miniCluster.submitJob(jobGraph).thenApply(JobSubmissionResult::getJobID);
}
MiniCluster
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
private CompletableFuture<Void> uploadAndSetJobFiles(
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture,
        final JobGraph job) {
    return blobServerAddressFuture.thenAccept(blobServerAddress -> {
        try {
            ClientUtils.extractAndUploadJobGraphFiles(
                job,
                () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
        } catch (FlinkException e) {
            throw new CompletionException(e);
        }
    });
}
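The `thenCombine(...).thenCompose(Function.identity())` chain in `MiniCluster.submitJob` can be hard to read at first: the combiner itself returns a future, so the result is a future of a future that has to be flattened. The following is a minimal, self-contained sketch of that pattern only. `Gateway`, `submit`, and the string results are hypothetical stand-ins for illustration and are not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class FlattenFutureSketch {

    // Hypothetical stand-in for DispatcherGateway: the submission call is itself asynchronous.
    interface Gateway {
        CompletableFuture<String> submit(String jobName);
    }

    static CompletableFuture<String> submit(
            CompletableFuture<Void> uploadDone,
            CompletableFuture<Gateway> gatewayFuture,
            String jobName) {
        // thenCombine waits for both inputs. Because the combiner returns a future,
        // the intermediate type is a future wrapped in another future.
        CompletableFuture<CompletableFuture<String>> nested =
                uploadDone.thenCombine(gatewayFuture, (ignored, gateway) -> gateway.submit(jobName));
        // thenCompose(Function.identity()) unwraps the inner future.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        Gateway gateway = name -> CompletableFuture.completedFuture("ack:" + name);
        String result = submit(
                CompletableFuture.completedFuture(null),
                CompletableFuture.completedFuture(gateway),
                "wordcount").join();
        System.out.println(result);
    }
}
```

With this shape the submission is only issued after the upload future has completed, which appears to be the ordering the original code relies on.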
Dispatcher
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                    "resources configured. The limitation will be removed in future versions."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture =
        waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
        if (throwable != null) {
            cleanUpJobData(jobGraph.getJobID(), true);

            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
        } else {
            return acknowledge;
        }
    }, getRpcService().getExecutor());
}
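The failure branch of `internalSubmitJob` cleans up, strips the `CompletionException` wrapper, and rethrows a submission-specific exception. A minimal sketch of that idea with plain JDK types might look like this. `SubmissionException`, `strip`, and `acknowledgeOrWrap` are hypothetical names, `strip` is only a simplified stand-in for `ExceptionUtils.stripCompletionException`, and `handle` is used instead of `handleAsync` with an executor to keep the example small.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SubmitFailureSketch {

    // Hypothetical counterpart of JobSubmissionException.
    static class SubmissionException extends Exception {
        SubmissionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Simplified stand-in: peel off CompletionException wrappers to reach the real cause.
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    static CompletableFuture<String> acknowledgeOrWrap(
            CompletableFuture<String> persistAndRun, Runnable cleanUp) {
        return persistAndRun.handle((ack, throwable) -> {
            if (throwable != null) {
                // Undo partial work first, then surface one well-defined exception type.
                cleanUp.run();
                throw new CompletionException(
                        new SubmissionException("Failed to submit job.", strip(throwable)));
            }
            return ack;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            acknowledgeOrWrap(failed.thenApply(s -> s), () -> System.out.println("cleaned up")).join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

Callers then see a single exception type whose cause is the original failure, regardless of how many asynchronous stages the error passed through.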
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);

    final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

    return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
        if (throwable != null) {
            jobGraphWriter.removeJobGraph(jobGraph.getJobID());
        }
    }));
}
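`persistAndRunJob` follows a persist-then-roll-back-on-failure pattern: the job graph is stored first and removed again only if running the job fails. A rough sketch of the same pattern, assuming a hypothetical in-memory `store` in place of `jobGraphWriter` and a hypothetical `runJob` function passed in by the caller:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class PersistAndRunSketch {

    // Hypothetical in-memory stand-in for the persistent job graph store.
    static final Set<String> store = ConcurrentHashMap.newKeySet();

    static CompletableFuture<Void> persistAndRun(
            String jobId, Function<String, CompletableFuture<Void>> runJob) {
        // Persist before starting, so the job is recorded once it is running.
        store.add(jobId);
        // whenComplete passes the original result or exception through unchanged;
        // it is used here only for the rollback side effect.
        return runJob.apply(jobId).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                store.remove(jobId);
            }
        });
    }

    public static void main(String[] args) {
        persistAndRun("job-ok", id -> CompletableFuture.completedFuture(null));

        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new RuntimeException("start failed"));
        persistAndRun("job-bad", id -> failing);

        System.out.println(store);
    }
}
```

Because `whenComplete` does not swallow the exception, the caller still observes the failure after the stored entry has been removed.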
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                    .map(future -> future.getNow(null))
                    .orElse(null);
                if (jobManagerRunner == currentJobManagerRunner) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }

                return null;
            }, getMainThreadExecutor()));

    jobManagerRunner.start();

    return jobManagerRunner;
}
|
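In `startJobManagerRunner`, the result callback first checks whether the runner that finished is still the one registered for the job, using `getNow(null)` and a reference comparison. The sketch below isolates that check under simplified assumptions. `Runner`, `runnerFutures`, and `isCurrent` are hypothetical names, and the registry is a plain map rather than the Dispatcher's own state.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CurrentRunnerSketch {

    // Hypothetical stand-in for JobManagerRunner; only the job id matters here.
    static final class Runner {
        final String jobId;

        Runner(String jobId) {
            this.jobId = jobId;
        }
    }

    // Hypothetical registry: one runner future per job id.
    static final Map<String, CompletableFuture<Runner>> runnerFutures = new ConcurrentHashMap<>();

    static boolean isCurrent(Runner runner) {
        // getNow(null) never blocks: it yields null while the future is still pending.
        // Optional.map turns that null into an empty Optional, so orElse(null) applies.
        Runner current = Optional.ofNullable(runnerFutures.get(runner.jobId))
                .map(future -> future.getNow(null))
                .orElse(null);
        // Reference equality: a newer runner for the same job id is a different object.
        return runner == current;
    }

    public static void main(String[] args) {
        Runner first = new Runner("job-1");
        runnerFutures.put("job-1", CompletableFuture.completedFuture(first));
        System.out.println(isCurrent(first));

        // A resubmission replaces the registered runner, so results from the old one are stale.
        runnerFutures.put("job-1", CompletableFuture.completedFuture(new Runner("job-1")));
        System.out.println(isCurrent(first));
    }
}
```

Comparing by reference rather than by job id is what lets late results from an older runner be ignored when a newer runner has been registered under the same id.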