Skip to content

Commit

Permalink
[FLINK-32076][checkpoint] Introduce blocking file pool to reuse files (
Browse files Browse the repository at this point in the history
  • Loading branch information
masteryhx committed Mar 14, 2024
1 parent 583722e commit 3b9623e
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A Blocking {@link PhysicalFilePool} which may block when polling physical files. This class try
* best to reuse a physical file until its size > maxFileSize.
*/
public class BlockingPhysicalFilePool extends PhysicalFilePool {

private final AtomicBoolean exclusivePhysicalFilePoolInitialized;

public BlockingPhysicalFilePool(
long maxFileSize, PhysicalFile.PhysicalFileCreator physicalFileCreator) {
super(maxFileSize, physicalFileCreator);
this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
}

@Override
public boolean tryPutFile(
FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile physicalFile)
throws IOException {
if (physicalFile.getSize() < maxFileSize) {
return getFileQueue(subtaskKey, physicalFile.getScope()).offer(physicalFile);
} else {
getFileQueue(subtaskKey, physicalFile.getScope())
.offer(physicalFileCreator.perform(subtaskKey, physicalFile.getScope()));
return false;
}
}

@Override
@Nonnull
public PhysicalFile pollFile(
FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
throws IOException {
initialize(subtaskKey, scope);
try {
return ((BlockingQueue<PhysicalFile>) getFileQueue(subtaskKey, scope)).take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void initialize(
FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
throws IOException {
if (scope.equals(CheckpointedStateScope.SHARED)) {
AtomicBoolean init = new AtomicBoolean(false);
Queue<PhysicalFile> fileQueue =
sharedPhysicalFilePoolBySubtask.computeIfAbsent(
subtaskKey,
key -> {
init.set(true);
return createFileQueue();
});
if (init.get()) {
fileQueue.offer(physicalFileCreator.perform(subtaskKey, scope));
}
} else if (scope.equals(CheckpointedStateScope.EXCLUSIVE)
&& exclusivePhysicalFilePoolInitialized.compareAndSet(false, true)) {
getFileQueue(subtaskKey, scope).offer(physicalFileCreator.perform(subtaskKey, scope));
}
}

@Override
protected Queue<PhysicalFile> createFileQueue() {
return new LinkedBlockingQueue<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ protected final PhysicalFilePool createPhysicalPool() {
case NON_BLOCKING:
return new NonBlockingPhysicalFilePool(
maxPhysicalFileSize, this::createPhysicalFile);
case BLOCKING:
return new BlockingPhysicalFilePool(maxPhysicalFileSize, this::createPhysicalFile);
default:
throw new UnsupportedOperationException(
"Unsupported type of physical file pool: " + filePoolType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,60 @@ public void testCheckpointNotification() throws Exception {
}
}

@Test
public void testConcurrentFileReusingWithBlockingPool() throws Exception {
try (FileMergingSnapshotManagerBase fmsm =
(FileMergingSnapshotManagerBase)
createFileMergingSnapshotManager(
checkpointBaseDir, 32, PhysicalFilePool.Type.BLOCKING)) {
fmsm.registerSubtaskForSharedStates(subtaskKey1);

// test reusing a physical file
PhysicalFile file1 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
PhysicalFile file2 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file2).isEqualTo(file1);

// a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
file2.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file2);
PhysicalFile file3 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file3).isNotEqualTo(file2);

// test for exclusive scope
PhysicalFile file4 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file4);
PhysicalFile file5 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
assertThat(file5).isEqualTo(file4);

file5.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
PhysicalFile file6 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
assertThat(file6).isNotEqualTo(file5);
}
}

private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
throws IOException {
return createFileMergingSnapshotManager(
checkpointBaseDir, 32 * 1024 * 1024, PhysicalFilePool.Type.NON_BLOCKING);
}

private FileMergingSnapshotManager createFileMergingSnapshotManager(
Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type filePoolType)
throws IOException {
FileSystem fs = LocalFileSystem.getSharedInstance();
Path sharedStateDir =
new Path(
Expand All @@ -426,7 +478,11 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpo
fs.mkdirs(sharedStateDir);
fs.mkdirs(taskOwnedStateDir);
}
FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder(tmId).build();
FileMergingSnapshotManager fmsm =
new FileMergingSnapshotManagerBuilder(tmId)
.setMaxFileSize(maxFileSize)
.setFilePoolType(filePoolType)
.build();
fmsm.initFileSystem(
LocalFileSystem.getSharedInstance(),
checkpointBaseDir,
Expand Down

0 comments on commit 3b9623e

Please sign in to comment.