1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
| public CompletableFuture<Acknowledge> triggerCheckpoint( ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) { log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType(); if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) { throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX."); }
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) { task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get()); } else { final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message); return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE)); } }
public void triggerCheckpointBarrier( final long checkpointID, final long checkpointTimestamp, final CheckpointOptions checkpointOptions, final boolean advanceToEndOfEventTime) {
final AbstractInvokable invokable = this.invokable; final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) { try { invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); } ... } ... }
@Override public Future<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) { return mailboxProcessor.getMainMailboxExecutor().submit( () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime), "checkpoint %s with %s", checkpointMetaData, checkpointOptions); } private boolean triggerCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception { try { CheckpointMetrics checkpointMetrics = new CheckpointMetrics() .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L);
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime); if (!success) { declineCheckpoint(checkpointMetaData.getCheckpointId()); } return success; } catch (Exception e) { ... } } private boolean performCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics, boolean advanceToEndOfTime) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
final long checkpointId = checkpointMetaData.getCheckpointId();
if (isRunning) { actionExecutor.runThrowing(() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) { setSynchronousSavepointId(checkpointId);
if (advanceToEndOfTime) { advanceToEndOfEventTime(); } }
operatorChain.prepareSnapshotPreBarrier(checkpointId);
operatorChain.broadcastCheckpointBarrier( checkpointId, checkpointMetaData.getTimestamp(), checkpointOptions);
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
});
return true; } else { actionExecutor.runThrowing(() -> {
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()); recordWriter.broadcastEvent(message); });
return false; } } private void checkpointState( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation( checkpointMetaData.getCheckpointId(), checkpointOptions.getTargetLocation()); CheckpointingOperation checkpointingOperation = new CheckpointingOperation( this, checkpointMetaData, checkpointOptions, storage, checkpointMetrics); checkpointingOperation.executeCheckpointing(); }
|