Skip to content

Commit

Permalink
[FLINK-32075][FLIP-306][Checkpoint] Delete merged files on checkpoint…
Browse files Browse the repository at this point in the history
… abort or subsumption (apache#24181)
  • Loading branch information
Zakelly authored and masteryhx committed Mar 4, 2024
1 parent 46cbf22 commit cd9a9f7
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.TaskStateManager;
Expand All @@ -34,8 +35,6 @@
* checkpoint files with merging checkpoint files enabled. It manages the files for ONE single task
* in TM, including all subtasks of this single task that is running in this TM. There is one
* FileMergingSnapshotManager for each job per task manager.
*
* <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files.
*/
public interface FileMergingSnapshotManager extends Closeable {

Expand Down Expand Up @@ -118,6 +117,34 @@ FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(
*/
Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);

/**
 * Notifies the manager that the checkpoint with the given {@code checkpointId} completed and
 * was committed. The notification is issued for the subtask identified by {@code subtaskKey}.
 *
 * @param subtaskKey the subtask key identifying the subtask.
 * @param checkpointId the ID of the checkpoint that has been completed.
 * @throws Exception thrown if anything goes wrong with the listener.
 */
void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
 * This method is called as a notification once a distributed checkpoint has been aborted.
 *
 * @param subtaskKey the subtask key identifying the subtask.
 * @param checkpointId the ID of the checkpoint that has been aborted.
 * @throws Exception thrown if anything goes wrong with the listener.
 */
void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
 * This method is called as a notification once a distributed checkpoint has been subsumed.
 *
 * @param subtaskKey the subtask key identifying the subtask.
 * @param checkpointId the ID of the checkpoint that has been subsumed.
 * @throws Exception thrown if anything goes wrong with the listener.
 */
void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
* A key identifies a subtask. A subtask can be identified by the operator id, subtask index and
* the parallelism. Note that this key should be consistent across job attempts.
Expand Down Expand Up @@ -151,6 +178,12 @@ public SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) {
this.hashCode = hash;
}

/**
 * Creates the {@link SubtaskKey} for the task running in the given environment.
 *
 * <p>The operator id is derived from the environment's job vertex id, and the task info is
 * taken from the environment as-is.
 *
 * @param environment the environment of the task to build the key for.
 * @return the subtask key built from the environment.
 */
public static SubtaskKey of(Environment environment) {
    final OperatorID operatorId = OperatorID.fromJobVertexID(environment.getJobVertexId());
    return new SubtaskKey(operatorId, environment.getTaskInfo());
}

/**
* Generate an unique managed directory name for one subtask.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
Expand All @@ -34,10 +35,15 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -55,6 +61,12 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
/** The executor for I/O operations in this manager. */
protected final Executor ioExecutor;

/** Guard for uploadedStates. */
protected final Object lock = new Object();

/**
 * Logical files recorded per checkpoint id. A file is added under its checkpoint id in
 * closeStreamAndCreateStateHandle; an entry is removed by the abort/subsume notifications once
 * discardLogicalFiles reports that no file remains tracked for that checkpoint. The sorted map
 * lets notifyCheckpointSubsumed iterate over all checkpoints up to and including a given id
 * via headMap.
 */
@GuardedBy("lock")
protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();

/** The {@link FileSystem} that this manager works on. */
protected FileSystem fs;

Expand Down Expand Up @@ -245,6 +257,13 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
physicalFile, startPos, stateSize, subtaskKey);
logicalFile.advanceLastCheckpointId(checkpointId);

// track the logical file
synchronized (lock) {
uploadedStates
.computeIfAbsent(checkpointId, key -> new HashSet<>())
.add(logicalFile);
}

// deal with physicalFile file
physicalFile.incSize(stateSize);
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
Expand Down Expand Up @@ -289,6 +308,13 @@ protected Path generatePhysicalFilePath(Path dirPath) {
return new Path(dirPath, fileName);
}

/**
 * Checks whether the given file sits directly inside a directory managed by this manager, i.e.
 * whether its parent equals {@code managedExclusiveStateDir} or is one of the values of {@code
 * managedSharedStateDir}.
 *
 * @param filePath the path of the file to check.
 * @return {@code true} if the file's parent is a managed directory; {@code false} otherwise,
 *     including when the path has no parent.
 */
@VisibleForTesting
boolean isResponsibleForFile(Path filePath) {
    Path parent = filePath.getParent();
    // A path without a parent (e.g. a file system root) cannot live in a managed directory;
    // without this guard the equals() call below would throw a NullPointerException.
    if (parent == null) {
        return false;
    }
    return parent.equals(managedExclusiveStateDir)
            || managedSharedStateDir.containsValue(parent);
}

/**
* Delete a physical file by given file path. Use the io executor to do the deletion.
*
Expand Down Expand Up @@ -345,6 +371,72 @@ protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
protected abstract void returnPhysicalFileForNextReuse(
SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;

/**
 * Callback triggered once a checkpoint has been discarded (aborted or subsumed) for all
 * subtasks. It is invoked by discardLogicalFiles when no logical file remains tracked for the
 * checkpoint.
 *
 * @param checkpointId the discarded checkpoint id.
 * @throws IOException if anything goes wrong with file system.
 */
protected abstract void discardCheckpoint(long checkpointId) throws IOException;

// ------------------------------------------------------------------------
// Checkpoint Listener
// ------------------------------------------------------------------------

/**
 * Base-class handling of a completed checkpoint. Intentionally a no-op here; subclasses may
 * override it to react to completion.
 */
@Override
public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
throws Exception {
// does nothing: no tracked state is touched when a checkpoint completes
}

/**
 * Discards the logical files of the given subtask that are tracked for the aborted checkpoint.
 * The checkpoint's entry is dropped from {@code uploadedStates} once no tracked file remains.
 * Does nothing if no files are tracked for the checkpoint.
 */
@Override
public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
    synchronized (lock) {
        final Set<LogicalFile> trackedFiles = uploadedStates.get(checkpointId);
        // Short-circuit keeps discardLogicalFiles from being called when nothing is tracked.
        if (trackedFiles != null
                && discardLogicalFiles(subtaskKey, checkpointId, trackedFiles)) {
            uploadedStates.remove(checkpointId);
        }
    }
}

/**
 * Discards the logical files of the given subtask for every tracked checkpoint whose id is less
 * than or equal to the subsumed {@code checkpointId}. Entries that end up without any tracked
 * file are removed from {@code uploadedStates}.
 */
@Override
public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
        throws Exception {
    synchronized (lock) {
        // Inclusive head view: all tracked checkpoints up to and including checkpointId.
        final Set<Map.Entry<Long, Set<LogicalFile>>> subsumedEntries =
                uploadedStates.headMap(checkpointId, true).entrySet();
        for (Iterator<Map.Entry<Long, Set<LogicalFile>>> cursor = subsumedEntries.iterator();
                cursor.hasNext(); ) {
            final Map.Entry<Long, Set<LogicalFile>> tracked = cursor.next();
            final boolean nothingLeft =
                    discardLogicalFiles(subtaskKey, tracked.getKey(), tracked.getValue());
            if (nothingLeft) {
                // Removing through the iterator also removes the entry from uploadedStates.
                cursor.remove();
            }
        }
    }
}

/**
 * Discards, and stops tracking, every logical file in the given set that belongs to the given
 * subtask and whose last-used checkpoint id is not greater than {@code checkpointId}. If the
 * set is empty afterwards, {@link #discardCheckpoint(long)} is invoked for the checkpoint.
 *
 * @param subtaskKey the subtask whose files should be discarded.
 * @param checkpointId the checkpoint the given files are tracked under.
 * @param logicalFiles the tracked files of that checkpoint; matching files are removed from it.
 * @return {@code true} if no file remains in the set after this call, {@code false} otherwise.
 * @throws Exception if discarding a file or the checkpoint fails.
 */
private boolean discardLogicalFiles(
        SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles)
        throws Exception {
    for (Iterator<LogicalFile> cursor = logicalFiles.iterator(); cursor.hasNext(); ) {
        final LogicalFile candidate = cursor.next();
        // Skip files of other subtasks and files still used by a later checkpoint.
        if (!candidate.getSubtaskKey().equals(subtaskKey)
                || candidate.getLastUsedCheckpointID() > checkpointId) {
            continue;
        }
        candidate.discardWithCheckpointId(checkpointId);
        cursor.remove();
    }

    if (!logicalFiles.isEmpty()) {
        return false;
    }
    discardCheckpoint(checkpointId);
    return true;
}

// ------------------------------------------------------------------------
// file system
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,47 @@ public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor
writablePhysicalFilePool = new HashMap<>();
}

// ------------------------------------------------------------------------
// CheckpointListener
// ------------------------------------------------------------------------

/**
 * Runs the base-class completion handling and then removes the files of this subtask and
 * checkpoint from the reuse pool and closes them (see removeAndCloseFiles).
 */
@Override
public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
throws Exception {
super.notifyCheckpointComplete(subtaskKey, checkpointId);
removeAndCloseFiles(subtaskKey, checkpointId);
}

/**
 * Runs the base-class abort handling and then removes the files of this subtask and checkpoint
 * from the reuse pool and closes them (see removeAndCloseFiles). The pool cleanup is skipped
 * if the base-class handling throws.
 */
@Override
public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
super.notifyCheckpointAborted(subtaskKey, checkpointId);
removeAndCloseFiles(subtaskKey, checkpointId);
}

/**
 * Removes the files that belong to the given subtask and checkpoint from the reuse pool and
 * closes them. TODO: Refactor this in FLINK-32076.
 *
 * <p>Two pool entries are looked up: the SHARED-scope entry keyed by {@code subtaskKey} and the
 * EXCLUSIVE-scope entry keyed by {@code DUMMY_SUBTASK_KEY}. Both are removed from the pool
 * before any file is closed, and both files are closed even if closing one of them fails, so a
 * failing close cannot leave the other file open and still pooled.
 *
 * @param subtaskKey the subtask whose SHARED-scope file should be removed.
 * @param checkpointId the checkpoint whose files should be removed.
 * @throws Exception the first failure raised while closing; a later failure is attached to it
 *     as a suppressed exception.
 */
private void removeAndCloseFiles(SubtaskKey subtaskKey, long checkpointId) throws Exception {
    final PhysicalFile sharedFile;
    final PhysicalFile exclusiveFile;
    synchronized (writablePhysicalFilePool) {
        sharedFile =
                writablePhysicalFilePool.remove(
                        Tuple3.of(checkpointId, subtaskKey, CheckpointedStateScope.SHARED));
        exclusiveFile =
                writablePhysicalFilePool.remove(
                        Tuple3.of(
                                checkpointId,
                                DUMMY_SUBTASK_KEY,
                                CheckpointedStateScope.EXCLUSIVE));
    }

    // Close outside the lock; remember the first failure but keep closing the remaining file.
    Exception firstFailure = null;
    for (PhysicalFile file : new PhysicalFile[] {sharedFile, exclusiveFile}) {
        if (file == null) {
            continue;
        }
        try {
            file.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else if (e != firstFailure) {
                firstFailure.addSuppressed(e);
            }
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}

@Override
@Nonnull
protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
Expand Down Expand Up @@ -98,4 +139,9 @@ protected void returnPhysicalFileForNextReuse(
physicalFile.close();
}
}

/**
 * Currently a no-op for this manager: nothing is cleaned up here when a checkpoint is
 * discarded. See the TODO below for the planned behavior.
 */
@Override
protected void discardCheckpoint(long checkpointId) throws IOException {
// TODO: Discard the whole file pool for checkpoint id (When there is one after FLINK-32076)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,6 @@ void reportIncompleteTaskStateSnapshots(
StateChangelogStorageView<?> getStateChangelogStorageView(
Configuration configuration, ChangelogStateHandle changelogStateHandle);

/**
 * Returns the {@link FileMergingSnapshotManager} associated with this object, or {@code null}
 * if there is none. Callers must handle the {@code null} case.
 *
 * <p>NOTE(review): presumably {@code null} when file merging of checkpoints is not enabled;
 * confirm against the implementations.
 */
@Nullable
FileMergingSnapshotManager getFileMergingSnapshotManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;

Expand Down Expand Up @@ -60,10 +59,7 @@ public FsMergingCheckpointStorageAccess(
fileSizeThreshold,
writeBufferSize);
this.fileMergingSnapshotManager = fileMergingSnapshotManager;
this.subtaskKey =
new SubtaskKey(
OperatorID.fromJobVertexID(environment.getJobVertexId()),
environment.getTaskInfo());
this.subtaskKey = SubtaskKey.of(environment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,38 @@ public void testConcurrentWriting() throws Exception {
}
}

/**
 * Verifies the checkpoint notifications of the manager: files of completed checkpoints stay in
 * a managed directory, the file of a subsumed checkpoint is deleted, and the file of an aborted
 * checkpoint is deleted.
 */
@Test
public void testCheckpointNotification() throws Exception {
    try (FileMergingSnapshotManager manager =
                    createFileMergingSnapshotManager(checkpointBaseDir);
            CloseableRegistry registry = new CloseableRegistry()) {
        // complete checkpoint-1
        SegmentFileStateHandle firstHandle =
                writeCheckpointAndGetStream(1, manager, registry).closeAndGetHandle();
        manager.notifyCheckpointComplete(subtaskKey1, 1);
        assertFileInManagedDir(manager, firstHandle);

        // complete checkpoint-2
        SegmentFileStateHandle secondHandle =
                writeCheckpointAndGetStream(2, manager, registry).closeAndGetHandle();
        manager.notifyCheckpointComplete(subtaskKey1, 2);
        assertFileInManagedDir(manager, secondHandle);

        // subsume checkpoint-1: its file must exist before and be gone afterwards
        assertThat(fileExists(firstHandle)).isTrue();
        manager.notifyCheckpointSubsumed(subtaskKey1, 1);
        assertThat(fileExists(firstHandle)).isFalse();

        // abort checkpoint-3: its file must be gone after the abort notification
        SegmentFileStateHandle thirdHandle =
                writeCheckpointAndGetStream(3, manager, registry).closeAndGetHandle();
        assertFileInManagedDir(manager, thirdHandle);
        manager.notifyCheckpointAborted(subtaskKey1, 3);
        assertThat(fileExists(thirdHandle)).isFalse();
    }
}

private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
throws IOException {
FileSystem fs = LocalFileSystem.getSharedInstance();
Expand All @@ -384,4 +416,42 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpo
assertThat(fmsm).isNotNull();
return fmsm;
}

/**
 * Opens a checkpoint stream for the given checkpoint and writes the default amount of 32 bytes
 * to it, by delegating to the four-argument overload.
 */
private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
        long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
        throws IOException {
    final int defaultNumBytes = 32;
    return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry, defaultNumBytes);
}

/**
 * Opens an EXCLUSIVE-scope checkpoint stream for {@code subtaskKey1} and the given checkpoint,
 * registers it with the given registry and writes {@code numBytes} bytes (the values 0, 1, 2,
 * ...) to it. The stream is returned still open.
 */
private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
        long checkpointId,
        FileMergingSnapshotManager fmsm,
        CloseableRegistry closeableRegistry,
        int numBytes)
        throws IOException {
    final FileMergingCheckpointStateOutputStream out =
            fmsm.createCheckpointStateOutputStream(
                    subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
    closeableRegistry.registerCloseable(out);
    int written = 0;
    while (written < numBytes) {
        out.write(written);
        written++;
    }
    return out;
}

/**
 * Asserts that the file referenced by the given state handle is one the manager is responsible
 * for, as reported by {@code FileMergingSnapshotManagerBase#isResponsibleForFile}.
 *
 * @param fmsm the manager; must be a {@link FileMergingSnapshotManagerBase}.
 * @param stateHandle the handle whose file path is checked; must not be null.
 */
private void assertFileInManagedDir(
        FileMergingSnapshotManager fmsm, SegmentFileStateHandle stateHandle) {
    // isInstanceOf reports the actual type on failure, unlike asserting a bare boolean.
    assertThat(fmsm).isInstanceOf(FileMergingSnapshotManagerBase.class);
    assertThat(stateHandle).isNotNull();
    Path filePath = stateHandle.getFilePath();
    assertThat(filePath).isNotNull();
    assertThat(((FileMergingSnapshotManagerBase) fmsm).isResponsibleForFile(filePath)).isTrue();
}

/**
 * Returns whether the file referenced by the given state handle currently exists on its file
 * system. Asserts that both the handle and its file path are non-null.
 */
private boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException {
    assertThat(stateHandle).isNotNull();
    final Path location = stateHandle.getFilePath();
    assertThat(location).isNotNull();
    final FileSystem owningFileSystem = location.getFileSystem();
    return owningFileSystem.exists(location);
}
}
Loading

0 comments on commit cd9a9f7

Please sign in to comment.