| 本地微型集群(MiniCluster):根据 JobGraph 的信息提取 jar,生成 JobFile,并通过 BlobClient 上传到 Cluster
/**
 * Submits the job graph by delegating to {@code miniCluster} and exposes only the job ID
 * of the resulting submission.
 *
 * @param jobGraph the job to submit
 * @return future completed with the ID read from the submission result
 */
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
    return submissionFuture.thenApply(submission -> submission.getJobID());
}
MiniCluster public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) { final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture(); final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture); final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph); final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture .thenCombine( dispatcherGatewayFuture, (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout)) .thenCompose(Function.identity()); return acknowledgeCompletableFuture.thenApply( (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID())); }
/**
 * Once the blob server address is known, passes the job and a {@code BlobClient} supplier to
 * {@code ClientUtils.extractAndUploadJobGraphFiles}.
 *
 * @param blobServerAddressFuture future providing the blob server address
 * @param job the job whose files are handed to the upload helper
 * @return future that completes when the upload helper returns, or fails with a
 *     {@code CompletionException} wrapping the {@code FlinkException} it threw
 */
private CompletableFuture<Void> uploadAndSetJobFiles(
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture,
        final JobGraph job) {
    return blobServerAddressFuture.thenAccept(
        address -> {
            try {
                ClientUtils.extractAndUploadJobGraphFiles(
                    job,
                    () -> new BlobClient(address, miniClusterConfiguration.getConfiguration()));
            } catch (FlinkException uploadFailure) {
                // A Consumer cannot throw checked exceptions; wrap so the future fails instead.
                throw new CompletionException(uploadFailure);
            }
        });
}
Dispatcher @Override public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
try { if (isDuplicateJob(jobGraph.getJobID())) { return FutureUtils.completedExceptionally( new DuplicateJobSubmissionException(jobGraph.getJobID())); } else if (isPartialResourceConfigured(jobGraph)) { return FutureUtils.completedExceptionally( new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " + "resources configured. The limitation will be removed in future versions.")); } else { return internalSubmitJob(jobGraph); } } catch (FlinkException e) { return FutureUtils.completedExceptionally(e); } }
/**
 * Persists and runs the job via {@code waitForTerminatingJobManager} and
 * {@code persistAndRunJob}, then translates the outcome for the submitter.
 *
 * <p>The outcome handler is scheduled on the RPC service's executor. On failure it calls
 * {@code cleanUpJobData(jobId, true)}, logs the stripped cause, and fails the returned future
 * with a {@code JobSubmissionException}.
 *
 * @param jobGraph the job to submit
 * @return future completed with the acknowledgement, or failed with a
 *     {@code CompletionException} wrapping a {@code JobSubmissionException}
 */
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> submissionFuture =
        waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return submissionFuture.handleAsync(
        (ack, failure) -> {
            if (failure == null) {
                return ack;
            }

            cleanUpJobData(jobGraph.getJobID(), true);

            // Unwrap CompletionException layers so the log and the new exception show the cause.
            final Throwable cause = ExceptionUtils.stripCompletionException(failure);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), cause);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", cause));
        },
        getRpcService().getExecutor());
}
/**
 * Stores the job graph through {@code jobGraphWriter} and then runs the job. If running
 * fails, the stored graph is removed again.
 *
 * @param jobGraph the job to persist and run
 * @return the future returned by {@code runJob}, with the removal-on-failure step attached
 * @throws Exception if storing the job graph fails
 */
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);

    return runJob(jobGraph)
        .whenComplete(
            BiConsumerWithException.unchecked(
                (Object ignored, Throwable failure) -> {
                    if (failure == null) {
                        return;
                    }
                    // Roll back the write made above.
                    jobGraphWriter.removeJobGraph(jobGraph.getJobID());
                }));
}
/**
 * Creates a {@code JobManagerRunner} for the job, registers its future in
 * {@code jobManagerRunnerFutures}, and passes the runner to {@code startJobManagerRunner}
 * once it is available.
 *
 * <p>If any step fails, the registration is removed again on the main thread executor.
 *
 * @param jobGraph the job to run
 * @return future that completes with {@code null} after {@code startJobManagerRunner} returns
 * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if a runner
 *     future is already registered for this job ID
 */
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> runnerFuture = createJobManagerRunner(jobGraph);
    jobManagerRunnerFutures.put(jobGraph.getJobID(), runnerFuture);

    final CompletableFuture<JobManagerRunner> startedRunnerFuture =
        runnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));

    // Callers only care about completion, not the runner itself.
    final CompletableFuture<Void> startedFuture = startedRunnerFuture.thenApply(FunctionUtils.nullFn());

    return startedFuture.whenCompleteAsync(
        (ignored, failure) -> {
            if (failure == null) {
                return;
            }
            jobManagerRunnerFutures.remove(jobGraph.getJobID());
        },
        getMainThreadExecutor());
}
/**
 * Asynchronously builds a {@code JobManagerRunner} for the job on the RPC service's executor,
 * using {@code jobManagerRunnerFactory} and this object's services.
 *
 * @param jobGraph the job the runner is created for
 * @return future completed with the new runner, or failed if the factory throws
 */
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(
            () -> {
                final DefaultJobManagerJobMetricGroupFactory metricGroupFactory =
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup);
                return jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    metricGroupFactory,
                    fatalErrorHandler);
            }),
        rpcService.getExecutor());
}
/**
 * Attaches a result handler to the runner and then starts it.
 *
 * <p>The handler runs on the main thread executor when the runner's result future completes.
 * It acts only if the given runner is still the one registered in
 * {@code jobManagerRunnerFutures} for this job ID; otherwise it logs at debug level and does
 * nothing. For the current runner it dispatches to {@code jobReachedGloballyTerminalState},
 * {@code jobNotFinished}, or {@code jobMasterFailed} depending on the outcome.
 *
 * @param jobManagerRunner the runner to hook up and start
 * @return the same runner instance that was passed in
 * @throws Exception if starting the runner fails
 */
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // Look up the runner currently registered for this job; null if there is
                // no entry or its future has not completed yet.
                final JobManagerRunner currentJobManagerRunner =
                    Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                        .map(future -> future.getNow(null))
                        .orElse(null);

                // Deliberate identity comparison: only the registered instance may react.
                if (jobManagerRunner == currentJobManagerRunner) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable =
                            ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }

                return null;
            },
            getMainThreadExecutor()));

    // Start the runner only after the handler is attached. Without this call the method
    // never started the runner it is named for, and nothing else on this submission path
    // does so.
    jobManagerRunner.start();

    return jobManagerRunner;
}