diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 2713c925d70ea..d74f106276740 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -2039,8 +2039,13 @@ public void startCheckpointScheduler() { isPeriodicCheckpointingConfigured(), "Can not start checkpoint scheduler, if no periodic checkpointing is configured"); - // make sure all prior timers are cancelled - stopCheckpointScheduler(); + if (isPeriodicCheckpointingStarted()) { + // cancel previously scheduled checkpoints and spare savepoints. + // TODO: Introduce a more general solution to the race condition + // between different checkpoint scheduling triggers. + // https://issues.apache.org/jira/browse/FLINK-34519 + stopCheckpointScheduler(); + } periodicScheduling = true; scheduleTriggerWithDelay(clock.relativeTimeMillis(), getRandomInitDelay()); @@ -2133,14 +2138,15 @@ private void restoreStateToCoordinators( // job status listener that schedules / cancels periodic checkpoints // ------------------------------------------------------------------------ - public JobStatusListener createActivatorDeactivator() { + public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking) { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } if (jobStatusListener == null) { - jobStatusListener = new CheckpointCoordinatorDeActivator(this); + jobStatusListener = + new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking); } return jobStatusListener; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java index b4f52dc69c3a6..830ba4cf58e56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java @@ -31,15 +31,18 @@ public class CheckpointCoordinatorDeActivator implements JobStatusListener { private final CheckpointCoordinator coordinator; + private final boolean allTasksOutputNonBlocking; - public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) { + public CheckpointCoordinatorDeActivator( + CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) { this.coordinator = checkNotNull(coordinator); + this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; } @Override public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { - if (newJobStatus == JobStatus.RUNNING) { - // start the checkpoint scheduler + if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { + // start the checkpoint scheduler if there is no blocking edge coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 339a8e92a4b41..3ef13334ffdea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -515,8 +515,13 @@ public void failJobDueToTaskFailure( if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { // the periodic checkpoint scheduler is activated and deactivated as a result of - // job status changes (running -> on, all other states -> off) - registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); + // job status and topology changes (running & all edges non-blocking -> on, all + // other states -> off) + boolean allTasksOutputNonBlocking = + tasks.values().stream() + .noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking()); + registerJobStatusListener( + checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking)); } this.stateBackendName = checkpointStateBackend.getName(); @@ -1376,6 +1381,13 @@ private boolean updateStateInternal( return attempt.switchToInitializing(); case RUNNING: + if (!isAnyOutputBlocking() + && checkpointCoordinator != null + && checkpointCoordinator.isPeriodicCheckpointingConfigured() + && !checkpointCoordinator.isPeriodicCheckpointingStarted()) { + checkpointCoordinator.startCheckpointScheduler(); + } + return attempt.switchToRunning(); case FINISHED: @@ -1413,6 +1425,11 @@ private boolean updateStateInternal( } } + private boolean isAnyOutputBlocking() { + return currentExecutions.values().stream() + .anyMatch(x -> x.getVertex().getJobVertex().getJobVertex().isAnyOutputBlocking()); + } + private void maybeReleasePartitionGroupsFor(final Execution attempt) { final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index cb86923dd30cd..728f85d11ae64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -155,6 +155,8 @@ public class JobVertex implements java.io.Serializable { */ private boolean supportsConcurrentExecutionAttempts = true; + private boolean anyOutputBlocking = false; + private boolean parallelismConfigured = false; // -------------------------------------------------------------------------------------------- @@ -496,6 +498,7 @@ public void updateCoLocationGroup(CoLocationGroupImpl group) { // -------------------------------------------------------------------------------------------- public IntermediateDataSet getOrCreateResultDataSet( IntermediateDataSetID id, ResultPartitionType partitionType) { + anyOutputBlocking |= partitionType.isBlockingOrBlockingPersistentResultPartition(); return this.results.computeIfAbsent( id, key -> new IntermediateDataSet(id, partitionType, this)); } @@ -557,6 +560,10 @@ public boolean isSupportsConcurrentExecutionAttempts() { return supportsConcurrentExecutionAttempts; } + public boolean isAnyOutputBlocking() { + return anyOutputBlocking; + } + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index d109bb635d75b..5cd572bea6de6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1801,6 +1801,53 @@ void testJobStatusHookWithJobFinished() throws Exception { commonJobStatusHookTest(ExecutionState.FINISHED, JobStatus.FINISHED); } + @Test + void testStartCheckpointOnlyAfterVertexWithBlockingEdgeFinished() { + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + final JobVertex map = new JobVertex("map"); + map.setInvokableClass(NoOpInvokable.class); + final JobVertex sink = new JobVertex("sink"); + sink.setInvokableClass(NoOpInvokable.class); + + sink.connectNewDataSetAsInput( + map, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + map.connectNewDataSetAsInput( + source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, map, sink); + enableCheckpointing(jobGraph, null, null, Long.MAX_VALUE - 1, true); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final CheckpointCoordinator checkpointCoordinator = scheduler.getCheckpointCoordinator(); + assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isFalse(); + + final Iterator iterator = + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices() + .iterator(); + final ExecutionAttemptID sourceAttemptId = + iterator.next().getCurrentExecutionAttempt().getAttemptId(); + final ExecutionAttemptID mapAttemptId = + iterator.next().getCurrentExecutionAttempt().getAttemptId(); + final ExecutionAttemptID sinkAttemptId = + iterator.next().getCurrentExecutionAttempt().getAttemptId(); + assertThat(iterator).isExhausted(); + + transitionToRunning(scheduler, sourceAttemptId); + transitionToRunning(scheduler, mapAttemptId); + assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isFalse(); + + scheduler.updateTaskExecutionState( + new TaskExecutionState(sourceAttemptId, ExecutionState.FINISHED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState(mapAttemptId, ExecutionState.FINISHED)); + transitionToRunning(scheduler, sinkAttemptId); + assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isTrue(); + } + private void commonJobStatusHookTest( ExecutionState expectedExecutionState, JobStatus expectedJobStatus) throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index ac937deb674d8..55766d2fdaae9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -2346,6 +2346,10 @@ void testOutputOnlyAfterEndOfStream() { assertHasOutputPartitionType( vertexMap.get("transform -> Map"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); + env.disableOperatorChaining(); jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false)); vertexMap = new HashMap<>(); @@ -2357,6 +2361,11 @@ void testOutputOnlyAfterEndOfStream() { vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED); assertHasOutputPartitionType(vertexMap.get("transform"), ResultPartitionType.BLOCKING); assertHasOutputPartitionType(vertexMap.get("Map"), ResultPartitionType.PIPELINED_BOUNDED); + + assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("transform").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Map").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); } private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(