diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java new file mode 100644 index 00000000000000..8fef72e28ac3ad --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.runtime.state.CheckpointedStateScope; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.concurrent.Executor; + +/** A {@link FileMergingSnapshotManager} that merging files across checkpoints. */ +public class AcrossCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase { + + private final PhysicalFilePool filePool; + + public AcrossCheckpointFileMergingSnapshotManager( + String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { + super(id, maxFileSize, filePoolType, ioExecutor); + filePool = createPhysicalPool(); + } + + @Override + @Nonnull + protected PhysicalFile getOrCreatePhysicalFileForCheckpoint( + SubtaskKey subtaskKey, long checkpointID, CheckpointedStateScope scope) + throws IOException { + PhysicalFile result = filePool.pollFile(subtaskKey, scope); + + // a new file could be put into the file pool after closeAndGetHandle() + return result == null ? createPhysicalFile(subtaskKey, scope) : result; + } + + @Override + protected void discardCheckpoint(long checkpointId) {} + + @Override + protected void returnPhysicalFileForNextReuse( + SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) + throws IOException { + + if (shouldSyncAfterClosingLogicalFile) { + FSDataOutputStream os = physicalFile.getOutputStream(); + if (os != null) { + os.sync(); + } + } + + if (!filePool.tryPutFile(subtaskKey, physicalFile)) { + physicalFile.close(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java index ca1d5b675018a9..1be9b0d2f4fcf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java @@ -29,6 +29,8 @@ public class FileMergingSnapshotManagerBuilder { /** The id for identifying a {@link FileMergingSnapshotManager}. */ private final String id; + private FileMergingType fileMergingType = FileMergingType.NO_MERGE; + /** Max size for a file. TODO: Make it configurable. */ private long maxFileSize = 32 * 1024 * 1024; @@ -46,6 +48,12 @@ public FileMergingSnapshotManagerBuilder(String id) { this.id = id; } + /** Set the type. */ + public FileMergingSnapshotManagerBuilder setType(FileMergingType fileMergingType) { + this.fileMergingType = fileMergingType; + return this; + } + /** Set the max file size. */ public FileMergingSnapshotManagerBuilder setMaxFileSize(long maxFileSize) { Preconditions.checkArgument(maxFileSize > 0); @@ -71,13 +79,27 @@ public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExec /** * Create file-merging snapshot manager based on configuration. * - *

TODO (FLINK-32074): Support another type of FileMergingSnapshotManager that merges files - * across different checkpoints. - * * @return the created manager. */ public FileMergingSnapshotManager build() { - return new WithinCheckpointFileMergingSnapshotManager( - id, maxFileSize, filePoolType, ioExecutor == null ? Runnable::run : ioExecutor); + switch (fileMergingType) { + case MERGE_WITHIN_CHECKPOINT: + return new WithinCheckpointFileMergingSnapshotManager( + id, + maxFileSize, + filePoolType, + ioExecutor == null ? Runnable::run : ioExecutor); + case MERGE_ACROSS_CHECKPOINT: + return new AcrossCheckpointFileMergingSnapshotManager( + id, + maxFileSize, + filePoolType, + ioExecutor == null ? Runnable::run : ioExecutor); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported type %s when creating file merging manager", + fileMergingType)); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java new file mode 100644 index 00000000000000..e2955fd4571262 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +/** How the checkpoint files can be segmented. */ +public enum FileMergingType { + // do not write logical checkpoint files into file segments + // and thus do not merge checkpoint files + NO_MERGE, + // merge checkpoint files within checkpoint boundaries + MERGE_WITHIN_CHECKPOINT, + // merge checkpoint files across checkpoint boundaries + MERGE_ACROSS_CHECKPOINT +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java index 42d8a4a5469f60..a916f25be72c37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; @@ -78,7 +79,9 @@ public TaskExecutorFileMergingManager() { if (fileMergingSnapshotManager == null) { // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration fileMergingSnapshotManager = - new FileMergingSnapshotManagerBuilder(jobId.toString()).build(); + new FileMergingSnapshotManagerBuilder(jobId.toString()) + .setType(FileMergingType.MERGE_WITHIN_CHECKPOINT) + .build(); fileMergingSnapshotManagerByJobId.put(jobId, fileMergingSnapshotManager); LOG.info("Registered new file merging snapshot manager for job {}.", jobId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java new file mode 100644 index 00000000000000..475c13b5445a5e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link AcrossCheckpointFileMergingSnapshotManager}. */ +public class AcrossCheckpointFileMergingSnapshotManagerTest + extends FileMergingSnapshotManagerTestBase { + @Override + FileMergingType getFileMergingType() { + return FileMergingType.MERGE_ACROSS_CHECKPOINT; + } + + @Test + void testCreateAndReuseFiles() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + // firstly, we try shared state. + PhysicalFile file1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file1.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + // allocate another + PhysicalFile file2 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file2.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file2).isNotEqualTo(file1); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + + // allocate for another subtask + PhysicalFile file3 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 0, CheckpointedStateScope.SHARED); + assertThat(file3.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED)); + assertThat(file3).isNotEqualTo(file1); + + // allocate for another checkpoint + PhysicalFile file4 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.SHARED); + assertThat(file4.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file4).isEqualTo(file1); + + // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused + file4.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4); + PhysicalFile file5 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.SHARED); + assertThat(file5.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file5).isNotEqualTo(file4); + + // Secondly, we try private state + PhysicalFile file6 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); + assertThat(file6.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + + // allocate another + PhysicalFile file7 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); + assertThat(file7.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file7).isNotEqualTo(file5); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6); + + // allocate for another checkpoint + PhysicalFile file8 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 2, CheckpointedStateScope.EXCLUSIVE); + assertThat(file8.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file8).isEqualTo(file6); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8); + + // allocate for this checkpoint but another subtask + PhysicalFile file9 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 2, CheckpointedStateScope.EXCLUSIVE); + assertThat(file9.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file9).isEqualTo(file6); + + // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused + file9.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9); + PhysicalFile file10 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 2, CheckpointedStateScope.SHARED); + assertThat(file10.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file10).isNotEqualTo(file9); + + assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + } + } + + @Test + public void testCheckpointNotification() throws Exception { + try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + FileMergingCheckpointStateOutputStream cp1Stream = + writeCheckpointAndGetStream(1, fmsm, closeableRegistry); + SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); + fmsm.notifyCheckpointComplete(subtaskKey1, 1); + assertFileInManagedDir(fmsm, cp1StateHandle); + + // complete checkpoint-2 + FileMergingCheckpointStateOutputStream cp2Stream = + writeCheckpointAndGetStream(2, fmsm, closeableRegistry); + SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); + fmsm.notifyCheckpointComplete(subtaskKey1, 2); + assertFileInManagedDir(fmsm, cp2StateHandle); + + // subsume checkpoint-1 + assertThat(fileExists(cp1StateHandle)).isTrue(); + fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); + assertThat(fileExists(cp1StateHandle)).isTrue(); + + // abort checkpoint-3 + FileMergingCheckpointStateOutputStream cp3Stream = + writeCheckpointAndGetStream(3, fmsm, closeableRegistry); + SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); + assertFileInManagedDir(fmsm, cp3StateHandle); + fmsm.notifyCheckpointAborted(subtaskKey1, 3); + assertThat(fileExists(cp3StateHandle)).isTrue(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java similarity index 68% rename from flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index be47d27290009f..262dbd36eee5c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -43,18 +43,20 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FileMergingSnapshotManager}. */ -public class FileMergingSnapshotManagerTest { +public abstract class FileMergingSnapshotManagerTestBase { - private final String tmId = "Testing"; + final String tmId = "Testing"; - private final OperatorID operatorID = new OperatorID(289347923L, 75893479L); + final OperatorID operatorID = new OperatorID(289347923L, 75893479L); - private SubtaskKey subtaskKey1; - private SubtaskKey subtaskKey2; + SubtaskKey subtaskKey1; + SubtaskKey subtaskKey2; - private Path checkpointBaseDir; + Path checkpointBaseDir; - private int writeBufferSize; + int writeBufferSize; + + abstract FileMergingType getFileMergingType(); @BeforeEach public void setup(@TempDir java.nio.file.Path tempFolder) { @@ -90,113 +92,6 @@ void testCreateFileMergingSnapshotManager() throws IOException { } } - @Test - void testCreateAndReuseFiles() throws IOException { - try (FileMergingSnapshotManagerBase fmsm = - (FileMergingSnapshotManagerBase) - createFileMergingSnapshotManager(checkpointBaseDir)) { - fmsm.registerSubtaskForSharedStates(subtaskKey1); - fmsm.registerSubtaskForSharedStates(subtaskKey2); - // firstly, we try shared state. - PhysicalFile file1 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.SHARED); - assertThat(file1.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - // allocate another - PhysicalFile file2 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.SHARED); - assertThat(file2.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - assertThat(file2).isNotEqualTo(file1); - - // return for reuse - fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); - - // allocate for another subtask - PhysicalFile file3 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey2, 0, CheckpointedStateScope.SHARED); - assertThat(file3.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED)); - assertThat(file3).isNotEqualTo(file1); - - // allocate for another checkpoint - PhysicalFile file4 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 1, CheckpointedStateScope.SHARED); - assertThat(file4.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - assertThat(file4).isNotEqualTo(file1); - - // allocate for this checkpoint - PhysicalFile file5 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.SHARED); - assertThat(file5.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - assertThat(file5).isEqualTo(file1); - - // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused - file5.incSize(fmsm.maxPhysicalFileSize); - fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5); - PhysicalFile file6 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.SHARED); - assertThat(file6.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - assertThat(file6).isNotEqualTo(file5); - - // Secondly, we try private state - PhysicalFile file7 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); - assertThat(file7.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); - - // allocate another - PhysicalFile file8 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); - assertThat(file8.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); - assertThat(file8).isNotEqualTo(file6); - - // return for reuse - fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7); - - // allocate for another checkpoint - PhysicalFile file9 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); - assertThat(file9.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); - assertThat(file9).isNotEqualTo(file7); - - // allocate for this checkpoint but another subtask - PhysicalFile file10 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE); - assertThat(file10.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); - assertThat(file10).isEqualTo(file7); - - // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused - file10.incSize(fmsm.maxPhysicalFileSize); - fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10); - PhysicalFile file11 = - fmsm.getOrCreatePhysicalFileForCheckpoint( - subtaskKey1, 0, CheckpointedStateScope.SHARED); - assertThat(file11.getFilePath().getParent()) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); - assertThat(file11).isNotEqualTo(file10); - - assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) - .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); - } - } - @Test void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException { try (FileMergingSnapshotManagerBase fmsm = @@ -378,38 +273,6 @@ public void testConcurrentWriting() throws Exception { } } - @Test - public void testCheckpointNotification() throws Exception { - try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); - CloseableRegistry closeableRegistry = new CloseableRegistry()) { - FileMergingCheckpointStateOutputStream cp1Stream = - writeCheckpointAndGetStream(1, fmsm, closeableRegistry); - SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); - fmsm.notifyCheckpointComplete(subtaskKey1, 1); - assertFileInManagedDir(fmsm, cp1StateHandle); - - // complete checkpoint-2 - FileMergingCheckpointStateOutputStream cp2Stream = - writeCheckpointAndGetStream(2, fmsm, closeableRegistry); - SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); - fmsm.notifyCheckpointComplete(subtaskKey1, 2); - assertFileInManagedDir(fmsm, cp2StateHandle); - - // subsume checkpoint-1 - assertThat(fileExists(cp1StateHandle)).isTrue(); - fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); - assertThat(fileExists(cp1StateHandle)).isFalse(); - - // abort checkpoint-3 - FileMergingCheckpointStateOutputStream cp3Stream = - writeCheckpointAndGetStream(3, fmsm, closeableRegistry); - SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); - assertFileInManagedDir(fmsm, cp3StateHandle); - fmsm.notifyCheckpointAborted(subtaskKey1, 3); - assertThat(fileExists(cp3StateHandle)).isFalse(); - } - } - @Test public void testConcurrentFileReusingWithBlockingPool() throws Exception { try (FileMergingSnapshotManagerBase fmsm = @@ -455,13 +318,13 @@ public void testConcurrentFileReusingWithBlockingPool() throws Exception { } } - private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) + FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException { return createFileMergingSnapshotManager( checkpointBaseDir, 32 * 1024 * 1024, PhysicalFilePool.Type.NON_BLOCKING); } - private FileMergingSnapshotManager createFileMergingSnapshotManager( + FileMergingSnapshotManager createFileMergingSnapshotManager( Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type filePoolType) throws IOException { FileSystem fs = LocalFileSystem.getSharedInstance(); @@ -480,6 +343,7 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager( } FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder(tmId) + .setType(getFileMergingType()) .setMaxFileSize(maxFileSize) .setFilePoolType(filePoolType) .build(); @@ -493,13 +357,13 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager( return fmsm; } - private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( + FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws IOException { return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry, 32); } - private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( + FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry, @@ -515,7 +379,7 @@ private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( return stream; } - private void assertFileInManagedDir( + void assertFileInManagedDir( FileMergingSnapshotManager fmsm, SegmentFileStateHandle stateHandle) { assertThat(fmsm instanceof FileMergingSnapshotManagerBase).isTrue(); assertThat(stateHandle).isNotNull(); @@ -524,7 +388,7 @@ private void assertFileInManagedDir( assertThat(((FileMergingSnapshotManagerBase) fmsm).isResponsibleForFile(filePath)).isTrue(); } - private boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException { + boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException { assertThat(stateHandle).isNotNull(); Path filePath = stateHandle.getFilePath(); assertThat(filePath).isNotNull(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java new file mode 100644 index 00000000000000..4128e5e7cd3af3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WithinCheckpointFileMergingSnapshotManager}. */ +public class WithinCheckpointFileMergingSnapshotManagerTest + extends FileMergingSnapshotManagerTestBase { + @Override + FileMergingType getFileMergingType() { + return FileMergingType.MERGE_WITHIN_CHECKPOINT; + } + + @Test + void testCreateAndReuseFiles() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + // firstly, we try shared state. + PhysicalFile file1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file1.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + // allocate another + PhysicalFile file2 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file2.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file2).isNotEqualTo(file1); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + + // allocate for another subtask + PhysicalFile file3 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 0, CheckpointedStateScope.SHARED); + assertThat(file3.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED)); + assertThat(file3).isNotEqualTo(file1); + + // allocate for another checkpoint + PhysicalFile file4 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.SHARED); + assertThat(file4.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file4).isNotEqualTo(file1); + + // allocate for this checkpoint + PhysicalFile file5 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file5.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file5).isEqualTo(file1); + + // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused + file5.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5); + PhysicalFile file6 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file6.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file6).isNotEqualTo(file5); + + // Secondly, we try private state + PhysicalFile file7 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file7.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + + // allocate another + PhysicalFile file8 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file8.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file8).isNotEqualTo(file6); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7); + + // allocate for another checkpoint + PhysicalFile file9 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); + assertThat(file9.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file9).isNotEqualTo(file7); + + // allocate for this checkpoint but another subtask + PhysicalFile file10 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file10.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file10).isEqualTo(file7); + + // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused + file10.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10); + PhysicalFile file11 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file11.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file11).isNotEqualTo(file10); + + assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + } + } + + @Test + public void testCheckpointNotification() throws Exception { + try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + FileMergingCheckpointStateOutputStream cp1Stream = + writeCheckpointAndGetStream(1, fmsm, closeableRegistry); + SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); + fmsm.notifyCheckpointComplete(subtaskKey1, 1); + assertFileInManagedDir(fmsm, cp1StateHandle); + + // complete checkpoint-2 + FileMergingCheckpointStateOutputStream cp2Stream = + writeCheckpointAndGetStream(2, fmsm, closeableRegistry); + SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); + fmsm.notifyCheckpointComplete(subtaskKey1, 2); + assertFileInManagedDir(fmsm, cp2StateHandle); + + // subsume checkpoint-1 + assertThat(fileExists(cp1StateHandle)).isTrue(); + fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); + assertThat(fileExists(cp1StateHandle)).isFalse(); + + // abort checkpoint-3 + FileMergingCheckpointStateOutputStream cp3Stream = + writeCheckpointAndGetStream(3, fmsm, closeableRegistry); + SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); + assertFileInManagedDir(fmsm, cp3StateHandle); + fmsm.notifyCheckpointAborted(subtaskKey1, 3); + assertThat(fileExists(cp3StateHandle)).isFalse(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java index 37fbaa98f0960b..0d9329b070a29a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointedStateScope; @@ -220,7 +221,9 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager() { private FileMergingSnapshotManager createFileMergingSnapshotManager(long maxFileSize) { FileMergingSnapshotManager mgr = - new FileMergingSnapshotManagerBuilder(SNAPSHOT_MGR_ID).build(); + new FileMergingSnapshotManagerBuilder(SNAPSHOT_MGR_ID) + .setType(FileMergingType.MERGE_WITHIN_CHECKPOINT) + .build(); mgr.initFileSystem( getSharedInstance(),