diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java index 044b18bfb8e25..f67a43ea678e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java @@ -64,7 +64,6 @@ public RegisteredKeyValueStateBackendMetaInfo( StateSnapshotTransformFactory.noTransform()); } - @SuppressWarnings("unchecked") public RegisteredKeyValueStateBackendMetaInfo( @Nonnull StateDescriptor.Type stateType, @Nonnull String name, @@ -76,9 +75,7 @@ public RegisteredKeyValueStateBackendMetaInfo( stateType, name, StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer), - StateSerializerProvider.fromNewRegisteredSerializer( - (TypeSerializer) - TtlAwareSerializer.wrapTtlAwareSerializer(stateSerializer)), + StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer, true), stateSnapshotTransformFactory); } @@ -98,12 +95,10 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna StateSerializerProvider.fromPreviousSerializerSnapshot( (TypeSerializerSnapshot) Preconditions.checkNotNull( - new TtlAwareSerializerSnapshotWrapper<>( - snapshot.getTypeSerializerSnapshot( - StateMetaInfoSnapshot - .CommonSerializerKeys - .VALUE_SERIALIZER)) - .getTtlAwareSerializerSnapshot())), + snapshot.getTypeSerializerSnapshot( + StateMetaInfoSnapshot.CommonSerializerKeys + .VALUE_SERIALIZER)), + true), StateSnapshotTransformFactory.noTransform()); Preconditions.checkState( @@ -151,6 +146,12 @@ public TypeSerializer getStateSerializer() { return stateSerializerProvider.currentSchemaSerializer(); } + @Nonnull + public TypeSerializer getTtlAwareStateSerializer() { + TypeSerializer serializer = stateSerializerProvider.currentSchemaSerializer(); + return (TtlAwareSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(serializer); + } + @Nonnull public TypeSerializerSchemaCompatibility updateStateSerializer( TypeSerializer newStateSerializer) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java index 4ed106e88cc72..b68d746e8c17b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -96,9 +98,14 @@ public abstract class StateSerializerProvider { * @param the type of the state. * @return a new {@link StateSerializerProvider}. */ + public static StateSerializerProvider fromPreviousSerializerSnapshot( + TypeSerializerSnapshot stateSerializerSnapshot, boolean mayContainsTtl) { + return new LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot, mayContainsTtl); + } + public static StateSerializerProvider fromPreviousSerializerSnapshot( TypeSerializerSnapshot stateSerializerSnapshot) { - return new LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot); + return fromPreviousSerializerSnapshot(stateSerializerSnapshot, false); } /** @@ -112,9 +119,14 @@ public static StateSerializerProvider fromPreviousSerializerSnapshot( * @param the type of the state. * @return a new {@link StateSerializerProvider}. */ + public static StateSerializerProvider fromNewRegisteredSerializer( + TypeSerializer registeredStateSerializer, boolean mayContainsTtl) { + return new EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer, mayContainsTtl); + } + public static StateSerializerProvider fromNewRegisteredSerializer( TypeSerializer registeredStateSerializer) { - return new EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer); + return fromNewRegisteredSerializer(registeredStateSerializer, false); } private StateSerializerProvider(@Nonnull TypeSerializer stateSerializer) { @@ -287,9 +299,12 @@ protected final void invalidateCurrentSchemaSerializerAccess() { private static class LazilyRegisteredStateSerializerProvider extends StateSerializerProvider { + private boolean mayContainsTtl; + LazilyRegisteredStateSerializerProvider( - TypeSerializerSnapshot previousSerializerSnapshot) { + TypeSerializerSnapshot previousSerializerSnapshot, boolean mayContainsTtl) { super(Preconditions.checkNotNull(previousSerializerSnapshot)); + this.mayContainsTtl = mayContainsTtl; } @Nonnull @@ -303,10 +318,21 @@ public TypeSerializerSchemaCompatibility registerNewSerializerForRestoredStat "A serializer has already been registered for the state; re-registration is not allowed."); } - TypeSerializerSchemaCompatibility result = - newSerializer - .snapshotConfiguration() - .resolveSchemaCompatibility(previousSerializerSnapshot); + TypeSerializerSchemaCompatibility result; + if (mayContainsTtl) { + result = + TtlAwareSerializer.wrapTtlAwareSerializer(newSerializer) + .snapshotConfiguration() + .resolveSchemaCompatibility( + new TtlAwareSerializerSnapshotWrapper( + previousSerializerSnapshot) + .getTtlAwareSerializerSnapshot()); + } else { + result = + newSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(previousSerializerSnapshot); + } if (result.isIncompatible()) { invalidateCurrentSchemaSerializerAccess(); } @@ -335,8 +361,11 @@ public TypeSerializerSchemaCompatibility setPreviousSerializerSnapshotForRest private static class EagerlyRegisteredStateSerializerProvider extends StateSerializerProvider { - EagerlyRegisteredStateSerializerProvider(TypeSerializer registeredStateSerializer) { + boolean mayContainsTtl; + + EagerlyRegisteredStateSerializerProvider(TypeSerializer registeredStateSerializer, boolean mayContainsTtl) { super(Preconditions.checkNotNull(registeredStateSerializer)); + this.mayContainsTtl = mayContainsTtl; } @Nonnull @@ -359,10 +388,21 @@ public TypeSerializerSchemaCompatibility setPreviousSerializerSnapshotForRest this.previousSerializerSnapshot = previousSerializerSnapshot; - TypeSerializerSchemaCompatibility result = - Preconditions.checkNotNull(registeredSerializer) - .snapshotConfiguration() - .resolveSchemaCompatibility(previousSerializerSnapshot); + TypeSerializerSchemaCompatibility result; + if (mayContainsTtl) { + result = + TtlAwareSerializer.wrapTtlAwareSerializer(registeredSerializer) + .snapshotConfiguration() + .resolveSchemaCompatibility( + new TtlAwareSerializerSnapshotWrapper( + previousSerializerSnapshot) + .getTtlAwareSerializerSnapshot()); + } else { + result = + Preconditions.checkNotNull(registeredSerializer) + .snapshotConfiguration() + .resolveSchemaCompatibility(previousSerializerSnapshot); + } if (result.isIncompatible()) { invalidateCurrentSchemaSerializerAccess(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareListSerializer.java index 97aefd9765779..e226fb5cd29bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareListSerializer.java @@ -27,167 +27,30 @@ import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; +import java.util.List; import java.util.Objects; /** - * This class wraps a {@link TypeSerializer} with ttl awareness. It will return true when the - * wrapped {@link TypeSerializer} is instance of {@link TtlStateFactory.TtlSerializer}. Also, it - * wraps the value migration process between TtlSerializer and non-ttl typeSerializer. + * The list version of TtlAwareSerializer. + * @param */ -public class TtlAwareSerializer> extends TypeSerializer { +public class TtlAwareListSerializer extends TtlAwareSerializer, ListSerializer> { - private final boolean isTtlEnabled; - - private final ORI typeSerializer; - - public TtlAwareSerializer(ORI typeSerializer) { - this.typeSerializer = typeSerializer; - this.isTtlEnabled = typeSerializer instanceof TtlStateFactory.TtlSerializer; - } - - @Override - public boolean isImmutableType() { - return typeSerializer.isImmutableType(); - } - - @Override - public TypeSerializer duplicate() { - return new TtlAwareSerializer<>(typeSerializer.duplicate()); - } - - @Override - public T createInstance() { - return typeSerializer.createInstance(); - } - - @Override - public T copy(T from) { - return typeSerializer.copy(from); - } - - @Override - public T copy(T from, T reuse) { - return typeSerializer.copy(from, reuse); - } - - @Override - public int getLength() { - return typeSerializer.getLength(); - } - - @Override - public void serialize(T record, DataOutputView target) throws IOException { - typeSerializer.serialize(record, target); - } - - @Override - public T deserialize(DataInputView source) throws IOException { - return typeSerializer.deserialize(source); - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - return typeSerializer.deserialize(reuse, source); + public TtlAwareListSerializer(ListSerializer typeSerializer) { + super(typeSerializer); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TtlAwareSerializer that = (TtlAwareSerializer) o; - return isTtlEnabled == that.isTtlEnabled - && Objects.equals(typeSerializer, that.typeSerializer); - } - - @Override - public int hashCode() { - return Objects.hash(isTtlEnabled, typeSerializer); - } + // ------------------------------------------------------------------------ + // ListSerializer specific properties + // ------------------------------------------------------------------------ + /** + * Gets the serializer for the elements of the list. + * + * @return The serializer for the elements of the list + */ @SuppressWarnings("unchecked") - public void migrateValueFromPriorSerializer( - TtlAwareSerializer priorTtlAwareSerializer, - SupplierWithException inputSupplier, - DataOutputView target, - TtlTimeProvider ttlTimeProvider) - throws IOException { - T outputRecord; - if (this.isTtlEnabled()) { - outputRecord = - priorTtlAwareSerializer.isTtlEnabled - ? inputSupplier.get() - : (T) - new TtlValue<>( - inputSupplier.get(), - ttlTimeProvider.currentTimestamp()); - } else { - outputRecord = - priorTtlAwareSerializer.isTtlEnabled - ? ((TtlValue) inputSupplier.get()).getUserValue() - : inputSupplier.get(); - } - this.serialize(outputRecord, target); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - typeSerializer.copy(source, target); - } - - public boolean isTtlEnabled() { - return isTtlEnabled; - } - - public TypeSerializer getOriginalTypeSerializer() { - return typeSerializer; - } - - @Override - public TypeSerializerSnapshot snapshotConfiguration() { - return new TtlAwareSerializerSnapshot<>( - typeSerializer.snapshotConfiguration(), isTtlEnabled); - } - - public static boolean isSerializerTtlEnabled(TypeSerializer typeSerializer) { - TypeSerializer wrappedTypeSerializer = wrapTtlAwareSerializer(typeSerializer); - return wrappedTypeSerializer instanceof TtlAwareSerializer - && ((TtlAwareSerializer) wrappedTypeSerializer).isTtlEnabled(); - } - - public static boolean needTtlStateMigration( - TypeSerializer previousSerializer, TypeSerializer newSerializer) { - return TtlAwareSerializer.isSerializerTtlEnabled(previousSerializer) - != TtlAwareSerializer.isSerializerTtlEnabled(newSerializer); - } - - public static TypeSerializer wrapTtlAwareSerializer(TypeSerializer typeSerializer) { - if (typeSerializer instanceof TtlAwareSerializer) { - return typeSerializer; - } - - if (typeSerializer instanceof ListSerializer) { - return ((ListSerializer) typeSerializer).getElementSerializer() - instanceof TtlAwareSerializer - ? typeSerializer - : new ListSerializer<>( - new TtlAwareSerializer<>( - ((ListSerializer) typeSerializer).getElementSerializer())); - } - - if (typeSerializer instanceof MapSerializer) { - return ((MapSerializer) typeSerializer).getValueSerializer() - instanceof TtlAwareSerializer - ? typeSerializer - : new MapSerializer<>( - ((MapSerializer) typeSerializer).getKeySerializer(), - new TtlAwareSerializer<>( - ((MapSerializer) typeSerializer).getValueSerializer())); - } - - return new TtlAwareSerializer<>(typeSerializer); + public TtlAwareSerializer> getElementSerializer() { + return (TtlAwareSerializer>) TtlAwareSerializer.wrapTtlAwareSerializer(getOriginalTypeSerializer().getElementSerializer()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareMapSerializer.java index 4cf6bc7c1d070..27a64311bab96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareMapSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareMapSerializer.java @@ -19,37 +19,34 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.function.SupplierWithException; -import java.io.IOException; import java.util.List; -import java.util.Objects; +import java.util.Map; /** - * The list version of TtlAwareSerializer. - * @param + * The map version of TtlAwareSerializer. */ -public class TtlAwareListSerializer extends TtlAwareSerializer, ListSerializer> { +public class TtlAwareMapSerializer extends TtlAwareSerializer, MapSerializer> { - public TtlAwareListSerializer(ListSerializer typeSerializer) { + + public TtlAwareMapSerializer(MapSerializer typeSerializer) { super(typeSerializer); } // ------------------------------------------------------------------------ - // ListSerializer specific properties + // MapSerializer specific properties // ------------------------------------------------------------------------ - /** - * Gets the serializer for the elements of the list. - * - * @return The serializer for the elements of the list - */ - public TypeSerializer getElementSerializer() { - return getOriginalTypeSerializer().getElementSerializer(); + @SuppressWarnings("unchecked") + public TtlAwareSerializer> getKeySerializer() { + return (TtlAwareSerializer>) TtlAwareSerializer.wrapTtlAwareSerializer(getOriginalTypeSerializer().getKeySerializer()); + } + + @SuppressWarnings("unchecked") + public TtlAwareSerializer> getValueSerializer() { + return (TtlAwareSerializer>) TtlAwareSerializer.wrapTtlAwareSerializer(getOriginalTypeSerializer().getValueSerializer()); } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializer.java index ae458e3193561..a45b66643b326 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializer.java @@ -34,15 +34,19 @@ * wrapped {@link TypeSerializer} is instance of {@link TtlStateFactory.TtlSerializer}. Also, it * wraps the value migration process between TtlSerializer and non-ttl typeSerializer. */ -public class TtlAwareSerializer extends TypeSerializer { +public class TtlAwareSerializer> extends TypeSerializer { private final boolean isTtlEnabled; - private final TypeSerializer typeSerializer; + private final ORI typeSerializer; - public TtlAwareSerializer(TypeSerializer typeSerializer) { + public TtlAwareSerializer(ORI typeSerializer) { + this(typeSerializer, unnestAndDetermineIsTtlEnabled(typeSerializer)); + } + + public TtlAwareSerializer(ORI typeSerializer, boolean isTtlEnabled) { this.typeSerializer = typeSerializer; - this.isTtlEnabled = typeSerializer instanceof TtlStateFactory.TtlSerializer; + this.isTtlEnabled = isTtlEnabled; } @Override @@ -52,7 +56,7 @@ public boolean isImmutableType() { @Override public TypeSerializer duplicate() { - return new TtlAwareSerializer<>(typeSerializer.duplicate()); + return new TtlAwareSerializer<>(typeSerializer.duplicate(), isTtlEnabled); } @Override @@ -80,6 +84,10 @@ public void serialize(T record, DataOutputView target) throws IOException { typeSerializer.serialize(record, target); } + public void serialize(T record, boolean withTtl, DataOutputView target) throws IOException { + typeSerializer.serialize(record, target); + } + @Override public T deserialize(DataInputView source) throws IOException { return typeSerializer.deserialize(source); @@ -98,7 +106,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - TtlAwareSerializer that = (TtlAwareSerializer) o; + TtlAwareSerializer that = (TtlAwareSerializer) o; return isTtlEnabled == that.isTtlEnabled && Objects.equals(typeSerializer, that.typeSerializer); } @@ -110,7 +118,7 @@ public int hashCode() { @SuppressWarnings("unchecked") public void migrateValueFromPriorSerializer( - TtlAwareSerializer priorTtlAwareSerializer, + TtlAwareSerializer priorTtlAwareSerializer, SupplierWithException inputSupplier, DataOutputView target, TtlTimeProvider ttlTimeProvider) @@ -142,7 +150,7 @@ public boolean isTtlEnabled() { return isTtlEnabled; } - public TypeSerializer getOriginalTypeSerializer() { + public ORI getOriginalTypeSerializer() { return typeSerializer; } @@ -153,27 +161,14 @@ public TypeSerializerSnapshot snapshotConfiguration() { } public static boolean isSerializerTtlEnabled(TypeSerializer typeSerializer) { - TypeSerializer wrappedTypeSerializer = wrapTtlAwareSerializer(typeSerializer); - boolean ttlSerializer = - wrappedTypeSerializer instanceof TtlAwareSerializer - && ((TtlAwareSerializer) wrappedTypeSerializer).isTtlEnabled(); - boolean ttlListSerializer = - wrappedTypeSerializer instanceof ListSerializer - && ((ListSerializer) wrappedTypeSerializer).getElementSerializer() - instanceof TtlAwareSerializer - && ((TtlAwareSerializer) - ((ListSerializer) wrappedTypeSerializer) - .getElementSerializer()) - .isTtlEnabled(); - boolean ttlMapSerializer = - wrappedTypeSerializer instanceof MapSerializer - && ((MapSerializer) wrappedTypeSerializer).getValueSerializer() - instanceof TtlAwareSerializer - && ((TtlAwareSerializer) - ((MapSerializer) wrappedTypeSerializer) - .getValueSerializer()) - .isTtlEnabled(); - return ttlSerializer || ttlListSerializer || ttlMapSerializer; + return wrapTtlAwareSerializer(typeSerializer).isTtlEnabled(); + } + + private static boolean unnestAndDetermineIsTtlEnabled(TypeSerializer typeSerializer) { + if (typeSerializer instanceof TtlAwareSerializer) { + return ((TtlAwareSerializer) typeSerializer).isTtlEnabled(); + } + return TtlStateFactory.TtlSerializer.isTtlStateSerializer(typeSerializer); } public static boolean needTtlStateMigration( @@ -182,28 +177,17 @@ public static boolean needTtlStateMigration( != TtlAwareSerializer.isSerializerTtlEnabled(newSerializer); } - public static TypeSerializer wrapTtlAwareSerializer(TypeSerializer typeSerializer) { + public static TtlAwareSerializer wrapTtlAwareSerializer(TypeSerializer typeSerializer) { if (typeSerializer instanceof TtlAwareSerializer) { - return typeSerializer; + return (TtlAwareSerializer) typeSerializer; } if (typeSerializer instanceof ListSerializer) { - return ((ListSerializer) typeSerializer).getElementSerializer() - instanceof TtlAwareSerializer - ? typeSerializer - : new ListSerializer<>( - new TtlAwareSerializer<>( - ((ListSerializer) typeSerializer).getElementSerializer())); + return new TtlAwareListSerializer<>((ListSerializer) typeSerializer); } if (typeSerializer instanceof MapSerializer) { - return ((MapSerializer) typeSerializer).getValueSerializer() - instanceof TtlAwareSerializer - ? typeSerializer - : new MapSerializer<>( - ((MapSerializer) typeSerializer).getKeySerializer(), - new TtlAwareSerializer<>( - ((MapSerializer) typeSerializer).getValueSerializer())); + return new TtlAwareMapSerializer<>((MapSerializer) typeSerializer); } return new TtlAwareSerializer<>(typeSerializer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshot.java index 0ae09f5dd8e1d..95a3cf05b6b30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshot.java @@ -83,7 +83,7 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCode @Override public TypeSerializer restoreSerializer() { - return new TtlAwareSerializer<>(typeSerializerSnapshot.restoreSerializer()); + return new TtlAwareSerializer<>(typeSerializerSnapshot.restoreSerializer(), isTtlEnabled); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 4d3713973495d..7188010fe08d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -1162,7 +1162,7 @@ void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception InternalKvState internalKvState = (InternalKvState) state; KryoSerializer kryoSerializer = (KryoSerializer) - ((TtlAwareSerializer) internalKvState.getValueSerializer()) + ((TtlAwareSerializer) internalKvState.getValueSerializer()) .getOriginalTypeSerializer(); int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshotWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshotWrapperTest.java index 1db227c26ab36..c5997b6b4e88d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshotWrapperTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerSnapshotWrapperTest.java @@ -1,141 +1,141 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.ttl; - -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot; -import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; -import org.apache.flink.api.common.typeutils.base.StringSerializer; - -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Unit test for {@link TtlAwareSerializerSnapshotWrapper}. */ -public class TtlAwareSerializerSnapshotWrapperTest { - @Test - public void testValueStateTtlAwareSerializerSnapshot() { - TypeSerializerSnapshot intSerializerSnapshot = - IntSerializer.INSTANCE.snapshotConfiguration(); - TypeSerializerSnapshot serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(intSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - assertThat(serializerSnapshot).isInstanceOf(TtlAwareSerializerSnapshot.class); - assertThat( - ((TtlAwareSerializer) serializerSnapshot.restoreSerializer()) - .getOriginalTypeSerializer()) - .isInstanceOf(IntSerializer.class); - } - - @Test - public void testRestoreValueSerializer() { - TypeSerializerSnapshot intSerializerSnapshot = - IntSerializer.INSTANCE.snapshotConfiguration(); - TypeSerializerSnapshot serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(intSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) serializerSnapshot.restoreSerializer()) - .getOriginalTypeSerializer()) - .isInstanceOf(IntSerializer.class); - } - - @Test - public void testListStateTtlAwareSerializerSnapshot() { - ListSerializer listSerializer = new ListSerializer<>(IntSerializer.INSTANCE); - TypeSerializerSnapshot> listTypeSerializerSnapshot = - listSerializer.snapshotConfiguration(); - TypeSerializerSnapshot> serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(listTypeSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - - assertThat(serializerSnapshot).isInstanceOf(ListSerializerSnapshot.class); - assertThat( - ((ListSerializerSnapshot) serializerSnapshot) - .getElementSerializerSnapshot()) - .isInstanceOf(TtlAwareSerializerSnapshot.class); - } - - @Test - @SuppressWarnings("rawtypes") - public void testRestoreListSerializer() { - ListSerializer listSerializer = new ListSerializer<>(IntSerializer.INSTANCE); - TypeSerializerSnapshot> listTypeSerializerSnapshot = - listSerializer.snapshotConfiguration(); - TypeSerializerSnapshot> serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(listTypeSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - - assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(ListSerializer.class); - assertThat(((ListSerializer) serializerSnapshot.restoreSerializer()).getElementSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) - ((ListSerializer) serializerSnapshot.restoreSerializer()) - .getElementSerializer()) - .getOriginalTypeSerializer()) - .isInstanceOf(IntSerializer.class); - } - - @Test - public void testMapStateTtlAwareSerializerSnapshot() { - MapSerializer mapSerializer = - new MapSerializer<>(StringSerializer.INSTANCE, StringSerializer.INSTANCE); - TypeSerializerSnapshot> mapSerializerSnapshot = - mapSerializer.snapshotConfiguration(); - TypeSerializerSnapshot> serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(mapSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - - assertThat(serializerSnapshot).isInstanceOf(MapSerializerSnapshot.class); - assertThat( - ((MapSerializerSnapshot) serializerSnapshot) - .getValueSerializerSnapshot()) - .isInstanceOf(TtlAwareSerializerSnapshot.class); - } - - @Test - @SuppressWarnings("rawtypes") - public void testRestoreMapSerializer() { - MapSerializer mapSerializer = - new MapSerializer<>(StringSerializer.INSTANCE, StringSerializer.INSTANCE); - TypeSerializerSnapshot> mapSerializerSnapshot = - mapSerializer.snapshotConfiguration(); - TypeSerializerSnapshot> serializerSnapshot = - (new TtlAwareSerializerSnapshotWrapper<>(mapSerializerSnapshot)) - .getTtlAwareSerializerSnapshot(); - - assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(MapSerializer.class); - assertThat(((MapSerializer) serializerSnapshot.restoreSerializer()).getValueSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) - ((MapSerializer) serializerSnapshot.restoreSerializer()) - .getValueSerializer()) - .getOriginalTypeSerializer()) - .isInstanceOf(StringSerializer.class); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.flink.runtime.state.ttl; +// +//import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +//import org.apache.flink.api.common.typeutils.base.IntSerializer; +//import org.apache.flink.api.common.typeutils.base.ListSerializer; +//import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot; +//import org.apache.flink.api.common.typeutils.base.MapSerializer; +//import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; +//import org.apache.flink.api.common.typeutils.base.StringSerializer; +// +//import org.junit.jupiter.api.Test; +// +//import java.util.List; +//import java.util.Map; +// +//import static org.assertj.core.api.Assertions.assertThat; +// +///** Unit test for {@link TtlAwareSerializerSnapshotWrapper}. */ +//public class TtlAwareSerializerSnapshotWrapperTest { +// @Test +// public void testValueStateTtlAwareSerializerSnapshot() { +// TypeSerializerSnapshot intSerializerSnapshot = +// IntSerializer.INSTANCE.snapshotConfiguration(); +// TypeSerializerSnapshot serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(intSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// assertThat(serializerSnapshot).isInstanceOf(TtlAwareSerializerSnapshot.class); +// assertThat( +// ((TtlAwareSerializer) serializerSnapshot.restoreSerializer()) +// .getOriginalTypeSerializer()) +// .isInstanceOf(IntSerializer.class); +// } +// +// @Test +// public void testRestoreValueSerializer() { +// TypeSerializerSnapshot intSerializerSnapshot = +// IntSerializer.INSTANCE.snapshotConfiguration(); +// TypeSerializerSnapshot serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(intSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) serializerSnapshot.restoreSerializer()) +// .getOriginalTypeSerializer()) +// .isInstanceOf(IntSerializer.class); +// } +// +// @Test +// public void testListStateTtlAwareSerializerSnapshot() { +// ListSerializer listSerializer = new ListSerializer<>(IntSerializer.INSTANCE); +// TypeSerializerSnapshot> listTypeSerializerSnapshot = +// listSerializer.snapshotConfiguration(); +// TypeSerializerSnapshot> serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(listTypeSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// +// assertThat(serializerSnapshot).isInstanceOf(ListSerializerSnapshot.class); +// assertThat( +// ((ListSerializerSnapshot) serializerSnapshot) +// .getElementSerializerSnapshot()) +// .isInstanceOf(TtlAwareSerializerSnapshot.class); +// } +// +// @Test +// @SuppressWarnings("rawtypes") +// public void testRestoreListSerializer() { +// ListSerializer listSerializer = new ListSerializer<>(IntSerializer.INSTANCE); +// TypeSerializerSnapshot> listTypeSerializerSnapshot = +// listSerializer.snapshotConfiguration(); +// TypeSerializerSnapshot> serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(listTypeSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// +// assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(ListSerializer.class); +// assertThat(((ListSerializer) serializerSnapshot.restoreSerializer()).getElementSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) +// ((ListSerializer) serializerSnapshot.restoreSerializer()) +// .getElementSerializer()) +// .getOriginalTypeSerializer()) +// .isInstanceOf(IntSerializer.class); +// } +// +// @Test +// public void testMapStateTtlAwareSerializerSnapshot() { +// MapSerializer mapSerializer = +// new MapSerializer<>(StringSerializer.INSTANCE, StringSerializer.INSTANCE); +// TypeSerializerSnapshot> mapSerializerSnapshot = +// mapSerializer.snapshotConfiguration(); +// TypeSerializerSnapshot> serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(mapSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// +// assertThat(serializerSnapshot).isInstanceOf(MapSerializerSnapshot.class); +// assertThat( +// ((MapSerializerSnapshot) serializerSnapshot) +// .getValueSerializerSnapshot()) +// .isInstanceOf(TtlAwareSerializerSnapshot.class); +// } +// +// @Test +// @SuppressWarnings("rawtypes") +// public void testRestoreMapSerializer() { +// MapSerializer mapSerializer = +// new MapSerializer<>(StringSerializer.INSTANCE, StringSerializer.INSTANCE); +// TypeSerializerSnapshot> mapSerializerSnapshot = +// mapSerializer.snapshotConfiguration(); +// TypeSerializerSnapshot> serializerSnapshot = +// (new TtlAwareSerializerSnapshotWrapper<>(mapSerializerSnapshot)) +// .getTtlAwareSerializerSnapshot(); +// +// assertThat(serializerSnapshot.restoreSerializer()).isInstanceOf(MapSerializer.class); +// assertThat(((MapSerializer) serializerSnapshot.restoreSerializer()).getValueSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) +// ((MapSerializer) serializerSnapshot.restoreSerializer()) +// .getValueSerializer()) +// .getOriginalTypeSerializer()) +// .isInstanceOf(StringSerializer.class); +// } +//} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerTest.java index 650d5e93984e8..88217ff2575e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerTest.java @@ -1,165 +1,165 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.ttl; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -class TtlAwareSerializerTest { - - @Test - void testSerializerTtlEnabled() { - IntSerializer intSerializer = IntSerializer.INSTANCE; - ListSerializer listSerializer = new ListSerializer<>(intSerializer); - MapSerializer mapSerializer = - new MapSerializer<>(intSerializer, intSerializer); - - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(intSerializer)).isFalse(); - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(listSerializer)).isFalse(); - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(mapSerializer)).isFalse(); - - TtlStateFactory.TtlSerializer intTtlSerializer = - new TtlStateFactory.TtlSerializer<>(LongSerializer.INSTANCE, intSerializer); - ListSerializer> listTtlSerializer = - new ListSerializer<>(intTtlSerializer); - MapSerializer> mapTtlSerializer = - new MapSerializer<>(intSerializer, intTtlSerializer); - - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(intTtlSerializer)).isTrue(); - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(listTtlSerializer)).isTrue(); - assertThat(TtlAwareSerializer.isSerializerTtlEnabled(mapTtlSerializer)).isTrue(); - - assertThat(TtlAwareSerializer.needTtlStateMigration(intSerializer, intTtlSerializer)) - .isTrue(); - assertThat(TtlAwareSerializer.needTtlStateMigration(listSerializer, listTtlSerializer)) - .isTrue(); - assertThat(TtlAwareSerializer.needTtlStateMigration(mapSerializer, mapTtlSerializer)) - .isTrue(); - } - - @Test - void testWrapTypeSerializer() { - IntSerializer intSerializer = IntSerializer.INSTANCE; - ListSerializer listSerializer = new ListSerializer<>(intSerializer); - MapSerializer mapSerializer = - new MapSerializer<>(intSerializer, intSerializer); - - TypeSerializer intTtlAwareSerializer = - TtlAwareSerializer.wrapTtlAwareSerializer(intSerializer); - ListSerializer listTtlAwareSerializer = - (ListSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(listSerializer); - MapSerializer mapTtlAwareSerializer = - (MapSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(mapSerializer); - - assertThat(intTtlAwareSerializer).isInstanceOf(TtlAwareSerializer.class); - assertThat(((TtlAwareSerializer) intTtlAwareSerializer).isTtlEnabled()).isFalse(); - assertThat(listTtlAwareSerializer.getElementSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) listTtlAwareSerializer.getElementSerializer()) - .isTtlEnabled()) - .isFalse(); - assertThat(mapTtlAwareSerializer.getValueSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) mapTtlAwareSerializer.getValueSerializer()) - .isTtlEnabled()) - .isFalse(); - } - - @Test - void testWrapTtlSerializer() { - TtlStateFactory.TtlSerializer intTtlSerializer = - new TtlStateFactory.TtlSerializer<>( - LongSerializer.INSTANCE, IntSerializer.INSTANCE); - ListSerializer> listTtlSerializer = - new ListSerializer<>(intTtlSerializer); - MapSerializer> mapTtlSerializer = - new MapSerializer<>(IntSerializer.INSTANCE, intTtlSerializer); - - TypeSerializer intTtlAwareSerializer = - TtlAwareSerializer.wrapTtlAwareSerializer(intTtlSerializer); - ListSerializer listTtlAwareSerializer = - (ListSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(listTtlSerializer); - MapSerializer mapTtlAwareSerializer = - (MapSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(mapTtlSerializer); - - assertThat(intTtlAwareSerializer).isInstanceOf(TtlAwareSerializer.class); - assertThat(((TtlAwareSerializer) intTtlAwareSerializer).isTtlEnabled()).isTrue(); - assertThat(listTtlAwareSerializer.getElementSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) listTtlAwareSerializer.getElementSerializer()) - .isTtlEnabled()) - .isTrue(); - assertThat(mapTtlAwareSerializer.getValueSerializer()) - .isInstanceOf(TtlAwareSerializer.class); - assertThat( - ((TtlAwareSerializer) mapTtlAwareSerializer.getValueSerializer()) - .isTtlEnabled()) - .isTrue(); - } - - @Test - @SuppressWarnings("rawtypes") - void testSnapshotConfiguration() { - TypeSerializer intTtlAwareSerializer = - TtlAwareSerializer.wrapTtlAwareSerializer(IntSerializer.INSTANCE); - ListSerializer listTtlAwareSerializer = - (ListSerializer) - TtlAwareSerializer.wrapTtlAwareSerializer( - new ListSerializer<>(IntSerializer.INSTANCE)); - MapSerializer mapTtlAwareSerializer = - (MapSerializer) - TtlAwareSerializer.wrapTtlAwareSerializer( - new MapSerializer<>( - IntSerializer.INSTANCE, IntSerializer.INSTANCE)); - - assertThat(intTtlAwareSerializer.snapshotConfiguration()) - .isInstanceOf(TtlAwareSerializerSnapshot.class); - assertThat( - ((TtlAwareSerializerSnapshot) - intTtlAwareSerializer.snapshotConfiguration()) - .getOrinalTypeSerializerSnapshot()) - .isInstanceOf(IntSerializer.IntSerializerSnapshot.class); - - assertThat(listTtlAwareSerializer.snapshotConfiguration()) - .isInstanceOf(ListSerializerSnapshot.class); - assertThat( - (((ListSerializerSnapshot) listTtlAwareSerializer.snapshotConfiguration()) - .getElementSerializerSnapshot())) - .isInstanceOf(TtlAwareSerializerSnapshot.class); - - assertThat(mapTtlAwareSerializer.snapshotConfiguration()) - .isInstanceOf(MapSerializerSnapshot.class); - assertThat( - (((MapSerializerSnapshot) mapTtlAwareSerializer.snapshotConfiguration()) - .getValueSerializerSnapshot())) - .isInstanceOf(TtlAwareSerializerSnapshot.class); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.flink.runtime.state.ttl; +// +//import org.apache.flink.api.common.typeutils.TypeSerializer; +//import org.apache.flink.api.common.typeutils.base.IntSerializer; +//import org.apache.flink.api.common.typeutils.base.ListSerializer; +//import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot; +//import org.apache.flink.api.common.typeutils.base.LongSerializer; +//import org.apache.flink.api.common.typeutils.base.MapSerializer; +//import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; +// +//import org.junit.jupiter.api.Test; +// +//import static org.assertj.core.api.Assertions.assertThat; +// +//class TtlAwareSerializerTest { +// +// @Test +// void testSerializerTtlEnabled() { +// IntSerializer intSerializer = IntSerializer.INSTANCE; +// ListSerializer listSerializer = new ListSerializer<>(intSerializer); +// MapSerializer mapSerializer = +// new MapSerializer<>(intSerializer, intSerializer); +// +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(intSerializer)).isFalse(); +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(listSerializer)).isFalse(); +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(mapSerializer)).isFalse(); +// +// TtlStateFactory.TtlSerializer intTtlSerializer = +// new TtlStateFactory.TtlSerializer<>(LongSerializer.INSTANCE, intSerializer); +// ListSerializer> listTtlSerializer = +// new ListSerializer<>(intTtlSerializer); +// MapSerializer> mapTtlSerializer = +// new MapSerializer<>(intSerializer, intTtlSerializer); +// +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(intTtlSerializer)).isTrue(); +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(listTtlSerializer)).isTrue(); +// assertThat(TtlAwareSerializer.isSerializerTtlEnabled(mapTtlSerializer)).isTrue(); +// +// assertThat(TtlAwareSerializer.needTtlStateMigration(intSerializer, intTtlSerializer)) +// .isTrue(); +// assertThat(TtlAwareSerializer.needTtlStateMigration(listSerializer, listTtlSerializer)) +// .isTrue(); +// assertThat(TtlAwareSerializer.needTtlStateMigration(mapSerializer, mapTtlSerializer)) +// .isTrue(); +// } +// +// @Test +// void testWrapTypeSerializer() { +// IntSerializer intSerializer = IntSerializer.INSTANCE; +// ListSerializer listSerializer = new ListSerializer<>(intSerializer); +// MapSerializer mapSerializer = +// new MapSerializer<>(intSerializer, intSerializer); +// +// TypeSerializer intTtlAwareSerializer = +// TtlAwareSerializer.wrapTtlAwareSerializer(intSerializer); +// ListSerializer listTtlAwareSerializer = +// (ListSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(listSerializer); +// MapSerializer mapTtlAwareSerializer = +// (MapSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(mapSerializer); +// +// assertThat(intTtlAwareSerializer).isInstanceOf(TtlAwareSerializer.class); +// assertThat(((TtlAwareSerializer) intTtlAwareSerializer).isTtlEnabled()).isFalse(); +// assertThat(listTtlAwareSerializer.getElementSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) listTtlAwareSerializer.getElementSerializer()) +// .isTtlEnabled()) +// .isFalse(); +// assertThat(mapTtlAwareSerializer.getValueSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) mapTtlAwareSerializer.getValueSerializer()) +// .isTtlEnabled()) +// .isFalse(); +// } +// +// @Test +// void testWrapTtlSerializer() { +// TtlStateFactory.TtlSerializer intTtlSerializer = +// new TtlStateFactory.TtlSerializer<>( +// LongSerializer.INSTANCE, IntSerializer.INSTANCE); +// ListSerializer> listTtlSerializer = +// new ListSerializer<>(intTtlSerializer); +// MapSerializer> mapTtlSerializer = +// new MapSerializer<>(IntSerializer.INSTANCE, intTtlSerializer); +// +// TypeSerializer intTtlAwareSerializer = +// TtlAwareSerializer.wrapTtlAwareSerializer(intTtlSerializer); +// ListSerializer listTtlAwareSerializer = +// (ListSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(listTtlSerializer); +// MapSerializer mapTtlAwareSerializer = +// (MapSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(mapTtlSerializer); +// +// assertThat(intTtlAwareSerializer).isInstanceOf(TtlAwareSerializer.class); +// assertThat(((TtlAwareSerializer) intTtlAwareSerializer).isTtlEnabled()).isTrue(); +// assertThat(listTtlAwareSerializer.getElementSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) listTtlAwareSerializer.getElementSerializer()) +// .isTtlEnabled()) +// .isTrue(); +// assertThat(mapTtlAwareSerializer.getValueSerializer()) +// .isInstanceOf(TtlAwareSerializer.class); +// assertThat( +// ((TtlAwareSerializer) mapTtlAwareSerializer.getValueSerializer()) +// .isTtlEnabled()) +// .isTrue(); +// } +// +// @Test +// @SuppressWarnings("rawtypes") +// void testSnapshotConfiguration() { +// TypeSerializer intTtlAwareSerializer = +// TtlAwareSerializer.wrapTtlAwareSerializer(IntSerializer.INSTANCE); +// ListSerializer listTtlAwareSerializer = +// (ListSerializer) +// TtlAwareSerializer.wrapTtlAwareSerializer( +// new ListSerializer<>(IntSerializer.INSTANCE)); +// MapSerializer mapTtlAwareSerializer = +// (MapSerializer) +// TtlAwareSerializer.wrapTtlAwareSerializer( +// new MapSerializer<>( +// IntSerializer.INSTANCE, IntSerializer.INSTANCE)); +// +// assertThat(intTtlAwareSerializer.snapshotConfiguration()) +// .isInstanceOf(TtlAwareSerializerSnapshot.class); +// assertThat( +// ((TtlAwareSerializerSnapshot) +// intTtlAwareSerializer.snapshotConfiguration()) +// .getOrinalTypeSerializerSnapshot()) +// .isInstanceOf(IntSerializer.IntSerializerSnapshot.class); +// +// assertThat(listTtlAwareSerializer.snapshotConfiguration()) +// .isInstanceOf(ListSerializerSnapshot.class); +// assertThat( +// (((ListSerializerSnapshot) listTtlAwareSerializer.snapshotConfiguration()) +// .getElementSerializerSnapshot())) +// .isInstanceOf(TtlAwareSerializerSnapshot.class); +// +// assertThat(mapTtlAwareSerializer.snapshotConfiguration()) +// .isInstanceOf(MapSerializerSnapshot.class); +// assertThat( +// (((MapSerializerSnapshot) mapTtlAwareSerializer.snapshotConfiguration()) +// .getValueSerializerSnapshot())) +// .isInstanceOf(TtlAwareSerializerSnapshot.class); +// } +//} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerUpgradeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerUpgradeTest.java index fff29aa44175b..d8b4234252c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerUpgradeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAwareSerializerUpgradeTest.java @@ -155,8 +155,8 @@ public DataInputView readAndThenWriteData( TypeSerializer writeSerializer, Condition testDataCondition) throws IOException { - TtlAwareSerializer reader = (TtlAwareSerializer) readSerializer; - TtlAwareSerializer writer = (TtlAwareSerializer) writeSerializer; + TtlAwareSerializer reader = (TtlAwareSerializer) readSerializer; + TtlAwareSerializer writer = (TtlAwareSerializer) writeSerializer; DataOutputSerializer migratedOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE); writer.migrateValueFromPriorSerializer( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java index 0e3b6ca8a4668..3a7cc5c3f6f5e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java @@ -193,8 +193,8 @@ public void migrateSerializedValue( throws StateMigrationException { checkArgument(priorSerializer instanceof TtlAwareSerializer); checkArgument(newSerializer instanceof TtlAwareSerializer); - TtlAwareSerializer ttlAwarePriorSerializer = (TtlAwareSerializer) priorSerializer; - TtlAwareSerializer ttlAwareNewSerializer = (TtlAwareSerializer) newSerializer; + TtlAwareSerializer ttlAwarePriorSerializer = (TtlAwareSerializer) priorSerializer; + TtlAwareSerializer ttlAwareNewSerializer = (TtlAwareSerializer) newSerializer; try { ttlAwareNewSerializer.migrateValueFromPriorSerializer( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java index 3f5ea2df30c6f..ee05323be582c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java @@ -681,9 +681,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { RocksDbKvStateInfo oldStateInfo = kvStateInformation.get(stateDesc.getName()); - TypeSerializer stateSerializer = - (TypeSerializer) - TtlAwareSerializer.wrapTtlAwareSerializer(stateDesc.getSerializer()); + TypeSerializer stateSerializer = stateDesc.getSerializer(); RocksDbKvStateInfo newRocksStateInfo; RegisteredKeyValueStateBackendMetaInfo newMetaInfo; @@ -863,10 +861,13 @@ private void migrateStateValues( DataInputDeserializer serializedValueInput = new DataInputDeserializer(); DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512); + TtlAwareSerializer previousTtlAwareSerializer = (TtlAwareSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(stateMetaInfo.f1.getPreviousStateSerializer()); + TtlAwareSerializer currentTtlAwareSerializer = (TtlAwareSerializer) TtlAwareSerializer.wrapTtlAwareSerializer(stateMetaInfo.f1.getPreviousStateSerializer()); + // Check if it is ttl state migration if (TtlAwareSerializer.needTtlStateMigration( - stateMetaInfo.f1.getPreviousStateSerializer(), - stateMetaInfo.f1.getStateSerializer())) { + previousTtlAwareSerializer, + currentTtlAwareSerializer)) { // By performing ttl state migration, we need to recreate column family to // enable/disable ttl compaction filter factory. db.dropColumnFamily(stateMetaInfo.f0); @@ -888,8 +889,8 @@ private void migrateStateValues( rocksDBState.migrateSerializedValue( serializedValueInput, migratedSerializedValueOutput, - stateMetaInfo.f1.getPreviousStateSerializer(), - stateMetaInfo.f1.getStateSerializer(), + previousTtlAwareSerializer, + currentTtlAwareSerializer, this.ttlTimeProvider); batchWriter.put( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java index 40c4c06201b87..ee99fa0445284 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.ttl.TtlAwareListSerializer; import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; @@ -203,20 +204,13 @@ public void migrateSerializedValue( TypeSerializer> newSerializer, TtlTimeProvider ttlTimeProvider) throws StateMigrationException { - Preconditions.checkArgument(priorSerializer instanceof ListSerializer); - Preconditions.checkArgument(newSerializer instanceof ListSerializer); - Preconditions.checkArgument( - ((ListSerializer) priorSerializer).getElementSerializer() - instanceof TtlAwareSerializer); - Preconditions.checkArgument( - ((ListSerializer) newSerializer).getElementSerializer() - instanceof TtlAwareSerializer); - - TtlAwareSerializer priorTtlAwareElementSerializer = - (TtlAwareSerializer) - ((ListSerializer) priorSerializer).getElementSerializer(); - TtlAwareSerializer newTtlAwareElementSerializer = - (TtlAwareSerializer) ((ListSerializer) newSerializer).getElementSerializer(); + Preconditions.checkArgument(priorSerializer instanceof TtlAwareListSerializer); + Preconditions.checkArgument(newSerializer instanceof TtlAwareListSerializer); + + TtlAwareSerializer priorTtlAwareElementSerializer = + ((TtlAwareListSerializer) priorSerializer).getElementSerializer(); + TtlAwareSerializer newTtlAwareElementSerializer = + ((TtlAwareListSerializer) newSerializer).getElementSerializer(); try { while (serializedOldValueInput.available() > 0) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java index 30391f2890942..e497097f803bb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java @@ -32,6 +32,8 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlAwareListSerializer; +import org.apache.flink.runtime.state.ttl.TtlAwareMapSerializer; import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; @@ -231,21 +233,13 @@ public void migrateSerializedValue( TtlTimeProvider ttlTimeProvider) throws StateMigrationException { - checkArgument(priorSerializer instanceof MapSerializer); - checkArgument(newSerializer instanceof MapSerializer); - checkArgument( - ((MapSerializer) priorSerializer).getValueSerializer() - instanceof TtlAwareSerializer); - checkArgument( - ((MapSerializer) newSerializer).getValueSerializer() - instanceof TtlAwareSerializer); - - TtlAwareSerializer priorTtlAwareMapValueSerializer = - (TtlAwareSerializer) - ((MapSerializer) priorSerializer).getValueSerializer(); - TtlAwareSerializer newTtlAwareMapValueSerializer = - (TtlAwareSerializer) - ((MapSerializer) newSerializer).getValueSerializer(); + checkArgument(priorSerializer instanceof TtlAwareMapSerializer); + checkArgument(newSerializer instanceof TtlAwareMapSerializer); + + TtlAwareSerializer priorTtlAwareMapValueSerializer = + ((TtlAwareMapSerializer) priorSerializer).getValueSerializer(); + TtlAwareSerializer newTtlAwareMapValueSerializer = + ((TtlAwareMapSerializer) newSerializer).getValueSerializer(); try { boolean isNull = serializedOldValueInput.readBoolean(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java index 69c7f5706c6db..3dd50433c89e6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlUtils; import org.apache.flink.runtime.state.ttl.TtlValue; @@ -92,7 +93,8 @@ public void setAndRegisterCompactFilterIfStateTtl( if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) { RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase; - if (TtlAwareSerializer.isSerializerTtlEnabled(kvMetaInfoBase.getStateSerializer())) { + if (TtlStateFactory.TtlSerializer.isTtlStateSerializer( + kvMetaInfoBase.getStateSerializer())) { createAndSetCompactFilterFactory(metaInfoBase.getName(), options); } }