[FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging
Zakelly committed Jun 12, 2024
1 parent 526f9b0 commit 9f55941
Showing 8 changed files with 71 additions and 26 deletions.
@@ -25,6 +25,7 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
@@ -361,4 +362,10 @@ public String toString() {
+ '}';
}
}

static boolean isFileMergingHandle(StreamStateHandle handle) {
return (handle instanceof SegmentFileStateHandle)
|| (handle instanceof PlaceholderStreamStateHandle
&& ((PlaceholderStreamStateHandle) handle).isFileMerged());
}
}
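
For orientation, a minimal usage sketch of the new helper (not part of the commit): it constructs two placeholders directly, which assumes PhysicalStateHandleID's string-based constructor, and keeps the snippet in the helper's own package in case the method stays package-private; the IDs and sizes are illustrative.

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

// Sketch: only a placeholder that explicitly carries the fileMerged flag is
// classified as a file-merging handle; a plain placeholder is not.
public class IsFileMergingHandleSketch {
    public static void main(String[] args) {
        StreamStateHandle plainPlaceholder =
                new PlaceholderStreamStateHandle(
                        new PhysicalStateHandleID("private-state-file"), 128L, false);
        StreamStateHandle mergedPlaceholder =
                new PlaceholderStreamStateHandle(
                        new PhysicalStateHandleID("merged-logical-file"), 128L, true);

        // Expected output: false, then true.
        System.out.println(FileMergingSnapshotManager.isFileMergingHandle(plainPlaceholder));
        System.out.println(FileMergingSnapshotManager.isFileMergingHandle(mergedPlaceholder));
    }
}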
@@ -571,7 +571,8 @@ public void reusePreviousStateHandle(
if (file != null) {
file.advanceLastCheckpointId(checkpointId);
}
} else if (stateHandle instanceof PlaceholderStreamStateHandle) {
} else if (stateHandle instanceof PlaceholderStreamStateHandle
&& ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) {
// Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle,
// the manager should recognize this.
LogicalFile file =
@@ -643,6 +644,16 @@ public boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle) {
if (file != null) {
return file.getPhysicalFile().isCouldReuse();
}
} else if (stateHandle instanceof PlaceholderStreamStateHandle
&& ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) {
// Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle,
// the manager should recognize this.
LogicalFile file =
knownLogicalFiles.get(
new LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString()));
if (file != null) {
return file.getPhysicalFile().isCouldReuse();
}
}
// If a stateHandle is not of the type SegmentFileStateHandle or if its corresponding file
// is not recognized by the fileMergingManager, it needs to be re-uploaded.
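
Both placeholder branches above perform the same lookup. As a hedged illustration only, that shared logic could be read as a private helper sitting next to them in the manager; the method name is invented, while knownLogicalFiles, LogicalFile and LogicalFileId are the fields and types already used above.

    // Illustrative consolidation (not in the commit): a file-merged placeholder is
    // resolved back to its logical file via the key string of its stream state
    // handle ID; a null result means the handle must be re-uploaded.
    @Nullable
    private LogicalFile lookupFileMergedPlaceholder(StreamStateHandle stateHandle) {
        if (stateHandle instanceof PlaceholderStreamStateHandle
                && ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) {
            return knownLogicalFiles.get(
                    new LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString()));
        }
        return null;
    }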
@@ -18,7 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;

import java.io.IOException;
import java.util.Collection;
@@ -91,13 +91,8 @@ default void reusePreviousStateHandle(Collection<? extends StreamStateHandle> pr
* @return true if it can be reused.
*/
default boolean couldReuseStateHandle(StreamStateHandle stateHandle) {

// By default, the CheckpointStreamFactory doesn't support snapshot-file-merging, so the
// SegmentFileStateHandle type of stateHandle can not be reused.
if (stateHandle instanceof SegmentFileStateHandle) {
return false;
}

return true;
return !FileMergingSnapshotManager.isFileMergingHandle(stateHandle);
}
}
@@ -35,10 +35,13 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {

private final PhysicalStateHandleID physicalID;
private final long stateSize;
private final boolean fileMerged;

public PlaceholderStreamStateHandle(PhysicalStateHandleID physicalID, long stateSize) {
public PlaceholderStreamStateHandle(
PhysicalStateHandleID physicalID, long stateSize, boolean fileMerged) {
this.physicalID = physicalID;
this.stateSize = stateSize;
this.fileMerged = fileMerged;
}

@Override
@@ -67,4 +70,8 @@ public void discardState() throws Exception {
public long getStateSize() {
return stateSize;
}

public boolean isFileMerged() {
return fileMerged;
}
}
@@ -114,7 +114,7 @@ void testSharedStateRegistration() throws Exception {
buildIncrementalHandle(
localPath,
new PlaceholderStreamStateHandle(
handle.getStreamStateHandleID(), handle.getStateSize()),
handle.getStreamStateHandleID(), handle.getStateSize(), false),
backendId);
newHandle.registerSharedStates(sharedStateRegistry, 1L);

@@ -24,6 +24,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -418,7 +419,9 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
// (created from a previous checkpoint).
return Optional.of(
new PlaceholderStreamStateHandle(
handle.getStreamStateHandleID(), handle.getStateSize()));
handle.getStreamStateHandleID(),
handle.getStateSize(),
FileMergingSnapshotManager.isFileMergingHandle(handle)));
} else {
// Don't use any uploaded but not confirmed handles because they might be deleted
// (by TM) if the previous checkpoint failed. See FLINK-25395
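
Read together with the manager changes above, the intent of this hunk can be sketched in isolation as follows; this is illustrative code rather than code from the commit, and confirmedHandle stands for whatever handle was confirmed for the previous checkpoint.

    // Sketch: RocksDB re-reports a previously confirmed file as a lightweight
    // placeholder, and the placeholder now records whether that file was produced
    // by checkpoint file merging, so the reuse checks (couldReuseStateHandle /
    // couldReusePreviousStateHandle) can classify it without seeing the real handle.
    static StreamStateHandle toPlaceholder(StreamStateHandle confirmedHandle) {
        return new PlaceholderStreamStateHandle(
                confirmedHandle.getStreamStateHandleID(),
                confirmedHandle.getStateSize(),
                FileMergingSnapshotManager.isFileMergingHandle(confirmedHandle));
    }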
@@ -402,16 +402,18 @@ private static String runJobAndGetExternalizedCheckpoint(
MiniClusterWithClientResource cluster,
RestoreMode restoreMode)
throws Exception {
// complete at least two checkpoints so that the initial checkpoint can be subsumed
return runJobAndGetExternalizedCheckpoint(
backend, externalCheckpoint, cluster, restoreMode, new Configuration());
backend, externalCheckpoint, cluster, restoreMode, new Configuration(), 2);
}

static String runJobAndGetExternalizedCheckpoint(
StateBackend backend,
@Nullable String externalCheckpoint,
MiniClusterWithClientResource cluster,
RestoreMode restoreMode,
Configuration jobConfig)
Configuration jobConfig,
int consecutiveCheckpoints)
throws Exception {
JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, restoreMode, jobConfig);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
@@ -420,8 +422,8 @@ static String runJobAndGetExternalizedCheckpoint(
// wait until all sources have been started
NotifyingInfiniteTupleSource.countDownLatch.await();

// complete at least two checkpoints so that the initial checkpoint can be subsumed
waitForCheckpoint(initialJobGraph.getJobID(), cluster.getMiniCluster(), 2);
waitForCheckpoint(
initialJobGraph.getJobID(), cluster.getMiniCluster(), consecutiveCheckpoints);
cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
waitUntilJobCanceled(initialJobGraph.getJobID(), cluster.getClusterClient());

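
A hedged call-site sketch of the widened test helper: the backend and cluster variables are assumed to exist in the caller, the restore mode is illustrative, and the shorter overload above still defaults to two checkpoints.

    // Sketch: file-merging tests pass a larger checkpoint count so that the final
    // retained checkpoint actually contains reused state reported via placeholders.
    String externalizedCheckpoint =
            runJobAndGetExternalizedCheckpoint(
                    backend,             // StateBackend under test (assumed)
                    null,                // no checkpoint to restore from
                    cluster,             // MiniClusterWithClientResource (assumed)
                    RestoreMode.CLAIM,   // illustrative restore mode
                    new Configuration(),
                    4);                  // wait for 4 consecutive checkpoints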
@@ -33,7 +33,6 @@
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -90,6 +89,9 @@ private void testSwitchingFileMerging(
boolean fileMergingAcrossBoundary)
throws Exception {
final Configuration config = new Configuration();
// Wait for 4 checkpoints each round to subsume the original one and produce the
// PlaceholderStreamStateHandle in the final round
final int consecutiveCheckpoint = 4;
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString());
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary);
@@ -108,7 +110,12 @@ private void testSwitchingFileMerging(
try {
firstCheckpoint =
runJobAndGetExternalizedCheckpoint(
stateBackend1, null, firstCluster, restoreMode, config);
stateBackend1,
null,
firstCluster,
restoreMode,
config,
consecutiveCheckpoint);
assertThat(firstCheckpoint).isNotNull();
verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
} finally {
@@ -130,7 +137,12 @@ private void testSwitchingFileMerging(
try {
secondCheckpoint =
runJobAndGetExternalizedCheckpoint(
stateBackend2, firstCheckpoint, secondCluster, restoreMode, config);
stateBackend2,
firstCheckpoint,
secondCluster,
restoreMode,
config,
consecutiveCheckpoint);
assertThat(secondCheckpoint).isNotNull();
verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
} finally {
@@ -150,7 +162,12 @@ private void testSwitchingFileMerging(
try {
String thirdCheckpoint =
runJobAndGetExternalizedCheckpoint(
stateBackend3, secondCheckpoint, thirdCluster, restoreMode, config);
stateBackend3,
secondCheckpoint,
thirdCluster,
restoreMode,
config,
consecutiveCheckpoint);
assertThat(thirdCheckpoint).isNotNull();
verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
} finally {
@@ -167,22 +184,25 @@ private void verifyStateHandleType(String checkpointPath, boolean fileMergingEna
// Check keyed state handle
List<KeyedStateHandle> keyedStateHandles =
new ArrayList<>(subtaskState.getManagedKeyedState());
keyedStateHandles.addAll(subtaskState.getRawKeyedState());
for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
Assertions.assertInstanceOf(
IncrementalRemoteKeyedStateHandle.class, keyedStateHandle);
assertThat(keyedStateHandle)
.isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
.streamSubHandles()
.forEach(
handle -> {
Assertions.assertEquals(
fileMergingEnabled,
handle instanceof SegmentFileStateHandle);
if (fileMergingEnabled) {
assertThat(handle)
.isInstanceOf(SegmentFileStateHandle.class);
} else {
assertThat(handle)
.isNotInstanceOf(SegmentFileStateHandle.class);
}
});
hasKeyedState = true;
}
}
}
Assertions.assertTrue(hasKeyedState);
assertThat(hasKeyedState).isTrue();
}
}
