[FLINK-35780][state] Support state migration between disabling and enabling state ttl in RocksDBKeyedStateBackend
xiangyuf committed Nov 5, 2024
1 parent 1566db8 commit d35824f
Showing 20 changed files with 397 additions and 77 deletions.
@@ -24,6 +24,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@@ -57,11 +59,12 @@ public RegisteredKeyValueStateBackendMetaInfo(
this(
stateType,
name,
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
namespaceSerializer,
stateSerializer,
StateSnapshotTransformFactory.noTransform());
}

@SuppressWarnings("unchecked")
public RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
@Nonnull String name,
@@ -73,7 +76,9 @@ public RegisteredKeyValueStateBackendMetaInfo(
stateType,
name,
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(
(TypeSerializer<S>)
TtlAwareSerializer.wrapTtlAwareSerializer(stateSerializer)),
stateSnapshotTransformFactory);
}

@@ -93,9 +98,12 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.VALUE_SERIALIZER))),
new TtlAwareSerializerSnapshotWrapper<>(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot
.CommonSerializerKeys
.VALUE_SERIALIZER))
.getTtlAwareSerializerSnapshot())),
StateSnapshotTransformFactory.noTransform());

Preconditions.checkState(
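Note on the change above: every registered value serializer is now routed through TtlAwareSerializer.wrapTtlAwareSerializer(...), and restored serializer snapshots are rewrapped via TtlAwareSerializerSnapshotWrapper, so TTL-enabled and TTL-disabled state always meet as the same serializer type and can be compared for compatibility. A minimal sketch of the wrapping idea, assuming wrapTtlAwareSerializer accepts any TypeSerializer and returns a TTL-aware wrapper (the LongSerializer here is only illustrative):

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;

public class TtlAwareWrappingSketch {
    public static void main(String[] args) {
        // A plain (non-TTL) value serializer, as a user would register it in a state descriptor.
        TypeSerializer<Long> plain = LongSerializer.INSTANCE;

        // The meta info now stores the TTL-aware wrapper instead of the raw serializer,
        // so old and new state serializers can always be checked for TTL-awareness.
        TypeSerializer<?> wrapped = TtlAwareSerializer.wrapTtlAwareSerializer(plain);

        System.out.println(wrapped.getClass().getSimpleName());
    }
}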
@@ -51,6 +51,7 @@
import org.apache.flink.runtime.state.StateSnapshotTransformers;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StateMigrationException;
@@ -199,18 +200,20 @@ KeyGroupedInternalPriorityQueue<T> create(
stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
}

@SuppressWarnings("unchecked")
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<?, V> stateDesc,
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory,
boolean allowFutureMetadataUpdates)
throws StateMigrationException {

@SuppressWarnings("unchecked")
StateTable<K, N, V> stateTable =
(StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
TypeSerializer<V> newStateSerializer =
(TypeSerializer<V>)
TtlAwareSerializer.wrapTtlAwareSerializer(stateDesc.getSerializer());

if (stateTable != null) {
RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo =
@@ -253,6 +256,16 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ ").");
}

if (TtlAwareSerializer.needTtlStateMigration(
previousStateSerializer, newStateSerializer)) {
throw new StateMigrationException(
"For heap backends, the new state serializer ("
+ newStateSerializer
+ ") must not need ttl state migration with the old state serializer ("
+ previousStateSerializer
+ ").");
}

restoredKvMetaInfo =
allowFutureMetadataUpdates
? restoredKvMetaInfo.withSerializerUpgradesAllowed()
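The new needTtlStateMigration(...) guard above means the heap backend now fails with an explicit StateMigrationException, rather than an IllegalStateException, when TTL is toggled between restores; only the RocksDB backend performs the actual migration in this commit. A hedged sketch of the user-facing scenario that triggers the check, assuming the Duration-based StateTtlConfig builder of recent Flink versions:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;

import java.time.Duration;

public class TtlToggleScenarioSketch {
    public static void main(String[] args) {
        // Descriptor used before the savepoint: TTL disabled.
        ValueStateDescriptor<Long> before = new ValueStateDescriptor<>("counter", Long.class);

        // Descriptor used after restore: same state name, TTL now enabled.
        ValueStateDescriptor<Long> after = new ValueStateDescriptor<>("counter", Long.class);
        after.enableTimeToLive(StateTtlConfig.newBuilder(Duration.ofHours(1)).build());

        // With this commit, RocksDBKeyedStateBackend migrates the stored bytes for such a toggle,
        // while HeapKeyedStateBackend rejects it with a StateMigrationException.
        System.out.println(before.getName() + " -> " + after.getName());
    }
}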
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;

import java.util.HashMap;
import java.util.Map;
@@ -37,13 +38,15 @@ abstract class AbstractBatchExecutionKeyState<K, N, V> implements InternalKvStat
private N currentNamespace;
private V currentNamespaceValue;

@SuppressWarnings("unchecked")
protected AbstractBatchExecutionKeyState(
V defaultValue,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> stateTypeSerializer) {
this.defaultValue = defaultValue;
this.stateTypeSerializer = stateTypeSerializer;
this.stateTypeSerializer =
(TypeSerializer<V>) TtlAwareSerializer.wrapTtlAwareSerializer(stateTypeSerializer);
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
}
@@ -154,7 +154,7 @@ void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

private void testKeyedValueStateUpgrade(
protected void testKeyedValueStateUpgrade(
ValueStateDescriptor<TestType> initialAccessDescriptor,
ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore)
throws Exception {
@@ -271,7 +271,7 @@ void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

private void testKeyedListStateUpgrade(
protected void testKeyedListStateUpgrade(
ListStateDescriptor<TestType> initialAccessDescriptor,
ListStateDescriptor<TestType> newAccessDescriptorAfterRestore)
throws Exception {
@@ -431,7 +431,7 @@ private Iterator<Map.Entry<Integer, TestType>> sortedIterator(
return set.iterator();
}

private void testKeyedMapStateUpgrade(
protected void testKeyedMapStateUpgrade(
MapStateDescriptor<Integer, TestType> initialAccessDescriptor,
MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore)
throws Exception {
@@ -1203,7 +1203,7 @@ void testStateMigrationAfterChangingTTL() throws Exception {
}

@TestTemplate
void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
final String stateName = "test-ttl";

ValueStateDescriptor<TestType> initialAccessDescriptor =
@@ -1219,17 +1219,16 @@
testKeyedValueStateUpgrade(
initialAccessDescriptor, newAccessDescriptorAfterRestore))
.satisfiesAnyOf(
e -> assertThat(e).isInstanceOf(IllegalStateException.class),
e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
e -> assertThat(e).isInstanceOf(StateMigrationException.class),
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

@TestTemplate
void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
final String stateName = "test-ttl";

ValueStateDescriptor<TestType> initialAccessDescriptor =
new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());

ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer());
newAccessDescriptorAfterRestore.enableTimeToLive(
@@ -74,6 +74,7 @@
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
@@ -1160,7 +1161,9 @@ void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception
// have identical mappings
InternalKvState internalKvState = (InternalKvState) state;
KryoSerializer<TestPojo> kryoSerializer =
(KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
(KryoSerializer<TestPojo>)
((TtlAwareSerializer<TestPojo>) internalKvState.getValueSerializer())
.getOriginalTypeSerializer();
int mainPojoClassRegistrationId =
kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
int nestedPojoClassARegistrationId =
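The test adjustment above reflects that InternalKvState#getValueSerializer() now hands back the TTL-aware wrapper, so callers that need the concrete serializer unwrap it first. A small helper sketch (the class and method names are hypothetical; getOriginalTypeSerializer() is the accessor used in the diff):

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;

final class SerializerUnwrapSketch {
    // Returns the user-facing serializer, stripping the TTL-aware wrapper if one is present.
    static TypeSerializer<?> unwrap(TypeSerializer<?> valueSerializer) {
        return valueSerializer instanceof TtlAwareSerializer
                ? ((TtlAwareSerializer<?>) valueSerializer).getOriginalTypeSerializer()
                : valueSerializer;
    }
}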
@@ -89,7 +89,7 @@ private CheckpointStreamFactory createCheckpointStreamFactory() {
}
}

void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
}

@@ -144,7 +144,7 @@ private void disposeKeyedStateBackend() {
}
}

KeyedStateHandle takeSnapshot() throws Exception {
public KeyedStateHandle takeSnapshot() throws Exception {
SnapshotResult<KeyedStateHandle> snapshotResult = triggerSnapshot().get();
KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
if (jobManagerOwnedSnapshot != null) {
@@ -171,7 +171,7 @@ public void setCurrentKey(String key) {
}

@SuppressWarnings("unchecked")
<N, S extends State, V> S createState(
public <N, S extends State, V> S createState(
StateDescriptor<S, V> stateDescriptor,
@SuppressWarnings("SameParameterValue") N defaultNamespace)
throws Exception {
@@ -109,7 +109,7 @@ public boolean isSavepoint() {
return (TtlMergingStateTestContext<?, UV, ?>) ctx;
}

private void initTest() throws Exception {
protected void initTest() throws Exception {
initTest(
StateTtlConfig.UpdateType.OnCreateAndWrite,
StateTtlConfig.StateVisibility.NeverReturnExpired);
@@ -64,4 +64,18 @@ protected boolean supportsKeySerializerCheck() {
// TODO support checking key serializer
return false;
}

@Override
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
super.testStateMigrationAfterChangingTTLFromDisablingToEnabling();
}
}

@Override
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
super.testStateMigrationAfterChangingTTLFromEnablingToDisabling();
}
}
}
@@ -26,6 +26,8 @@
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
@@ -36,6 +38,8 @@

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Base class for {@link State} implementations that store state in a RocksDB database.
*
@@ -184,12 +188,20 @@ public void migrateSerializedValue(
DataInputDeserializer serializedOldValueInput,
DataOutputSerializer serializedMigratedValueOutput,
TypeSerializer<V> priorSerializer,
TypeSerializer<V> newSerializer)
TypeSerializer<V> newSerializer,
TtlTimeProvider ttlTimeProvider)
throws StateMigrationException {
checkArgument(priorSerializer instanceof TtlAwareSerializer);
checkArgument(newSerializer instanceof TtlAwareSerializer);
TtlAwareSerializer<V> ttlAwarePriorSerializer = (TtlAwareSerializer<V>) priorSerializer;
TtlAwareSerializer<V> ttlAwareNewSerializer = (TtlAwareSerializer<V>) newSerializer;

try {
V value = priorSerializer.deserialize(serializedOldValueInput);
newSerializer.serialize(value, serializedMigratedValueOutput);
ttlAwareNewSerializer.migrateValueFromPriorSerializer(
ttlAwarePriorSerializer,
() -> ttlAwarePriorSerializer.deserialize(serializedOldValueInput),
serializedMigratedValueOutput,
ttlTimeProvider);
} catch (Exception e) {
throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
}
@@ -233,6 +245,11 @@ protected AbstractRocksDBState<K, N, V> setDefaultValue(V defaultValue) {
return this;
}

protected AbstractRocksDBState<K, N, V> setColumnFamily(ColumnFamilyHandle columnFamily) {
this.columnFamily = columnFamily;
return this;
}

@Override
public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(
int recommendedMaxNumberOfReturnedRecords) {
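migrateSerializedValue(...) above now delegates to migrateValueFromPriorSerializer(...), which adds or strips the TTL metadata while re-serializing the value. The sketch below illustrates the intended byte-level effect under the assumption that TTL state stores an 8-byte last-access timestamp in front of the serialized user value; the real code re-serializes through the prior and new serializers rather than copying raw bytes:

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

final class TtlMigrationLayoutSketch {
    // Assumed layout: [8-byte last-access timestamp][serialized user value] when TTL is enabled.
    static void migrate(
            DataInputDeserializer oldValue,
            DataOutputSerializer migratedValue,
            boolean priorHasTtl,
            boolean newHasTtl,
            long currentTimestamp)
            throws IOException {
        if (priorHasTtl) {
            oldValue.readLong(); // drop the stored last-access timestamp
        }
        if (newHasTtl) {
            migratedValue.writeLong(currentTimestamp); // stamp the value as freshly accessed
        }
        byte[] payload = new byte[oldValue.available()];
        oldValue.readFully(payload);
        migratedValue.write(payload); // carry the user value over unchanged
    }
}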
@@ -192,6 +192,7 @@ static <K, N, SV, S extends State, IS extends S> IS update(
((AggregatingStateDescriptor) stateDesc).getAggregateFunction())
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
.setValueSerializer(registerResult.f1.getStateSerializer())
.setDefaultValue(stateDesc.getDefaultValue());
.setDefaultValue(stateDesc.getDefaultValue())
.setColumnFamily(registerResult.f0);
}
}