diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index bd91e8e1fbb4d..3867ceed0bcfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -26,6 +26,7 @@
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
@@ -47,12 +48,18 @@
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
+import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
 
@@ -65,6 +72,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -124,6 +132,15 @@ public abstract class MetadataV2V3SerializerBase {
     // CHANGELOG_HANDLE_V2 is introduced to add new field of checkpointId.
     private static final byte CHANGELOG_HANDLE_V2 = 14;
 
+    // SEGMENT_FILE_HANDLE is introduced to support file merging.
+    private static final byte SEGMENT_FILE_HANDLE = 15;
+
+    // EMPTY_SEGMENT_FILE_HANDLE is introduced as a placeholder for file merging.
+    private static final byte EMPTY_SEGMENT_FILE_HANDLE = 16;
+
+    // SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE is introduced for file merging of operator state.
+    private static final byte SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE = 17;
+
     // ------------------------------------------------------------------------
     //  (De)serialization entry points
     // ------------------------------------------------------------------------
@@ -582,7 +599,10 @@ private static IncrementalRemoteKeyedStateHandle deserializeIncrementalStateHand
     void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos)
             throws IOException {
         if (stateHandle != null) {
-            dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+            dos.writeByte(
+                    stateHandle instanceof FileMergingOperatorStreamStateHandle
+                            ? SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE
+                            : PARTITIONABLE_OPERATOR_STATE_HANDLE);
             Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
                     stateHandle.getStateNameToPartitionOffsets();
             dos.writeInt(partitionOffsetsMap.size());
@@ -601,6 +621,19 @@ void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStr
                     dos.writeLong(offset);
                 }
             }
+            if (stateHandle instanceof FileMergingOperatorStreamStateHandle) {
+                dos.writeUTF(
+                        ((FileMergingOperatorStreamStateHandle) stateHandle)
+                                .getTaskOwnedDirHandle()
+                                .getDirectory()
+                                .toString());
+                dos.writeUTF(
+                        ((FileMergingOperatorStreamStateHandle) stateHandle)
+                                .getSharedDirHandle()
+                                .getDirectory()
+                                .toString());
+                dos.writeBoolean(stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle);
+            }
             serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
         } else {
             dos.writeByte(NULL_HANDLE);
@@ -613,7 +646,8 @@ OperatorStateHandle deserializeOperatorStateHandle(
         final int type = dis.readByte();
         if (NULL_HANDLE == type) {
             return null;
-        } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+        } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type
+                || SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
             int mapSize = dis.readInt();
             Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap =
                     CollectionUtil.newHashMapWithExpectedSize(mapSize);
@@ -632,8 +666,31 @@ OperatorStateHandle deserializeOperatorStateHandle(
                         new OperatorStateHandle.StateMetaInfo(offsets, mode);
                 offsetsMap.put(key, metaInfo);
             }
-            StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
-            return new OperatorStreamStateHandle(offsetsMap, stateHandle);
+            if (SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+                String taskOwnedDirPathStr = dis.readUTF();
+                String sharedDirPathStr = dis.readUTF();
+                boolean isEmpty = dis.readBoolean();
+                StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
+                Preconditions.checkArgument(stateHandle instanceof SegmentFileStateHandle);
+                return isEmpty
+                        ? new EmptyFileMergingOperatorStreamStateHandle(
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(taskOwnedDirPathStr).toPath()),
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(sharedDirPathStr).toPath()),
+                                offsetsMap,
+                                stateHandle)
+                        : new FileMergingOperatorStreamStateHandle(
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(taskOwnedDirPathStr).toPath()),
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(sharedDirPathStr).toPath()),
+                                offsetsMap,
+                                stateHandle);
+            } else {
+                StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
+                return new OperatorStreamStateHandle(offsetsMap, stateHandle);
+            }
         } else {
             throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
         }
@@ -677,6 +734,18 @@ static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutput
             RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
             dos.writeUTF(relativeFileStateHandle.getRelativePath());
             dos.writeLong(relativeFileStateHandle.getStateSize());
+        } else if (stateHandle instanceof SegmentFileStateHandle) {
+            if (stateHandle instanceof EmptySegmentFileStateHandle) {
+                dos.writeByte(EMPTY_SEGMENT_FILE_HANDLE);
+            } else {
+                dos.writeByte(SEGMENT_FILE_HANDLE);
+                SegmentFileStateHandle segmentFileStateHandle =
+                        (SegmentFileStateHandle) stateHandle;
+                dos.writeLong(segmentFileStateHandle.getStartPos());
+                dos.writeLong(segmentFileStateHandle.getStateSize());
+                dos.writeInt(segmentFileStateHandle.getScope().ordinal());
+                dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
+            }
         } else if (stateHandle instanceof FileStateHandle) {
             dos.writeByte(FILE_STREAM_STATE_HANDLE);
             FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
@@ -745,6 +814,14 @@ static StreamStateHandle deserializeStreamStateHandle(
                     new KeyGroupRangeOffsets(keyGroupRange, offsets);
             StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
             return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+        } else if (SEGMENT_FILE_HANDLE == type) {
+            long startPos = dis.readLong();
+            long stateSize = dis.readLong();
+            CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()];
+            Path physicalFilePath = new Path(dis.readUTF());
+            return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope);
+        } else if (EMPTY_SEGMENT_FILE_HANDLE == type) {
+            return EmptySegmentFileStateHandle.INSTANCE;
         } else {
             throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
index ada378f2d0346..11cbefb664d52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
@@ -116,6 +116,14 @@ public long getCheckpointedSize() {
         return getDelegateStateHandle().getStateSize();
     }
 
+    public DirectoryStreamStateHandle getSharedDirHandle() {
+        return sharedDirHandle;
+    }
+
+    public DirectoryStreamStateHandle getTaskOwnedDirHandle() {
+        return taskOwnedDirHandle;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
index 2a3aa2f49fdd0..9c5d5a47caf15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
@@ -35,8 +35,6 @@
  * {@link FileStateHandle} for state that was written to a file segment. A {@link
  * SegmentFileStateHandle} represents a {@link LogicalFile}, which has already been written to a
  * segment in a physical file.
- *
- * <p>TODO (FLINK-32079): serialization and deserialization of {@link SegmentFileStateHandle}.
  */
 public class SegmentFileStateHandle implements StreamStateHandle {
 
@@ -133,7 +131,7 @@ public boolean equals(Object o) {
             return true;
         }
 
-        if (o == null || getClass() != o.getClass()) {
+        if (!(o instanceof SegmentFileStateHandle)) {
             return false;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 8cb160310fe59..079fd16c2d727 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -26,6 +26,8 @@
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DiscardRecordedStateObject;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -38,11 +40,16 @@
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingRelativeFileStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -140,36 +147,13 @@ private static void randomlySetSubtaskState(
         boolean isIncremental = random.nextInt(3) == 0;
 
         for (int subtaskIdx : subtasksToSet) {
-            StreamStateHandle operatorStateBackend =
-                    new ByteStreamStateHandle(
-                            "b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
-            StreamStateHandle operatorStateStream =
-                    new ByteStreamStateHandle(
-                            "b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-            Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
-            offsetsMap.put(
-                    "A",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-            offsetsMap.put(
-                    "B",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-            offsetsMap.put(
-                    "C",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {60, 70, 80}, OperatorStateHandle.Mode.UNION));
-
             final OperatorSubtaskState.Builder state = OperatorSubtaskState.builder();
 
             if (hasOperatorStateBackend) {
-                state.setManagedOperatorState(
-                        new OperatorStreamStateHandle(offsetsMap, operatorStateBackend));
+                state.setManagedOperatorState(createDummyOperatorStreamStateHandle(random));
             }
 
             if (hasOperatorStateStream) {
-                state.setRawOperatorState(
-                        new OperatorStreamStateHandle(offsetsMap, operatorStateStream));
+                state.setRawOperatorState(createDummyOperatorStreamStateHandle(random));
             }
 
             if (hasKeyedBackend) {
@@ -209,6 +193,45 @@ private static void randomlySetSubtaskState(
         }
     }
 
+    private static OperatorStreamStateHandle createDummyOperatorStreamStateHandle(Random rnd) {
+        Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
+        offsetsMap.put(
+                "A",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+        offsetsMap.put(
+                "B",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+        offsetsMap.put(
+                "C",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {60, 70, 80}, OperatorStateHandle.Mode.UNION));
+
+        boolean enableFileMerging = rnd.nextBoolean();
+        if (enableFileMerging) {
+            DirectoryStreamStateHandle taskOwnedDirHandle =
+                    DirectoryStreamStateHandle.forPathWithZeroSize(
+                            new File(String.valueOf(createRandomUUID(rnd))).toPath());
+            DirectoryStreamStateHandle sharedDirHandle =
+                    DirectoryStreamStateHandle.forPathWithZeroSize(
+                            new File(String.valueOf(createRandomUUID(rnd))).toPath());
+            return rnd.nextBoolean()
+                    ? new FileMergingOperatorStreamStateHandle(
+                            taskOwnedDirHandle,
+                            sharedDirHandle,
+                            offsetsMap,
+                            createDummySegmentFileStateHandle(rnd, false))
+                    : EmptyFileMergingOperatorStreamStateHandle.create(
+                            taskOwnedDirHandle, sharedDirHandle);
+        } else {
+            StreamStateHandle operatorStateStream =
+                    new ByteStreamStateHandle(
+                            "b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
+            return new OperatorStreamStateHandle(offsetsMap, operatorStateStream);
+        }
+    }
+
     private static boolean isSavepoint(String basePath) {
         return basePath != null;
     }
@@ -263,11 +286,15 @@ public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedState
     }
 
     public static List<HandleAndLocalPath> createRandomHandleAndLocalPathList(Random rnd) {
+        boolean enableFileMerging = rnd.nextBoolean();
         final int size = rnd.nextInt(4);
         List<HandleAndLocalPath> result = new ArrayList<>(size);
         for (int i = 0; i < size; ++i) {
             String localPath = createRandomUUID(rnd).toString();
-            StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, null);
+            StreamStateHandle stateHandle =
+                    enableFileMerging
+                            ? createDummySegmentFileStateHandle(rnd)
+                            : createDummyStreamStateHandle(rnd, null);
             result.add(HandleAndLocalPath.of(stateHandle, localPath));
         }
 
@@ -308,6 +335,55 @@ public static StreamStateHandle createDummyStreamStateHandle(
         }
     }
 
+    private static StreamStateHandle createDummySegmentFileStateHandle(Random rnd) {
+        return createDummySegmentFileStateHandle(rnd, rnd.nextBoolean());
+    }
+
+    private static StreamStateHandle createDummySegmentFileStateHandle(
+            Random rnd, boolean isEmpty) {
+        return isEmpty
+                ? TestingSegmentFileStateHandle.EMPTY_INSTANCE
+                : new TestingSegmentFileStateHandle(
+                        new Path(String.valueOf(createRandomUUID(rnd))),
+                        0,
+                        1,
+                        CheckpointedStateScope.SHARED);
+    }
+
+    private static class TestingSegmentFileStateHandle extends SegmentFileStateHandle
+            implements DiscardRecordedStateObject {
+
+        private static final long serialVersionUID = 1L;
+
+        private static final TestingSegmentFileStateHandle EMPTY_INSTANCE =
+                new TestingSegmentFileStateHandle(
+                        new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);
+
+        private boolean disposed;
+
+        public TestingSegmentFileStateHandle(
+                Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+            super(filePath, startPos, stateSize, scope);
+        }
+
+        @Override
+        public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+            // Collect to LOCAL_MEMORY for test
+            collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
+        }
+
+        @Override
+        public void discardState() {
+            super.discardState();
+            disposed = true;
+        }
+
+        @Override
+        public boolean isDisposed() {
+            return disposed;
+        }
+    }
+
     private static UUID createRandomUUID(Random rnd) {
         return new UUID(rnd.nextLong(), rnd.nextLong());
     }
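
For reference, below is a minimal standalone sketch (not part of the patch; class name and values are illustrative) of the wire layout that the new SEGMENT_FILE_HANDLE branch writes and reads: the handle type byte, then startPos, stateSize, the CheckpointedStateScope ordinal, and the physical file path as a UTF string. It uses only plain java.io streams, mirroring the order of writes in serializeStreamStateHandle and the corresponding reads in deserializeStreamStateHandle.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative round-trip of the SEGMENT_FILE_HANDLE byte layout introduced above.
public class SegmentHandleLayoutSketch {

    // Same tag value the serializer assigns to SEGMENT_FILE_HANDLE.
    private static final byte SEGMENT_FILE_HANDLE = 15;

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeByte(SEGMENT_FILE_HANDLE);
            dos.writeLong(128L); // startPos of the logical segment in the physical file
            dos.writeLong(1024L); // stateSize of the segment
            dos.writeInt(1); // CheckpointedStateScope ordinal (illustrative)
            dos.writeUTF("/checkpoints/shared/physical-file-0"); // physical file path (illustrative)
        }

        try (DataInputStream dis =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            int type = dis.readByte();
            long startPos = dis.readLong();
            long stateSize = dis.readLong();
            int scopeOrdinal = dis.readInt();
            String path = dis.readUTF();
            System.out.printf(
                    "type=%d start=%d size=%d scope=%d path=%s%n",
                    type, startPos, stateSize, scopeOrdinal, path);
        }
    }
}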