From 82905cce4de1842557a285290a025ee2fba73584 Mon Sep 17 00:00:00 2001
From: Zakelly
Date: Fri, 17 May 2024 16:53:24 +0800
Subject: [PATCH] [FLINK-32088][checkpoint] Space control in file merging

This fixes #24807
---
 .../checkpoint_file_merging_section.html      |   6 +
 .../checkpointing_configuration.html          |   6 +
 .../configuration/CheckpointingOptions.java   |   8 +-
 ...sCheckpointFileMergingSnapshotManager.java |  11 +-
 .../FileMergingSnapshotManager.java           |  13 ++-
 .../FileMergingSnapshotManagerBase.java       | 109 ++++++++++++++++--
 .../FileMergingSnapshotManagerBuilder.java    |  16 +++
 .../checkpoint/filemerging/LogicalFile.java   |   1 +
 .../checkpoint/filemerging/PhysicalFile.java  |  51 +++++++-
 ...nCheckpointFileMergingSnapshotManager.java |   9 +-
 .../state/CheckpointStreamFactory.java        |  12 ++
 .../state/TaskExecutorFileMergingManager.java |   9 ++
 .../FsMergingCheckpointStorageLocation.java   |   5 +
 ...ckpointFileMergingSnapshotManagerTest.java |  69 +++++++++++
 .../FileMergingSnapshotManagerTestBase.java   |  13 ++-
 ...ckpointFileMergingSnapshotManagerTest.java |  67 +++++++++++
 .../RocksIncrementalSnapshotStrategy.java     |   3 +-
 17 files changed, 383 insertions(+), 25 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
index 50ee139e0ee9c2..f67c75f1744b15 100644
--- a/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
+++ b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
@@ -32,5 +32,11 @@
 Boolean
 Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until the file is returned.
+
+
state.checkpoints.file-merging.max-space-amplification
+ 2.0
+ Float
+ Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control.
+
diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index 20017193174b16..9ac83a194322b2 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -140,6 +140,12 @@
 MemorySize
 Max size of a physical file for merged checkpoints.
+
+
state.checkpoints.file-merging.max-space-amplification
+ 2.0
+ Float
+ Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control.
+
state.checkpoints.file-merging.pool-blocking
 false
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 568811b465a968..10344a3d30c125 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -434,10 +434,9 @@ public class CheckpointingOptions {
      * of valid data. The more space amplification is, the more waste of space will be. This configs
      * a space amplification above which a re-uploading for physical files will be triggered to
      * reclaim space.
-     *
-     *

TODO: remove '@Documentation.ExcludeFromDocumentation' after the feature is implemented. */ - @Experimental @Documentation.ExcludeFromDocumentation + @Experimental + @Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 6) public static final ConfigOption FILE_MERGING_MAX_SPACE_AMPLIFICATION = ConfigOptions.key("state.checkpoints.file-merging.max-space-amplification") .floatType() @@ -445,7 +444,8 @@ public class CheckpointingOptions { .withDescription( "Space amplification stands for the magnification of the occupied space compared to the amount of valid data. " + "The more space amplification is, the more waste of space will be. This configs a space amplification " - + "above which a re-uploading for physical files will be triggered to reclaim space."); + + "above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f " + + "means disabling the space control."); public static final ConfigOption CHECKPOINTING_CONSISTENCY_MODE = ConfigOptions.key("execution.checkpointing.mode") diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java index 941cb98a1c5b58..be0aeed9a582aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java @@ -31,8 +31,12 @@ public class AcrossCheckpointFileMergingSnapshotManager extends FileMergingSnaps private final PhysicalFilePool filePool; public AcrossCheckpointFileMergingSnapshotManager( - String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { - super(id, maxFileSize, filePoolType, ioExecutor); + String id, + long maxFileSize, + PhysicalFilePool.Type filePoolType, + float maxSpaceAmplification, + Executor ioExecutor) { + super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor); filePool = createPhysicalPool(); } @@ -44,9 +48,6 @@ protected PhysicalFile getOrCreatePhysicalFileForCheckpoint( return filePool.pollFile(subtaskKey, scope); } - @Override - protected void discardCheckpoint(long checkpointId) {} - @Override protected void returnPhysicalFileForNextReuse( SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index f5f88c050805b5..a4e066ad9c6d48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -170,6 +170,14 @@ DirectoryStreamStateHandle getManagedDirStateHandle( */ void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception; + /** + * Check whether previous state handles could further be reused considering the space + * amplification. + * + * @param stateHandle the handle to be reused. + */ + boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle); + /** * A callback method which is called when previous state handles are reused by following * checkpoint(s). 
@@ -302,7 +310,6 @@ public SpaceStat( } public void onLogicalFileCreate(long size) { - physicalFileSize.addAndGet(size); logicalFileSize.addAndGet(size); logicalFileCount.incrementAndGet(); } @@ -312,6 +319,10 @@ public void onLogicalFileDelete(long size) { logicalFileCount.decrementAndGet(); } + public void onPhysicalFileUpdate(long size) { + physicalFileSize.addAndGet(size); + } + public void onPhysicalFileCreate() { physicalFileCount.incrementAndGet(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index d6319ffab60589..30e9af996738af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -50,11 +50,13 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; @@ -112,6 +114,8 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps /** Type of physical file pool. */ protected PhysicalFilePool.Type filePoolType; + protected final float maxSpaceAmplification; + protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; private final Object notifyLock = new Object(); @@ -150,10 +154,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps protected SpaceStat spaceStat; public FileMergingSnapshotManagerBase( - String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { + String id, + long maxFileSize, + PhysicalFilePool.Type filePoolType, + float maxSpaceAmplification, + Executor ioExecutor) { this.id = id; this.maxPhysicalFileSize = maxFileSize; this.filePoolType = filePoolType; + this.maxSpaceAmplification = + maxSpaceAmplification < 1f ? Float.MAX_VALUE : maxSpaceAmplification; this.ioExecutor = ioExecutor; this.spaceStat = new SpaceStat(); } @@ -245,6 +255,7 @@ protected LogicalFile createLogicalFile( knownLogicalFiles.put(fileID, file); if (physicalFile.isOwned()) { spaceStat.onLogicalFileCreate(length); + spaceStat.onPhysicalFileUpdate(length); } return file; } @@ -471,7 +482,9 @@ protected abstract void returnPhysicalFileForNextReuse( * @param checkpointId the discarded checkpoint id. * @throws IOException if anything goes wrong with file system. */ - protected abstract void discardCheckpoint(long checkpointId) throws IOException; + protected void discardCheckpoint(long checkpointId) throws IOException { + controlSpace(); + } // ------------------------------------------------------------------------ // Checkpoint Listener @@ -572,6 +585,68 @@ public void reusePreviousStateHandle( } } + // ------------------------------------------------------------------------ + // Space Control + // ------------------------------------------------------------------------ + + /** + * The core method that control space if needed. 
This method will compare the desired space + * amplification with current one, and if it exceeds the configured amplification, this method + * will mark minimal set of {@link PhysicalFile}s not to be reused anymore. + */ + private void controlSpace() { + if (maxSpaceAmplification != Float.MAX_VALUE + && spaceStat.logicalFileSize.get() * maxSpaceAmplification + < spaceStat.physicalFileSize.get()) { + // may need control space + long goalPhysicalSize = + Math.round(spaceStat.logicalFileSize.get() * maxSpaceAmplification); + final AtomicLong aliveSize = new AtomicLong(0L); + // retrieve all the physical files and calculate current alive size + Set knownPhysicalFiles = new HashSet<>(); + knownLogicalFiles.values().stream() + .map(LogicalFile::getPhysicalFile) + .forEach( + file -> { + if (file.isCouldReuse()) { + if (knownPhysicalFiles.add(file)) { + aliveSize.addAndGet(file.getSize()); + } + } + }); + // the alive size still greater than the goal + if (aliveSize.get() > goalPhysicalSize) { + // sort in DESC order on wasted size + SortedSet sortedPhysicalFile = + new TreeSet<>((a, b) -> Long.compare(b.wastedSize(), a.wastedSize())); + knownPhysicalFiles.stream() + .filter(PhysicalFile::closed) + .forEach(sortedPhysicalFile::add); + // mark the physical file un-alive, until it reaches our goal. + for (PhysicalFile file : sortedPhysicalFile) { + if (!file.checkReuseOnSpaceAmplification(maxSpaceAmplification)) { + if (aliveSize.addAndGet(-file.wastedSize()) <= goalPhysicalSize) { + break; + } + } + } + } + } + } + + @Override + public boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle) { + if (stateHandle instanceof SegmentFileStateHandle) { + LogicalFile file = + knownLogicalFiles.get( + ((SegmentFileStateHandle) stateHandle).getLogicalFileId()); + if (file != null) { + return file.getPhysicalFile().isCouldReuse(); + } + } + return true; + } + public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) throws IOException { logicalFile.discardWithCheckpointId(checkpointId); @@ -692,15 +767,23 @@ && isManagedByFileMergingManager( path, subtaskKey, fileHandle.getScope()); + PhysicalFile file = + new PhysicalFile( + null, + path, + physicalFileDeleter, + fileHandle.getScope(), + managedByFileMergingManager); + try { + file.updateSize(getFileSize(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } if (managedByFileMergingManager) { spaceStat.onPhysicalFileCreate(); + spaceStat.onPhysicalFileUpdate(file.getSize()); } - return new PhysicalFile( - null, - path, - physicalFileDeleter, - fileHandle.getScope(), - managedByFileMergingManager); + return file; }); LogicalFileId logicalFileId = fileHandle.getLogicalFileId(); @@ -722,6 +805,16 @@ && isManagedByFileMergingManager( } } + private long getFileSize(PhysicalFile file) throws IOException { + FileStatus fileStatus = + file.getFilePath().getFileSystem().getFileStatus(file.getFilePath()); + if (fileStatus == null || fileStatus.isDir()) { + throw new FileNotFoundException("File " + file.getFilePath() + " does not exist."); + } else { + return fileStatus.getLen(); + } + } + /** * Distinguish whether the given filePath is managed by the FileMergingSnapshotManager. 
If the * filePath is located under managedDir (managedSharedStateDir or managedExclusiveStateDir) as a diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java index 0004a5a9ec029f..320ca1a908fe44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java @@ -38,6 +38,9 @@ public class FileMergingSnapshotManagerBuilder { /** Type of physical file pool. */ private PhysicalFilePool.Type filePoolType = PhysicalFilePool.Type.NON_BLOCKING; + /** The max space amplification that the manager should control. */ + private float maxSpaceAmplification = Float.MAX_VALUE; + @Nullable private Executor ioExecutor = null; /** @@ -63,6 +66,17 @@ public FileMergingSnapshotManagerBuilder setFilePoolType(PhysicalFilePool.Type f return this; } + public FileMergingSnapshotManagerBuilder setMaxSpaceAmplification(float amplification) { + if (amplification < 1) { + // only valid number counts. If not valid, disable space control by setting this to + // Float.MAX_VALUE. + this.maxSpaceAmplification = Float.MAX_VALUE; + } else { + this.maxSpaceAmplification = amplification; + } + return this; + } + /** * Set the executor for io operation in manager. If null(default), all io operation will be * executed synchronously. @@ -84,12 +98,14 @@ public FileMergingSnapshotManager build() { id, maxFileSize, filePoolType, + maxSpaceAmplification, ioExecutor == null ? Runnable::run : ioExecutor); case MERGE_ACROSS_CHECKPOINT: return new AcrossCheckpointFileMergingSnapshotManager( id, maxFileSize, filePoolType, + maxSpaceAmplification, ioExecutor == null ? Runnable::run : ioExecutor); default: throw new UnsupportedOperationException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java index fa8fc8cc80a67a..4ee3f5595dd738 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java @@ -116,6 +116,7 @@ public void advanceLastCheckpointId(long checkpointId) { public void discardWithCheckpointId(long checkpointId) throws IOException { if (!discarded && checkpointId >= lastUsedCheckpointID) { physicalFile.decRefCount(); + physicalFile.decSize(length); discarded = true; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java index b271ec48e9cd77..6f3f1b55b7516f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java @@ -66,6 +66,9 @@ PhysicalFile perform( /** The size of this physical file. */ private final AtomicLong size; + /** The valid data size in this physical file. */ + private final AtomicLong dataSize; + /** * Deleter that will be called when delete this physical file. If null, do not delete this * physical file. 
@@ -94,6 +97,9 @@ PhysicalFile perform( */ private boolean isOwned; + /** If this physical file could be further reused, considering the space amplification. */ + private boolean couldReuse; + public PhysicalFile( @Nullable FSDataOutputStream outputStream, Path filePath, @@ -114,6 +120,8 @@ public PhysicalFile( this.deleter = deleter; this.scope = scope; this.size = new AtomicLong(0); + this.dataSize = new AtomicLong(0); + this.couldReuse = true; this.logicalFileRefCount = new AtomicInteger(0); this.isOwned = owned; } @@ -170,18 +178,59 @@ public void deleteIfNecessary() throws IOException { } void incSize(long delta) { - this.size.addAndGet(delta); + dataSize.addAndGet(delta); + if (!closed) { + size.addAndGet(delta); + } + } + + void decSize(long delta) { + dataSize.addAndGet(-delta); } long getSize() { return size.get(); } + long wastedSize() { + return size.get() - dataSize.get(); + } + + void updateSize(long updated) { + size.set(updated); + } + + boolean isCouldReuse() { + return !closed || couldReuse; + } + + /** + * Check whether this physical file can be reused. + * + * @param maxAmp the max space amplification. + * @return true if it can be further reused. + */ + boolean checkReuseOnSpaceAmplification(float maxAmp) { + if (!closed) { + return true; + } + if (couldReuse) { + if (dataSize.get() == 0L || dataSize.get() * maxAmp < size.get()) { + couldReuse = false; + } + } + return couldReuse; + } + @VisibleForTesting int getRefCount() { return logicalFileRefCount.get(); } + public boolean closed() { + return closed; + } + public void close() throws IOException { innerClose(); deleteIfNecessary(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java index 5b79c172b4cfdc..e61343201ac111 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java @@ -39,9 +39,13 @@ public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnaps private final Map writablePhysicalFilePool; public WithinCheckpointFileMergingSnapshotManager( - String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { + String id, + long maxFileSize, + PhysicalFilePool.Type filePoolType, + float maxSpaceAmplification, + Executor ioExecutor) { // currently there is no file size limit For WITHIN_BOUNDARY mode - super(id, maxFileSize, filePoolType, ioExecutor); + super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor); writablePhysicalFilePool = new HashMap<>(); } @@ -116,6 +120,7 @@ protected void discardCheckpoint(long checkpointId) throws IOException { if (filePool != null) { filePool.close(); } + super.discardCheckpoint(checkpointId); } private PhysicalFilePool getOrCreateFilePool(long checkpointId) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java index f08db8481f2fd0..8fa1adb2eeb95e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java @@ -79,4 +79,16 @@ List duplicate( 
default void reusePreviousStateHandle(Collection previousHandle) { // Does nothing for normal stream factory } + + /** + * A pre-check hook before the checkpoint writer want to reuse a state handle, if this returns + * false, it is not recommended for the writer to rewrite the state file considering the space + * amplification. + * + * @param stateHandle the handle to be reused. + * @return true if it can be reused. + */ + default boolean couldReuseStateHandle(StreamStateHandle stateHandle) { + return true; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java index 28484eca944e2c..f609044a2e41ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java @@ -42,6 +42,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY; import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION; import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_POOL_BLOCKING; /** @@ -118,6 +119,13 @@ public TaskExecutorFileMergingManager() { .getOptional(FILE_MERGING_POOL_BLOCKING) .orElse(clusterConfiguration.get(FILE_MERGING_POOL_BLOCKING)); + Float spaceAmplification = + jobConfiguration + .getOptional(FILE_MERGING_MAX_SPACE_AMPLIFICATION) + .orElse( + clusterConfiguration.get( + FILE_MERGING_MAX_SPACE_AMPLIFICATION)); + fileMergingSnapshotManagerAndRetainedExecutions = Tuple2.of( new FileMergingSnapshotManagerBuilder( @@ -127,6 +135,7 @@ public TaskExecutorFileMergingManager() { usingBlockingPool ? 
PhysicalFilePool.Type.BLOCKING : PhysicalFilePool.Type.NON_BLOCKING) + .setMaxSpaceAmplification(spaceAmplification) .build(), new HashSet<>()); fileMergingSnapshotManagerByJobId.put( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java index 32455f1057652b..d0ae8d86458552 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java @@ -119,4 +119,9 @@ public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream( public void reusePreviousStateHandle(Collection previousHandle) { fileMergingSnapshotManager.reusePreviousStateHandle(checkpointId, previousHandle); } + + @Override + public boolean couldReuseStateHandle(StreamStateHandle stateHandle) { + return fileMergingSnapshotManager.couldReusePreviousStateHandle(stateHandle); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java index 2f040ab2513f25..c439e17e7de839 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -231,4 +232,72 @@ public void testCheckpointNotification() throws Exception { assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); } } + + @Test + public void testSpaceControl() throws Exception { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + fmsm.registerSubtaskForSharedStates(subtaskKey1); + + BiFunctionWithException writer = + ((checkpointId, size) -> { + return writeCheckpointAndGetStream( + subtaskKey1, + checkpointId, + CheckpointedStateScope.SHARED, + fmsm, + closeableRegistry, + size) + .closeAndGetHandle(); + }); + Integer eighthOfFile = 4 * 1024 * 1024; + + // Doing checkpoint-1 with 6 files + SegmentFileStateHandle stateHandle1 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle2 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle3 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle4 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle5 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle6 = writer.apply(1L, eighthOfFile); + + fmsm.notifyCheckpointComplete(subtaskKey1, 1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(6); + + // complete checkpoint-2 with 3 files written and 1 file reused from checkpoint 1 + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle1)).isTrue(); + SegmentFileStateHandle stateHandle7 = writer.apply(2L, eighthOfFile); + SegmentFileStateHandle stateHandle8 = 
writer.apply(2L, eighthOfFile); + SegmentFileStateHandle stateHandle9 = writer.apply(2L, eighthOfFile); + fmsm.reusePreviousStateHandle(2, Collections.singletonList(stateHandle1)); + fmsm.notifyCheckpointComplete(subtaskKey1, 2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(9); + + // subsume checkpoint-1 + fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + + // complete checkpoint-3 with 1 files reuse from checkpoint 1 and 2. + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle1)).isFalse(); + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle7)).isFalse(); + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle9)).isTrue(); + SegmentFileStateHandle stateHandle10 = writer.apply(3L, eighthOfFile); + SegmentFileStateHandle stateHandle11 = writer.apply(3L, eighthOfFile); + SegmentFileStateHandle stateHandle12 = writer.apply(3L, eighthOfFile); + + fmsm.notifyCheckpointComplete(subtaskKey1, 3); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(7); + + // subsume checkpoint-2 + fmsm.notifyCheckpointSubsumed(subtaskKey1, 2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index 94c32482d0f554..7ef79148b2cd06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -344,7 +344,10 @@ public void testConcurrentFileReusingWithBlockingPool() throws Exception { try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager( - checkpointBaseDir, 32, PhysicalFilePool.Type.BLOCKING)) { + checkpointBaseDir, + 32, + PhysicalFilePool.Type.BLOCKING, + Float.MAX_VALUE)) { fmsm.registerSubtaskForSharedStates(subtaskKey1); // test reusing a physical file @@ -563,11 +566,14 @@ private SegmentFileStateHandle buildOneSegmentFileHandle( FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException { return createFileMergingSnapshotManager( - checkpointBaseDir, 32 * 1024 * 1024, PhysicalFilePool.Type.NON_BLOCKING); + checkpointBaseDir, 32 * 1024 * 1024, PhysicalFilePool.Type.NON_BLOCKING, 2f); } FileMergingSnapshotManager createFileMergingSnapshotManager( - Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type filePoolType) + Path checkpointBaseDir, + long maxFileSize, + PhysicalFilePool.Type filePoolType, + float spaceAmplification) throws IOException { FileSystem fs = LocalFileSystem.getSharedInstance(); Path sharedStateDir = @@ -587,6 +593,7 @@ FileMergingSnapshotManager createFileMergingSnapshotManager( new FileMergingSnapshotManagerBuilder(tmId, getFileMergingType()) .setMaxFileSize(maxFileSize) .setFilePoolType(filePoolType) + .setMaxSpaceAmplification(spaceAmplification) .build(); fmsm.initFileSystem( LocalFileSystem.getSharedInstance(), diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java index ea6df6fe9da6e4..6a15abf0fe7dd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java @@ -238,4 +238,71 @@ public void testCheckpointNotification() throws Exception { assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); } } + + @Test + public void testSpaceControl() throws Exception { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + fmsm.registerSubtaskForSharedStates(subtaskKey1); + + BiFunctionWithException writer = + ((checkpointId, size) -> { + return writeCheckpointAndGetStream( + subtaskKey1, + checkpointId, + CheckpointedStateScope.SHARED, + fmsm, + closeableRegistry, + size) + .closeAndGetHandle(); + }); + Integer eighthOfFile = 4 * 1024 * 1024; + + // Doing checkpoint-1 with 6 files + SegmentFileStateHandle stateHandle1 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle2 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle3 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle4 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle5 = writer.apply(1L, eighthOfFile); + SegmentFileStateHandle stateHandle6 = writer.apply(1L, eighthOfFile); + + fmsm.notifyCheckpointComplete(subtaskKey1, 1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(6); + + // complete checkpoint-2 with 3 files written and 1 file reused from checkpoint 1 + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle1)).isTrue(); + SegmentFileStateHandle stateHandle7 = writer.apply(2L, eighthOfFile); + SegmentFileStateHandle stateHandle8 = writer.apply(2L, eighthOfFile); + SegmentFileStateHandle stateHandle9 = writer.apply(2L, eighthOfFile); + fmsm.reusePreviousStateHandle(2, Collections.singletonList(stateHandle1)); + fmsm.notifyCheckpointComplete(subtaskKey1, 2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(9); + + // subsume checkpoint-1 + fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + + // complete checkpoint-3 with 1 files reuse from checkpoint 1 and 2. 
+ assertThat(fmsm.couldReusePreviousStateHandle(stateHandle1)).isFalse(); + assertThat(fmsm.couldReusePreviousStateHandle(stateHandle7)).isTrue(); + SegmentFileStateHandle stateHandle10 = writer.apply(3L, eighthOfFile); + SegmentFileStateHandle stateHandle11 = writer.apply(3L, eighthOfFile); + fmsm.reusePreviousStateHandle(3, Collections.singletonList(stateHandle7)); + + fmsm.notifyCheckpointComplete(subtaskKey1, 3); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(6); + + // subsume checkpoint-2 + fmsm.notifyCheckpointSubsumed(subtaskKey1, 2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); + } + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 968882a6be89e2..48d8bb35aa2c13 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -416,7 +416,8 @@ private void createUploadFilePaths( if (fileName.endsWith(SST_FILE_SUFFIX)) { Optional uploaded = previousSnapshot.getUploaded(fileName); - if (uploaded.isPresent()) { + if (uploaded.isPresent() + && checkpointStreamFactory.couldReuseStateHandle(uploaded.get())) { sstFiles.add(HandleAndLocalPath.of(uploaded.get(), fileName)); } else { sstFilePaths.add(filePath); // re-upload
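
As a usage illustration of the new option (not part of the patch itself): state.checkpoints.file-merging.max-space-amplification caps the ratio between the bytes occupied by merged physical checkpoint files and the bytes of logical data that is still referenced. Once the ratio is exceeded, the manager stops handing out the most wasteful closed physical files for reuse, so their live data gets re-uploaded and the old files can eventually be discarded. A minimal configuration sketch, assuming the options are passed through a regular Flink Configuration; the surrounding class and main method are illustrative only:

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class FileMergingSpaceControlConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Merge small checkpoint files into larger physical files.
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, true);
        // Let merged physical files occupy at most 2x the size of the live logical
        // data; above that ratio, re-uploading is triggered to reclaim space.
        conf.set(CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION, 2.0f);
        // Per this patch, any value below 1 disables the space control
        // (the builder normalizes it to Float.MAX_VALUE).
    }
}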
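
The reclamation decision itself follows the numbers tracked in SpaceStat: re-uploading is considered once physicalFileSize exceeds logicalFileSize * maxSpaceAmplification, and closed physical files are then marked non-reusable in descending order of wasted bytes until the footprint drops back under that goal. The standalone sketch below is simplified from FileMergingSnapshotManagerBase#controlSpace(); the ClosedFile helper and all names are illustrative, not Flink API. For example, with 96 MB of live logical data and a factor of 2.0 the goal is 192 MB, so a 256 MB physical footprint triggers marking of the most wasteful file:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative stand-in for a closed physical file: total bytes on storage vs.
// bytes still referenced by logical files (valid data).
final class ClosedFile {
    final String name;
    final long size;
    final long dataSize;

    ClosedFile(String name, long size, long dataSize) {
        this.name = name;
        this.size = size;
        this.dataSize = dataSize;
    }

    long wastedSize() {
        return size - dataSize;
    }
}

public class SpaceControlSketch {

    // Greedy rule: stop reusing the most wasteful closed files until the tracked
    // physical footprint is back under logicalSize * maxAmp.
    static List<ClosedFile> markNonReusable(
            List<ClosedFile> closedFiles, long logicalSize, long physicalSize, float maxAmp) {
        long goal = Math.round((double) logicalSize * maxAmp);
        List<ClosedFile> marked = new ArrayList<>();
        if (physicalSize <= goal) {
            return marked; // amplification within bounds, nothing to do
        }
        closedFiles.sort(Comparator.comparingLong(ClosedFile::wastedSize).reversed());
        long remaining = physicalSize;
        for (ClosedFile f : closedFiles) {
            marked.add(f);
            remaining -= f.wastedSize(); // wasted bytes go away once the file is no longer reused
            if (remaining <= goal) {
                break;
            }
        }
        return marked;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        List<ClosedFile> files = new ArrayList<>(Arrays.asList(
                new ClosedFile("file-1", 128 * mb, 32 * mb),
                new ClosedFile("file-2", 128 * mb, 64 * mb)));
        // 96 MB logical data, 256 MB physical footprint, factor 2.0 -> goal is 192 MB.
        for (ClosedFile f : markNonReusable(files, 96 * mb, 256 * mb, 2.0f)) {
            System.out.println("stop reusing " + f.name);
        }
        // Prints only file-1 (96 MB wasted): 256 MB - 96 MB = 160 MB <= 192 MB.
    }
}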