Skip to content

Commit

Permalink
[FLINK-34371][runtime] Fail savepoints until tasks with blocking edge…
Browse files Browse the repository at this point in the history
… finished
  • Loading branch information
yunfengzhou-hub authored and xintongsong committed Feb 28, 2024
1 parent 8d45cd9 commit d4e0084
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
// there are some edge cases shouldn't be counted as a failure, e.g. shutdown
case TRIGGER_CHECKPOINT_FAILURE:
case BLOCKING_OUTPUT_EXIST:
// ignore
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public enum CheckpointFailureReason {
IO_EXCEPTION(
true, "An Exception occurred while triggering the checkpoint. IO-problem detected."),

BLOCKING_OUTPUT_EXIST(true, "Blocking output edge exists in running tasks."),

CHECKPOINT_ASYNC_EXCEPTION(false, "Asynchronous task checkpoint failed."),

CHANNEL_STATE_SHARED_STREAM_EXCEPTION(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -1381,7 +1382,7 @@ private boolean updateStateInternal(
return attempt.switchToInitializing();

case RUNNING:
if (!isAnyOutputBlocking()
if (!isAnyOutputBlocking(this)
&& checkpointCoordinator != null
&& checkpointCoordinator.isPeriodicCheckpointingConfigured()
&& !checkpointCoordinator.isPeriodicCheckpointingStarted()) {
Expand Down Expand Up @@ -1425,11 +1426,6 @@ private boolean updateStateInternal(
}
}

private boolean isAnyOutputBlocking() {
return currentExecutions.values().stream()
.anyMatch(x -> x.getVertex().getJobVertex().getJobVertex().isAnyOutputBlocking());
}

private void maybeReleasePartitionGroupsFor(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.executiongraph;

/** Utility methods related to {@link ExecutionGraph}. */
public class ExecutionGraphUtils {
/** @return Whether there is any blocking output edge in the execution graph. */
public static boolean isAnyOutputBlocking(ExecutionGraph graph) {
return graph.getRegisteredExecutions().values().stream()
.anyMatch(x -> x.getVertex().getJobVertex().getJobVertex().isAnyOutputBlocking());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -890,6 +891,14 @@ public CompletableFuture<String> triggerSavepoint(
final SavepointFormatType formatType) {
mainThreadExecutor.assertRunningInMainThread();

if (isAnyOutputBlocking(executionGraph)) {
// TODO: Introduce a more general solution to mark times when
// checkpoints are disabled, as well as the detailed reason.
// https://issues.apache.org/jira/browse/FLINK-34519
return FutureUtils.completedExceptionally(
new CheckpointException(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST));
}

final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
StopWithSavepointTerminationManager.checkSavepointActionPreconditions(
Expand Down Expand Up @@ -1015,6 +1024,11 @@ public CompletableFuture<String> stopWithSavepoint(
final SavepointFormatType formatType) {
mainThreadExecutor.assertRunningInMainThread();

if (isAnyOutputBlocking(executionGraph)) {
return FutureUtils.completedExceptionally(
new CheckpointException(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST));
}

final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;

/**
* A {@link SchedulerNG} implementation that uses the declarative resource management and
* automatically adapts the parallelism in case not enough resource could be acquired to run at the
Expand Down Expand Up @@ -757,9 +759,15 @@ public CompletableFuture<String> triggerSavepoint(
@Nullable String targetDirectory, boolean cancelJob, SavepointFormatType formatType) {
return state.tryCall(
StateWithExecutionGraph.class,
stateWithExecutionGraph ->
stateWithExecutionGraph.triggerSavepoint(
targetDirectory, cancelJob, formatType),
stateWithExecutionGraph -> {
if (isAnyOutputBlocking(stateWithExecutionGraph.getExecutionGraph())) {
return FutureUtils.<String>completedExceptionally(
new CheckpointException(
CheckpointFailureReason.BLOCKING_OUTPUT_EXIST));
}
return stateWithExecutionGraph.triggerSavepoint(
targetDirectory, cancelJob, formatType);
},
"triggerSavepoint")
.orElse(
FutureUtils.completedExceptionally(
Expand Down Expand Up @@ -847,8 +855,15 @@ public CompletableFuture<String> stopWithSavepoint(
@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType) {
return state.tryCall(
Executing.class,
executing ->
executing.stopWithSavepoint(targetDirectory, terminate, formatType),
executing -> {
if (isAnyOutputBlocking(executing.getExecutionGraph())) {
return FutureUtils.<String>completedExceptionally(
new CheckpointException(
CheckpointFailureReason.BLOCKING_OUTPUT_EXIST));
}
return executing.stopWithSavepoint(
targetDirectory, terminate, formatType);
},
"stopWithSavepoint")
.orElse(
FutureUtils.completedExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
Expand Down Expand Up @@ -123,6 +126,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -134,6 +138,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
Expand Down Expand Up @@ -1839,6 +1844,14 @@ void testStartCheckpointOnlyAfterVertexWithBlockingEdgeFinished() {
transitionToRunning(scheduler, sourceAttemptId);
transitionToRunning(scheduler, mapAttemptId);
assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isFalse();
assertThatFuture(scheduler.triggerSavepoint("", false, SavepointFormatType.DEFAULT))
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(CheckpointException.class)
.withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());
assertThatFuture(scheduler.stopWithSavepoint("", false, SavepointFormatType.DEFAULT))
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(CheckpointException.class)
.withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());

scheduler.updateTaskExecutionState(
new TaskExecutionState(sourceAttemptId, ExecutionState.FINISHED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy;
Expand All @@ -61,6 +63,7 @@
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
Expand All @@ -71,6 +74,7 @@
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
Expand Down Expand Up @@ -141,6 +145,7 @@
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
Expand Down Expand Up @@ -1732,6 +1737,56 @@ void testStopWithSavepointFailsInIllegalState() throws Exception {
.withCauseInstanceOf(CheckpointException.class);
}

@Test
void testSavepointFailsWhenBlockingEdgeExists() throws Exception {
JobVertex jobVertex = createNoOpVertex(PARALLELISM);
jobVertex.getOrCreateResultDataSet(
new IntermediateDataSetID(), ResultPartitionType.BLOCKING);

final ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createExecutionGraph(
EXECUTOR_RESOURCE.getExecutor(), jobVertex);

executionGraph
.getAllExecutionVertices()
.forEach(
task ->
setVertexResource(
task,
new TestingLogicalSlotBuilder()
.createTestingLogicalSlot()));
executionGraph.transitionToRunning();
executionGraph
.getAllExecutionVertices()
.forEach(
task ->
task.getCurrentExecutionAttempt()
.transitionState(ExecutionState.RUNNING));

final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(
streamingJobGraph(jobVertex),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
.build();

scheduler.goToExecuting(executionGraph, null, null, Collections.emptyList());

assertThatFuture(
scheduler.stopWithSavepoint(
"some directory", false, SavepointFormatType.CANONICAL))
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(CheckpointException.class)
.withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());

assertThatFuture(
scheduler.triggerSavepoint(
"some directory", false, SavepointFormatType.CANONICAL))
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(CheckpointException.class)
.withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());
}

@Test
void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
final AdaptiveScheduler scheduler =
Expand Down

0 comments on commit d4e0084

Please sign in to comment.