Skip to content

Commit

Permalink
[FLINK-32088][checkpoint] Space control in file merging
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed May 28, 2024
1 parent 6c41771 commit c233780
Show file tree
Hide file tree
Showing 17 changed files with 381 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@
<td>Boolean</td>
<td>Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until the file is returned.</td>
</tr>
<tr>
<td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Float</td>
<td>Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<td>MemorySize</td>
<td>Max size of a physical file for merged checkpoints.</td>
</tr>
<tr>
<td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Float</td>
<td>Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control.</td>
</tr>
<tr>
<td><h5>state.checkpoints.file-merging.pool-blocking</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,16 +428,16 @@ public class CheckpointingOptions {
* of valid data. The more space amplification is, the more waste of space will be. This configs
* a space amplification above which a re-uploading for physical files will be triggered to
* reclaim space.
*
* <p>TODO: remove '@Documentation.ExcludeFromDocumentation' after the feature is implemented.
*/
@Experimental @Documentation.ExcludeFromDocumentation
@Experimental
@Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 6)
public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION =
ConfigOptions.key("state.checkpoints.file-merging.max-space-amplification")
.floatType()
.defaultValue(2f)
.withDescription(
"Space amplification stands for the magnification of the occupied space compared to the amount of valid data. "
+ "The more space amplification is, the more waste of space will be. This configs a space amplification "
+ "above which a re-uploading for physical files will be triggered to reclaim space.");
+ "above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f "
+ "means disabling the space control.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public class AcrossCheckpointFileMergingSnapshotManager extends FileMergingSnaps
private final PhysicalFilePool filePool;

public AcrossCheckpointFileMergingSnapshotManager(
String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
super(id, maxFileSize, filePoolType, ioExecutor);
String id,
long maxFileSize,
PhysicalFilePool.Type filePoolType,
float maxSpaceAmplification,
Executor ioExecutor) {
super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor);
filePool = createPhysicalPool();
}

Expand All @@ -44,9 +48,6 @@ protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
return filePool.pollFile(subtaskKey, scope);
}

@Override
protected void discardCheckpoint(long checkpointId) {}

@Override
protected void returnPhysicalFileForNextReuse(
SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ DirectoryStreamStateHandle getManagedDirStateHandle(
*/
void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
* Check whether previous state handles could further be reused considering the space
* amplification.
*
* @param stateHandle the handle to be reused.
*/
boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle);

/**
* A callback method which is called when previous state handles are reused by following
* checkpoint(s).
Expand Down Expand Up @@ -302,7 +310,6 @@ public SpaceStat(
}

public void onLogicalFileCreate(long size) {
physicalFileSize.addAndGet(size);
logicalFileSize.addAndGet(size);
logicalFileCount.incrementAndGet();
}
Expand All @@ -312,6 +319,10 @@ public void onLogicalFileDelete(long size) {
logicalFileCount.decrementAndGet();
}

public void onPhysicalFileUpdate(long size) {
physicalFileSize.addAndGet(size);
}

public void onPhysicalFileCreate() {
physicalFileCount.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter;
Expand Down Expand Up @@ -112,6 +114,8 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
/** Type of physical file pool. */
protected PhysicalFilePool.Type filePoolType;

protected final float maxSpaceAmplification;

protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;

private final Object notifyLock = new Object();
Expand Down Expand Up @@ -150,10 +154,15 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
protected SpaceStat spaceStat;

public FileMergingSnapshotManagerBase(
String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
String id,
long maxFileSize,
PhysicalFilePool.Type filePoolType,
float maxSpaceAmplification,
Executor ioExecutor) {
this.id = id;
this.maxPhysicalFileSize = maxFileSize;
this.filePoolType = filePoolType;
this.maxSpaceAmplification = maxSpaceAmplification;
this.ioExecutor = ioExecutor;
this.spaceStat = new SpaceStat();
}
Expand Down Expand Up @@ -245,6 +254,7 @@ protected LogicalFile createLogicalFile(
knownLogicalFiles.put(fileID, file);
if (physicalFile.isOwned()) {
spaceStat.onLogicalFileCreate(length);
spaceStat.onPhysicalFileUpdate(length);
}
return file;
}
Expand Down Expand Up @@ -471,7 +481,9 @@ protected abstract void returnPhysicalFileForNextReuse(
* @param checkpointId the discarded checkpoint id.
* @throws IOException if anything goes wrong with file system.
*/
protected abstract void discardCheckpoint(long checkpointId) throws IOException;
protected void discardCheckpoint(long checkpointId) throws IOException {
controlSpace();
}

// ------------------------------------------------------------------------
// Checkpoint Listener
Expand Down Expand Up @@ -572,6 +584,67 @@ public void reusePreviousStateHandle(
}
}

// ------------------------------------------------------------------------
// Space Control
// ------------------------------------------------------------------------

/**
* The core method that control space if needed. This method will compare the desired space
* amplification with current one, and if it exceeds the configured amplification, this method
* will mark minimal set of {@link PhysicalFile}s not to be reused anymore.
*/
private void controlSpace() {
if (spaceStat.logicalFileSize.get() * maxSpaceAmplification
< spaceStat.physicalFileSize.get()) {
// may need control space
long goalPhysicalSize =
Math.round(spaceStat.logicalFileSize.get() * maxSpaceAmplification);
final AtomicLong aliveSize = new AtomicLong(0L);
// retrieve all the physical files and calculate current alive size
Set<PhysicalFile> knownPhysicalFiles = new HashSet<>();
knownLogicalFiles.values().stream()
.map(LogicalFile::getPhysicalFile)
.forEach(
file -> {
if (file.isCouldReuse()) {
if (knownPhysicalFiles.add(file)) {
aliveSize.addAndGet(file.getSize());
}
}
});
// the alive size still greater than the goal
if (aliveSize.get() > goalPhysicalSize) {
// sort in DESC order on wasted size
SortedSet<PhysicalFile> sortedPhysicalFile =
new TreeSet<>((a, b) -> Long.compare(b.wastedSize(), a.wastedSize()));
knownPhysicalFiles.stream()
.filter(PhysicalFile::closed)
.forEach(sortedPhysicalFile::add);
// mark the physical file un-alive, until it reaches our goal.
for (PhysicalFile file : sortedPhysicalFile) {
if (!file.checkReuseOnSpaceAmplification(maxSpaceAmplification)) {
if (aliveSize.addAndGet(-file.wastedSize()) <= goalPhysicalSize) {
break;
}
}
}
}
}
}

@Override
public boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle) {
if (stateHandle instanceof SegmentFileStateHandle) {
LogicalFile file =
knownLogicalFiles.get(
((SegmentFileStateHandle) stateHandle).getLogicalFileId());
if (file != null) {
return file.getPhysicalFile().isCouldReuse();
}
}
return true;
}

public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId)
throws IOException {
logicalFile.discardWithCheckpointId(checkpointId);
Expand Down Expand Up @@ -692,15 +765,23 @@ && isManagedByFileMergingManager(
path,
subtaskKey,
fileHandle.getScope());
PhysicalFile file =
new PhysicalFile(
null,
path,
physicalFileDeleter,
fileHandle.getScope(),
managedByFileMergingManager);
try {
file.updateSize(getFileSize(file));
} catch (IOException e) {
throw new RuntimeException(e);
}
if (managedByFileMergingManager) {
spaceStat.onPhysicalFileCreate();
spaceStat.onPhysicalFileUpdate(file.getSize());
}
return new PhysicalFile(
null,
path,
physicalFileDeleter,
fileHandle.getScope(),
managedByFileMergingManager);
return file;
});

LogicalFileId logicalFileId = fileHandle.getLogicalFileId();
Expand All @@ -722,6 +803,16 @@ && isManagedByFileMergingManager(
}
}

private long getFileSize(PhysicalFile file) throws IOException {
FileStatus fileStatus =
file.getFilePath().getFileSystem().getFileStatus(file.getFilePath());
if (fileStatus == null || fileStatus.isDir()) {
throw new FileNotFoundException("File " + file.getFilePath() + " does not exist.");
} else {
return fileStatus.getLen();
}
}

/**
* Distinguish whether the given filePath is managed by the FileMergingSnapshotManager. If the
* filePath is located under managedDir (managedSharedStateDir or managedExclusiveStateDir) as a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class FileMergingSnapshotManagerBuilder {
/** Type of physical file pool. */
private PhysicalFilePool.Type filePoolType = PhysicalFilePool.Type.NON_BLOCKING;

/** The max space amplification that the manager should control. */
private float maxSpaceAmplification = Float.MAX_VALUE;

@Nullable private Executor ioExecutor = null;

/**
Expand All @@ -63,6 +66,17 @@ public FileMergingSnapshotManagerBuilder setFilePoolType(PhysicalFilePool.Type f
return this;
}

public FileMergingSnapshotManagerBuilder setMaxSpaceAmplification(float amplification) {
if (amplification < 1) {
// only valid number counts. If not valid, disable space control by setting this to
// Float.MAX_VALUE.
this.maxSpaceAmplification = Float.MAX_VALUE;
} else {
this.maxSpaceAmplification = amplification;
}
return this;
}

/**
* Set the executor for io operation in manager. If null(default), all io operation will be
* executed synchronously.
Expand All @@ -84,12 +98,14 @@ public FileMergingSnapshotManager build() {
id,
maxFileSize,
filePoolType,
maxSpaceAmplification,
ioExecutor == null ? Runnable::run : ioExecutor);
case MERGE_ACROSS_CHECKPOINT:
return new AcrossCheckpointFileMergingSnapshotManager(
id,
maxFileSize,
filePoolType,
maxSpaceAmplification,
ioExecutor == null ? Runnable::run : ioExecutor);
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void advanceLastCheckpointId(long checkpointId) {
public void discardWithCheckpointId(long checkpointId) throws IOException {
if (!discarded && checkpointId >= lastUsedCheckpointID) {
physicalFile.decRefCount();
physicalFile.decSize(length);
discarded = true;
}
}
Expand Down
Loading

0 comments on commit c233780

Please sign in to comment.