From ee182511e71af5cd2193bdcc379220bbf3a7f363 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 23 Oct 2024 14:34:40 +0800 Subject: [PATCH] [FLINK-36589][state/runtime] Decouple the initialization of sync and async keyed statebackend --- .../input/StreamOperatorContextBuilder.java | 1 + .../AbstractAsyncStateStreamOperator.java | 32 +---- .../AbstractAsyncStateStreamOperatorV2.java | 34 +----- .../api/operators/AbstractStreamOperator.java | 25 ++-- .../operators/AbstractStreamOperatorV2.java | 24 +++- .../operators/StreamOperatorStateContext.java | 6 +- .../operators/StreamOperatorStateHandler.java | 55 +++++---- .../operators/StreamTaskStateInitializer.java | 3 +- .../StreamTaskStateInitializerImpl.java | 113 +++++++++++------- .../operators/StreamingRuntimeContext.java | 8 +- .../StateInitializationContextImplTest.java | 1 + .../StreamOperatorStateHandlerTest.java | 1 + .../StreamTaskStateInitializerImplTest.java | 2 + .../runtime/tasks/StreamTaskTest.java | 13 +- 14 files changed, 173 insertions(+), 145 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java index c6d5446032e82..e52b1a1bd7076 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java @@ -135,6 +135,7 @@ StreamOperatorStateContext build(Logger logger) throws IOException { registry, ctx.getMetricGroup(), 1.0, + false, false); } catch (Exception e) { throw new IOException("Failed to restore state backend", e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index f44717f37bb1b..eafe692055766 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -28,18 +28,14 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.AsyncStateException; import org.apache.flink.runtime.asyncprocessing.RecordContext; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -60,8 +56,6 @@ import javax.annotation.Nonnull; -import java.util.Optional; - import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -89,7 +83,6 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); - this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager(); getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); final StreamTask containingTask = checkNotNull(getContainingTask()); environment = containingTask.getEnvironment(); @@ -239,25 +232,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } } - @Override - public OperatorSnapshotFutures snapshotState( - long checkpointId, - long timestamp, - CheckpointOptions checkpointOptions, - CheckpointStreamFactory factory) - throws Exception { - return stateHandler.snapshotState( - this, - Optional.ofNullable(timeServiceManager), - getOperatorName(), - checkpointId, - timestamp, - checkpointOptions, - factory, - isUsingCustomRawKeyedState(), - true); - } - /** * Returns a {@link InternalTimerService} that can be used to query current processing time and * event time and to set timers. An operator can have several timer services, where each has its @@ -293,13 +267,13 @@ public InternalTimerService getInternalTimerService( InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(); - checkState(keyedStateBackend != null, "Timers can only be used on keyed operators."); + TypeSerializer keySerializer = stateHandler.getKeySerializer(); + checkState(keySerializer != null, "Timers can only be used on keyed operators."); // A {@link RecordContext} will be set as the current processing context to preserve record // order when the given {@link Triggerable} is invoked. return keyedTimeServiceHandler.getAsyncInternalTimerService( name, - keyedStateBackend.getKeySerializer(), + keySerializer, namespaceSerializer, triggerable, (AsyncExecutionController) asyncExecutionController); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 874d8709151bd..be5fe56bf1b50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -26,17 +26,13 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.AsyncStateException; import org.apache.flink.runtime.asyncprocessing.RecordContext; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; @@ -56,8 +52,6 @@ import javax.annotation.Nonnull; -import java.util.Optional; - import static org.apache.flink.util.Preconditions.checkState; /** @@ -89,7 +83,6 @@ public AbstractAsyncStateStreamOperatorV2( public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); - this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager(); getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit(); @@ -213,25 +206,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } } - @Override - public OperatorSnapshotFutures snapshotState( - long checkpointId, - long timestamp, - CheckpointOptions checkpointOptions, - CheckpointStreamFactory factory) - throws Exception { - return stateHandler.snapshotState( - this, - Optional.ofNullable(timeServiceManager), - getOperatorName(), - checkpointId, - timestamp, - checkpointOptions, - factory, - isUsingCustomRawKeyedState(), - true); - } - @SuppressWarnings("unchecked") public InternalTimerService getInternalTimerService( String name, TypeSerializer namespaceSerializer, Triggerable triggerable) { @@ -246,11 +220,13 @@ public InternalTimerService getInternalTimerService( InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(); - checkState(keyedStateBackend != null, "Timers can only be used on keyed operators."); + TypeSerializer keySerializer = stateHandler.getKeySerializer(); + checkState(keySerializer != null, "Timers can only be used on keyed operators."); + // A {@link RecordContext} will be set as the current processing context to preserve record + // order when the given {@link Triggerable} is invoked. return keyedTimeServiceHandler.getAsyncInternalTimerService( name, - keyedStateBackend.getKeySerializer(), + keySerializer, namespaceSerializer, triggerable, (AsyncExecutionController) asyncExecutionController); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 0b87b571071f6..7c7ba229d506a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -286,12 +286,15 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) runtimeContext.getJobConfiguration(), runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), - isUsingCustomRawKeyedState()); - + isUsingCustomRawKeyedState(), + isAsyncStateProcessingEnabled()); stateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); - timeServiceManager = context.internalTimerServiceManager(); + timeServiceManager = + isAsyncStateProcessingEnabled() + ? context.asyncInternalTimerServiceManager() + : context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); } @@ -320,6 +323,14 @@ protected boolean isUsingCustomRawKeyedState() { return false; } + /** + * Indicates whether this operator is enabling the async state. Can be overridden by subclasses. + */ + @Internal + public boolean isAsyncStateProcessingEnabled() { + return false; + } + @Internal @Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { @@ -402,7 +413,7 @@ public OperatorSnapshotFutures snapshotState( checkpointOptions, factory, isUsingCustomRawKeyedState(), - false); + isAsyncStateProcessingEnabled()); } /** @@ -650,10 +661,10 @@ public InternalTimerService getInternalTimerService( @SuppressWarnings("unchecked") InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(); - checkState(keyedStateBackend != null, "Timers can only be used on keyed operators."); + TypeSerializer keySerializer = stateHandler.getKeySerializer(); + checkState(keySerializer != null, "Timers can only be used on keyed operators."); return keyedTimeServiceHandler.getInternalTimerService( - name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable); + name, keySerializer, namespaceSerializer, triggerable); } public void processWatermark(Watermark mark) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index a7e65f7896db8..76b13cecc9988 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -217,10 +217,14 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) runtimeContext.getJobConfiguration(), runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), - isUsingCustomRawKeyedState()); + isUsingCustomRawKeyedState(), + isAsyncStateProcessingEnabled()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); - timeServiceManager = context.internalTimerServiceManager(); + timeServiceManager = + isAsyncStateProcessingEnabled() + ? context.asyncInternalTimerServiceManager() + : context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); if (useSplittableTimers() @@ -275,6 +279,14 @@ protected boolean isUsingCustomRawKeyedState() { return false; } + /** + * Indicates whether this operator is enabling the async state. Can be overridden by subclasses. + */ + @Internal + public boolean isAsyncStateProcessingEnabled() { + return false; + } + /** * This method is called immediately before any elements are processed, it should contain the * operator's initialization logic, e.g. state initialization. @@ -318,7 +330,7 @@ public OperatorSnapshotFutures snapshotState( checkpointOptions, factory, isUsingCustomRawKeyedState(), - false); + isAsyncStateProcessingEnabled()); } /** @@ -510,10 +522,10 @@ public InternalTimerService getInternalTimerService( @SuppressWarnings("unchecked") InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(); - checkState(keyedStateBackend != null, "Timers can only be used on keyed operators."); + TypeSerializer keySerializer = stateHandler.getKeySerializer(); + checkState(keySerializer != null, "Timers can only be used on keyed operators."); return keyedTimeServiceHandler.getInternalTimerService( - name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable); + name, keySerializer, namespaceSerializer, triggerable); } public void processWatermark(Watermark mark) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index d6b1adc0132d8..e991af3d4375a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -49,6 +50,9 @@ default boolean isRestored() { /** Returns the operator state backend for the stream operator. */ OperatorStateBackend operatorStateBackend(); + /** Returns the key serializer for keyed state backends. */ + TypeSerializer keySerializer(); + /** * Returns the keyed state backend for the stream operator. This method returns null for * non-keyed operators. @@ -59,7 +63,7 @@ default boolean isRestored() { * Returns the async keyed state backend for the stream operator. This method returns null for * operators which don't support async keyed state backend. */ - AsyncKeyedStateBackend asyncKeyedStateBackend(); + AsyncKeyedStateBackend asyncKeyedStateBackend(); /** * Returns the internal timer service manager for the stream operator. This method returns null diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index a8f3af027e150..7c172e14fda8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -83,6 +83,8 @@ public class StreamOperatorStateHandler { protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + @Nullable private final TypeSerializer keySerializer; + @Nullable private final AsyncKeyedStateBackend asyncKeyedStateBackend; @Nullable private final KeyedStateStoreV2 keyedStateStoreV2; @@ -100,8 +102,9 @@ public StreamOperatorStateHandler( ExecutionConfig executionConfig, CloseableRegistry closeableRegistry) { this.context = context; - operatorStateBackend = context.operatorStateBackend(); - keyedStateBackend = context.keyedStateBackend(); + this.keySerializer = context.keySerializer(); + this.operatorStateBackend = context.operatorStateBackend(); + this.keyedStateBackend = context.keyedStateBackend(); this.closeableRegistry = closeableRegistry; if (keyedStateBackend != null) { @@ -234,22 +237,28 @@ void snapshotState( throws CheckpointException { try { if (timeServiceManager.isPresent()) { - checkState( - keyedStateBackend != null, - "keyedStateBackend should be available with timeServiceManager"); - final InternalTimeServiceManager manager = timeServiceManager.get(); - - boolean requiresLegacyRawKeyedStateSnapshots = - keyedStateBackend instanceof AbstractKeyedStateBackend - && ((AbstractKeyedStateBackend) keyedStateBackend) - .requiresLegacySynchronousTimerSnapshots( - checkpointOptions.getCheckpointType()); - requiresLegacyRawKeyedStateSnapshots |= - keyedStateBackend instanceof AsyncKeyedStateBackend - && ((AsyncKeyedStateBackend) keyedStateBackend) - .requiresLegacySynchronousTimerSnapshots( - checkpointOptions.getCheckpointType()); - + boolean requiresLegacyRawKeyedStateSnapshots; + final InternalTimeServiceManager manager; + if (useAsyncState) { + checkState( + asyncKeyedStateBackend != null, + "keyedStateBackend should be available with timeServiceManager"); + manager = timeServiceManager.get(); + requiresLegacyRawKeyedStateSnapshots = + asyncKeyedStateBackend.requiresLegacySynchronousTimerSnapshots( + checkpointOptions.getCheckpointType()); + } else { + checkState( + keyedStateBackend != null, + "keyedStateBackend should be available with timeServiceManager"); + manager = timeServiceManager.get(); + + requiresLegacyRawKeyedStateSnapshots = + keyedStateBackend instanceof AbstractKeyedStateBackend + && ((AbstractKeyedStateBackend) keyedStateBackend) + .requiresLegacySynchronousTimerSnapshots( + checkpointOptions.getCheckpointType()); + } if (requiresLegacyRawKeyedStateSnapshots) { checkState( !isUsingCustomRawKeyedState, @@ -365,14 +374,20 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { } } + @SuppressWarnings("unchecked") + public TypeSerializer getKeySerializer() { + return (TypeSerializer) keySerializer; + } + @SuppressWarnings("unchecked") public KeyedStateBackend getKeyedStateBackend() { return (KeyedStateBackend) keyedStateBackend; } @Nullable - public AsyncKeyedStateBackend getAsyncKeyedStateBackend() { - return asyncKeyedStateBackend; + @SuppressWarnings("unchecked") + public AsyncKeyedStateBackend getAsyncKeyedStateBackend() { + return (AsyncKeyedStateBackend) asyncKeyedStateBackend; } public OperatorStateBackend getOperatorStateBackend() { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java index ecc0b33d7bc0f..8142437d8faad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java @@ -63,6 +63,7 @@ StreamOperatorStateContext streamOperatorStateContext( @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, double managedMemoryFraction, - boolean isUsingCustomRawKeyedState) + boolean isUsingCustomRawKeyedState, + boolean isAsyncState) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 4195bacee219c..892f74f45e6fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -157,7 +157,8 @@ public StreamOperatorStateContext streamOperatorStateContext( @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, double managedMemoryFraction, - boolean isUsingCustomRawKeyedState) + boolean isUsingCustomRawKeyedState, + boolean isAsyncState) throws Exception { TaskInfo taskInfo = environment.getTaskInfo(); @@ -188,18 +189,33 @@ public StreamOperatorStateContext streamOperatorStateContext( try { // -------------- Keyed State Backend -------------- - keyedStatedBackend = - keyedStatedBackend( - keySerializer, - operatorIdentifierText, - prioritizedOperatorSubtaskStates, - streamTaskCloseableRegistry, - metricGroup, - managedMemoryFraction, - statsCollector, - StateBackend::createKeyedStateBackend); - if (stateBackend.supportsAsyncKeyedStateBackend()) { - asyncKeyedStateBackend = + if (isAsyncState) { + if (stateBackend.supportsAsyncKeyedStateBackend()) { + asyncKeyedStateBackend = + keyedStatedBackend( + keySerializer, + operatorIdentifierText, + prioritizedOperatorSubtaskStates, + streamTaskCloseableRegistry, + metricGroup, + managedMemoryFraction, + statsCollector, + StateBackend::createAsyncKeyedStateBackend); + } else { + asyncKeyedStateBackend = + new AsyncKeyedStateBackendAdaptor<>( + keyedStatedBackend( + keySerializer, + operatorIdentifierText, + prioritizedOperatorSubtaskStates, + streamTaskCloseableRegistry, + metricGroup, + managedMemoryFraction, + statsCollector, + StateBackend::createKeyedStateBackend)); + } + } else { + keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, @@ -208,12 +224,7 @@ public StreamOperatorStateContext streamOperatorStateContext( metricGroup, managedMemoryFraction, statsCollector, - StateBackend::createAsyncKeyedStateBackend); - } else { - if (keyedStatedBackend != null) { - asyncKeyedStateBackend = - new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend); - } + StateBackend::createKeyedStateBackend); } // -------------- Operator State Backend -------------- @@ -252,31 +263,32 @@ public StreamOperatorStateContext streamOperatorStateContext( (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); - if (keyedStatedBackend != null) { - timeServiceManager = - timeServiceManagerProvider.create( - environment.getMetricGroup().getIOMetricGroup(), - keyedStatedBackend, - keyedStatedBackend.getKeyGroupRange(), - environment.getUserCodeClassLoader().asClassLoader(), - keyContext, - processingTimeService, - restoredRawKeyedStateTimers, - cancellationContext); - } - if (stateBackend.supportsAsyncKeyedStateBackend()) { - asyncTimeServiceManager = - timeServiceManagerProvider.create( - environment.getMetricGroup().getIOMetricGroup(), - asyncKeyedStateBackend, - asyncKeyedStateBackend.getKeyGroupRange(), - environment.getUserCodeClassLoader().asClassLoader(), - keyContext, - processingTimeService, - restoredRawKeyedStateTimers, - cancellationContext); + if (isAsyncState) { + if (asyncKeyedStateBackend != null) { + asyncTimeServiceManager = + timeServiceManagerProvider.create( + environment.getMetricGroup().getIOMetricGroup(), + asyncKeyedStateBackend, + asyncKeyedStateBackend.getKeyGroupRange(), + environment.getUserCodeClassLoader().asClassLoader(), + keyContext, + processingTimeService, + restoredRawKeyedStateTimers, + cancellationContext); + } } else { - asyncTimeServiceManager = timeServiceManager; + if (keyedStatedBackend != null) { + timeServiceManager = + timeServiceManagerProvider.create( + environment.getMetricGroup().getIOMetricGroup(), + keyedStatedBackend, + keyedStatedBackend.getKeyGroupRange(), + environment.getUserCodeClassLoader().asClassLoader(), + keyContext, + processingTimeService, + restoredRawKeyedStateTimers, + cancellationContext); + } } // Add stats for input channel and result partition state Stream.concat( @@ -301,6 +313,7 @@ public StreamOperatorStateContext streamOperatorStateContext( return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, + keySerializer, keyedStatedBackend, asyncKeyedStateBackend, timeServiceManager, @@ -785,8 +798,9 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta private final @Nullable Long restoredCheckpointId; private final OperatorStateBackend operatorStateBackend; + private final @Nullable TypeSerializer keySerializer; private final CheckpointableKeyedStateBackend keyedStateBackend; - private final AsyncKeyedStateBackend asyncKeyedStateBackend; + private final AsyncKeyedStateBackend asyncKeyedStateBackend; private final InternalTimeServiceManager internalTimeServiceManager; private final InternalTimeServiceManager asyncInternalTimeServiceManager; @@ -796,8 +810,9 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta StreamOperatorStateContextImpl( @Nullable Long restoredCheckpointId, OperatorStateBackend operatorStateBackend, + @Nullable TypeSerializer keySerializer, CheckpointableKeyedStateBackend keyedStateBackend, - AsyncKeyedStateBackend asyncKeyedStateBackend, + AsyncKeyedStateBackend asyncKeyedStateBackend, InternalTimeServiceManager internalTimeServiceManager, InternalTimeServiceManager asyncInternalTimeServiceManager, CloseableIterable rawOperatorStateInputs, @@ -805,6 +820,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta this.restoredCheckpointId = restoredCheckpointId; this.operatorStateBackend = operatorStateBackend; + this.keySerializer = keySerializer; this.keyedStateBackend = keyedStateBackend; this.asyncKeyedStateBackend = asyncKeyedStateBackend; this.internalTimeServiceManager = internalTimeServiceManager; @@ -820,13 +836,18 @@ public OptionalLong getRestoredCheckpointId() { : OptionalLong.of(restoredCheckpointId); } + @Override + public TypeSerializer keySerializer() { + return keySerializer; + } + @Override public CheckpointableKeyedStateBackend keyedStateBackend() { return keyedStateBackend; } @Override - public AsyncKeyedStateBackend asyncKeyedStateBackend() { + public AsyncKeyedStateBackend asyncKeyedStateBackend() { return asyncKeyedStateBackend; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 32a895170a122..8d1a653860972 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -122,9 +122,11 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) { } public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) { - this.keyedStateStoreV2 = keyedStateStoreV2; - // Only if the keyedStateStoreV2 is set, this context is switch to support state v2 - this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; + if (keyedStateStoreV2 != null) { + // Only if the keyedStateStoreV2 is set, this context is switch to support state v2 + this.keyedStateStoreV2 = keyedStateStoreV2; + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index faedc0e00ab60..d18e6eb3c8f44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -230,6 +230,7 @@ public InternalTimeServiceManager create( closableRegistry, new UnregisteredMetricsGroup(), 1.0, + false, false); OptionalLong restoredCheckpointId = stateContext.getRestoredCheckpointId(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java index dbd8560e1553c..9cb1d4522f2d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java @@ -110,6 +110,7 @@ void testFailingBackendSnapshotMethod() throws Exception { closeableRegistry, new InterceptingOperatorMetricGroup(), 1.0, + false, false); StreamOperatorStateHandler stateHandler = new StreamOperatorStateHandler( diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 9c3be957b9f7a..ef93d9c5b2690 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -111,6 +111,7 @@ void testNoRestore() throws Exception { closeableRegistry, new UnregisteredMetricsGroup(), 1.0, + false, false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); @@ -224,6 +225,7 @@ public OperatorStateBackend createOperatorStateBackend( closeableRegistry, new UnregisteredMetricsGroup(), 1.0, + false, false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ef99125a6de6c..d155f58dc6498 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -2315,7 +2315,8 @@ public StreamTaskStateInitializer createStreamTaskStateInitializer( closeableRegistry, metricGroup, fraction, - isUsingCustomRawKeyedState) -> { + isUsingCustomRawKeyedState, + isAsyncState) -> { final StreamOperatorStateContext controller = streamTaskStateManager.streamOperatorStateContext( operatorID, @@ -2326,7 +2327,8 @@ public StreamTaskStateInitializer createStreamTaskStateInitializer( closeableRegistry, metricGroup, fraction, - isUsingCustomRawKeyedState); + isUsingCustomRawKeyedState, + isAsyncState); return new StreamOperatorStateContext() { @Override @@ -2344,13 +2346,18 @@ public OperatorStateBackend operatorStateBackend() { return controller.operatorStateBackend(); } + @Override + public TypeSerializer keySerializer() { + return controller.keySerializer(); + } + @Override public CheckpointableKeyedStateBackend keyedStateBackend() { return controller.keyedStateBackend(); } @Override - public AsyncKeyedStateBackend asyncKeyedStateBackend() { + public AsyncKeyedStateBackend asyncKeyedStateBackend() { return controller.asyncKeyedStateBackend(); }