From 3b9623e5d2ebc5d65b25884d13c0702e5587fd3c Mon Sep 17 00:00:00 2001 From: Hangxiang Yu Date: Tue, 12 Mar 2024 18:19:25 +0800 Subject: [PATCH] [FLINK-32076][checkpoint] Introduce blocking file pool to reuse files (#24418) --- .../filemerging/BlockingPhysicalFilePool.java | 96 +++++++++++++++++++ .../FileMergingSnapshotManagerBase.java | 2 + .../FileMergingSnapshotManagerTest.java | 58 ++++++++++- 3 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java new file mode 100644 index 0000000000000..11f7f41404f7b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.runtime.state.CheckpointedStateScope; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A Blocking {@link PhysicalFilePool} which may block when polling physical files. This class try + * best to reuse a physical file until its size > maxFileSize. + */ +public class BlockingPhysicalFilePool extends PhysicalFilePool { + + private final AtomicBoolean exclusivePhysicalFilePoolInitialized; + + public BlockingPhysicalFilePool( + long maxFileSize, PhysicalFile.PhysicalFileCreator physicalFileCreator) { + super(maxFileSize, physicalFileCreator); + this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false); + } + + @Override + public boolean tryPutFile( + FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile physicalFile) + throws IOException { + if (physicalFile.getSize() < maxFileSize) { + return getFileQueue(subtaskKey, physicalFile.getScope()).offer(physicalFile); + } else { + getFileQueue(subtaskKey, physicalFile.getScope()) + .offer(physicalFileCreator.perform(subtaskKey, physicalFile.getScope())); + return false; + } + } + + @Override + @Nonnull + public PhysicalFile pollFile( + FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) + throws IOException { + initialize(subtaskKey, scope); + try { + return ((BlockingQueue) getFileQueue(subtaskKey, scope)).take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void initialize( + FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) + throws IOException { + if (scope.equals(CheckpointedStateScope.SHARED)) { + AtomicBoolean init = new AtomicBoolean(false); + Queue fileQueue = + sharedPhysicalFilePoolBySubtask.computeIfAbsent( + subtaskKey, + key -> { + init.set(true); + return createFileQueue(); + }); + if (init.get()) { + fileQueue.offer(physicalFileCreator.perform(subtaskKey, scope)); + } + } else if (scope.equals(CheckpointedStateScope.EXCLUSIVE) + && exclusivePhysicalFilePoolInitialized.compareAndSet(false, true)) { + getFileQueue(subtaskKey, scope).offer(physicalFileCreator.perform(subtaskKey, scope)); + } + } + + @Override + protected Queue createFileQueue() { + return new LinkedBlockingQueue<>(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 532be66f84352..3dc9c78856188 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -351,6 +351,8 @@ protected final PhysicalFilePool createPhysicalPool() { case NON_BLOCKING: return new NonBlockingPhysicalFilePool( maxPhysicalFileSize, this::createPhysicalFile); + case BLOCKING: + return new BlockingPhysicalFilePool(maxPhysicalFileSize, this::createPhysicalFile); default: throw new UnsupportedOperationException( "Unsupported type of physical file pool: " + filePoolType); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java index 9ac101c7eb0f1..be47d27290009 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java @@ -410,8 +410,60 @@ public void testCheckpointNotification() throws Exception { } } + @Test + public void testConcurrentFileReusingWithBlockingPool() throws Exception { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager( + checkpointBaseDir, 32, PhysicalFilePool.Type.BLOCKING)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + + // test reusing a physical file + PhysicalFile file1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + PhysicalFile file2 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file2).isEqualTo(file1); + + // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused + file2.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file2); + PhysicalFile file3 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file3).isNotEqualTo(file2); + + // test for exclusive scope + PhysicalFile file4 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file4); + PhysicalFile file5 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file5).isEqualTo(file4); + + file5.incSize(fmsm.maxPhysicalFileSize); + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5); + PhysicalFile file6 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file6).isNotEqualTo(file5); + } + } + private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException { + return createFileMergingSnapshotManager( + checkpointBaseDir, 32 * 1024 * 1024, PhysicalFilePool.Type.NON_BLOCKING); + } + + private FileMergingSnapshotManager createFileMergingSnapshotManager( + Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type filePoolType) + throws IOException { FileSystem fs = LocalFileSystem.getSharedInstance(); Path sharedStateDir = new Path( @@ -426,7 +478,11 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpo fs.mkdirs(sharedStateDir); fs.mkdirs(taskOwnedStateDir); } - FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder(tmId).build(); + FileMergingSnapshotManager fmsm = + new FileMergingSnapshotManagerBuilder(tmId) + .setMaxFileSize(maxFileSize) + .setFilePoolType(filePoolType) + .build(); fmsm.initFileSystem( LocalFileSystem.getSharedInstance(), checkpointBaseDir,