diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index e783d2938641a..bc0a96f177026 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; @@ -548,4 +549,8 @@ AggregatingState getAggregatingState( */ @PublicEvolving TaskInfo getTaskInfo(); + + @Experimental + org.apache.flink.api.common.state.v2.ValueState getStateV2( + ValueStateDescriptor stateProperties); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index a5e36599b89f5..63eb16e4971c0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions.util; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -243,4 +244,12 @@ public MapState getMapState(MapStateDescriptor statePro public String getAllocationIDAsString() { return taskInfo.getAllocationIDAsString(); } + + @Override + @Experimental + public org.apache.flink.api.common.state.v2.ValueState getStateV2( + ValueStateDescriptor stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java index fb33f9c5eb683..81d53e9cc38ce 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java @@ -201,4 +201,10 @@ public AggregatingState getAggregatingState( public MapState getMapState(final MapStateDescriptor stateProperties) { throw new UnsupportedOperationException("State is not supported."); } + + @Override + public org.apache.flink.api.common.state.v2.ValueState getStateV2( + final ValueStateDescriptor stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java index 8190c9b0c13a7..a7029440d3870 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java @@ -239,6 +239,12 @@ public MapState getMapState(MapStateDescriptor statePro return keyedStateStore.getMapState(stateProperties); } + @Override + public org.apache.flink.api.common.state.v2.ValueState getStateV2( + ValueStateDescriptor stateProperties) { + throw new UnsupportedOperationException("State is not supported in rich async functions."); + } + public List> getStateDescriptors() { if (registeredDescriptors.isEmpty()) { return Collections.emptyList(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateExecutor.java new file mode 100644 index 0000000000000..fe9e8f194ec33 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateExecutor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +/** Basic implementation of {@link StateExecutor}. */ +@Internal +public abstract class AbstractStateExecutor implements StateExecutor { + + protected AsyncExecutionController asyncExecutionController; + + /** Bind a {@link AsyncExecutionController} with this executor. */ + @Override + public void bindAsyncExecutionController(AsyncExecutionController asyncExecutionController) { + this.asyncExecutionController = asyncExecutionController; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index cfa0e4f0d0e49..566145ec0c5a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -111,6 +111,7 @@ public AsyncExecutionController( this.maxInFlightRecordNum = maxInFlightRecords; this.stateRequestsBuffer = new StateRequestBuffer<>(); this.inFlightRecordNum = new AtomicInteger(0); + stateExecutor.bindAsyncExecutionController(this); LOG.info( "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", batchSize, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java index 63b0257f8ec18..9d15f11f8b797 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java @@ -33,4 +33,7 @@ public interface StateExecutor { */ CompletableFuture executeBatchRequests( Iterable> processingRequests); + + /** Bind an {@link AsyncExecutionController} with this executor. */ + void bindAsyncExecutionController(AsyncExecutionController asyncExecutionController); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index 99c48f4abad8e..3f0d4d916b6a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; import org.apache.flink.runtime.asyncprocessing.StateExecutor; +import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.util.Disposable; /** @@ -37,4 +39,13 @@ public interface AsyncKeyedStateBackend extends Disposable { * asynchronously. */ StateExecutor createStateExecutor(); + + /** + * Create a state by a given state descriptor. + * + * @param stateDescriptor the given state descriptor. + * @return the created state. + * @param The type of state. + */ + S getState(StateDescriptor stateDescriptor) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java new file mode 100644 index 0000000000000..ddf6817eefe04 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; + +import static java.util.Objects.requireNonNull; + +/** This contains methods for registering keyed state with a managed store. */ +public class KeyedStateStore { + + private final AsyncKeyedStateBackend asyncKeyedStateBackend; + + public KeyedStateStore(AsyncKeyedStateBackend asyncKeyedStateBackend) { + this.asyncKeyedStateBackend = asyncKeyedStateBackend; + } + + public ValueState getState(ValueStateDescriptor stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + return asyncKeyedStateBackend.getState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java index 1ddc92c034dc8..1d393210e3efb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java @@ -90,6 +90,12 @@ protected StateDescriptor( this.typeSerializer = typeInfo.createSerializer(serializerConfig); } + protected StateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer serializer) { + this.stateId = checkNotNull(stateId, "stateId must not be null"); + checkNotNull(serializer, "type serializer must not be null"); + this.typeSerializer = serializer; + } + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorConversionUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorConversionUtil.java new file mode 100644 index 0000000000000..069ceae10fea4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorConversionUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +/** + * A util class that convert the {@link org.apache.flink.api.common.state.StateDescriptor} to the + * new {@link StateDescriptor}. + */ +public class StateDescriptorConversionUtil { + + public static ValueStateDescriptor convert( + org.apache.flink.api.common.state.ValueStateDescriptor oldDescriptor) { + return new ValueStateDescriptor<>(oldDescriptor.getName(), oldDescriptor.getSerializer()); + } + + private StateDescriptorConversionUtil() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java index aa63e76ab5b1c..652167a4ca4b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned value @@ -53,6 +54,16 @@ public ValueStateDescriptor( super(stateId, typeInfo, serializerConfig); } + /** + * Creates a new {@code ValueStateDescriptor} with the given stateId and type. + * + * @param stateId The (unique) stateId for the state. + * @param typeSerializer The serializer to serialize the value. + */ + public ValueStateDescriptor(String stateId, TypeSerializer typeSerializer) { + super(stateId, typeSerializer); + } + @Override public Type getType() { return Type.VALUE; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 9f1e1a203a58a..06817c37333dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.asyncprocessing; +import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.state.StateFutureUtils; @@ -27,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.v2.InternalValueState; +import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.Preconditions; @@ -414,6 +416,11 @@ public StateExecutor createStateExecutor() { return new TestStateExecutor(); } + @Override + public S getState(StateDescriptor stateDescriptor) { + return null; + } + @Override public void dispose() { // do nothing @@ -426,7 +433,7 @@ public void dispose() { * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC * and StateExecutor. */ - static class TestStateExecutor implements StateExecutor { + static class TestStateExecutor extends AbstractStateExecutor { public TestStateExecutor() {} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java index 2a727c69dcfb4..f84ea123bfaef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @@ -258,6 +258,13 @@ public JobInfo getJobInfo() { public TaskInfo getTaskInfo() { return runtimeContext.getTaskInfo(); } + + @Override + public org.apache.flink.api.common.state.v2.ValueState getStateV2( + ValueStateDescriptor stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } } private static class RichAsyncFunctionIterationRuntimeContext diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 64549334f56a1..de5c0e1bcb32e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -138,7 +138,7 @@ public abstract class AbstractStreamOperator */ protected transient KeySelector stateKeySelector2; - private transient StreamOperatorStateHandler stateHandler; + protected transient StreamOperatorStateHandler stateHandler; private transient InternalTimeServiceManager timeServiceManager; @@ -253,7 +253,7 @@ public OperatorMetricGroup getMetricGroup() { } @Override - public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) + public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { final TypeSerializer keySerializer = @@ -285,6 +285,7 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana timeServiceManager = context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); + runtimeContext.setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 14605603bfc4f..e2730f94e5d38 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -102,7 +102,7 @@ public abstract class AbstractStreamOperatorV2 protected final ProcessingTimeService processingTimeService; protected final RecordAttributes[] lastRecordAttributes; - private StreamOperatorStateHandler stateHandler; + protected StreamOperatorStateHandler stateHandler; private InternalTimeServiceManager timeServiceManager; public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int numberOfInputs) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 695129ca852c7..c3a32a0f5f88b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -71,4 +72,10 @@ default boolean isRestored() { * are assigned to this operator. This method returns null for non-keyed operators. */ CloseableIterable rawKeyedStateInputs(); + + /** + * Returns the async keyed state backend for the stream operator. This method returns null for + * non-keyed operators. Also returns null if the user-specified + */ + AsyncKeyedStateBackend asyncKeyedStateBackend(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index eb8a01b91ebc5..1b0d8a68d2320 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -83,8 +84,16 @@ public class StreamOperatorStateHandler { /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ @Nullable private final CheckpointableKeyedStateBackend keyedStateBackend; + /** + * Backend for async keyed state. This might be empty if we're not on a keyed stream or not + * supported. + */ + @Nullable private final AsyncKeyedStateBackend asyncKeyedStateBackend; + private final CloseableRegistry closeableRegistry; @Nullable private final DefaultKeyedStateStore keyedStateStore; + + @Nullable private final org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStoreV2; private final OperatorStateBackend operatorStateBackend; private final StreamOperatorStateContext context; @@ -95,6 +104,7 @@ public StreamOperatorStateHandler( this.context = context; operatorStateBackend = context.operatorStateBackend(); keyedStateBackend = context.keyedStateBackend(); + asyncKeyedStateBackend = context.asyncKeyedStateBackend(); this.closeableRegistry = closeableRegistry; if (keyedStateBackend != null) { @@ -112,6 +122,13 @@ public TypeSerializer createSerializer( } else { keyedStateStore = null; } + + if (asyncKeyedStateBackend != null) { + keyedStateStoreV2 = + new org.apache.flink.runtime.state.v2.KeyedStateStore(asyncKeyedStateBackend); + } else { + keyedStateStoreV2 = null; + } } public void initializeOperatorState(CheckpointedStreamOperator streamOperator) @@ -325,6 +342,10 @@ public KeyedStateBackend getKeyedStateBackend() { return (KeyedStateBackend) keyedStateBackend; } + public AsyncKeyedStateBackend getAsyncKeyedStateBackend() { + return asyncKeyedStateBackend; + } + public OperatorStateBackend getOperatorStateBackend() { return operatorStateBackend; } @@ -400,6 +421,10 @@ public Optional getKeyedStateStore() { return Optional.ofNullable(keyedStateStore); } + public Optional getKeyedStateStoreV2() { + return Optional.ofNullable(keyedStateStoreV2); + } + /** Custom state handling hooks to be invoked by {@link StreamOperatorStateHandler}. */ public interface CheckpointedStreamOperator { void initializeState(StateInitializationContext context) throws Exception; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 50b3c4e0867d9..f794057a460e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; @@ -165,6 +166,7 @@ public StreamOperatorStateContext streamOperatorStateContext( taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend keyedStatedBackend = null; + AsyncKeyedStateBackend asyncKeyedStateBackend = null; OperatorStateBackend operatorStateBackend = null; CloseableIterable rawKeyedStateInputs = null; CloseableIterable rawOperatorStateInputs = null; @@ -186,6 +188,14 @@ public StreamOperatorStateContext streamOperatorStateContext( managedMemoryFraction, statsCollector); + // -------------- Async Keyed State Backend -------------- + asyncKeyedStateBackend = + asyncKeyedStatedBackend( + keySerializer, + operatorIdentifierText, + metricGroup, + managedMemoryFraction); + // -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( @@ -263,6 +273,7 @@ public StreamOperatorStateContext streamOperatorStateContext( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, + asyncKeyedStateBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); @@ -407,6 +418,44 @@ protected CheckpointableKeyedStateBackend keyedStatedBackend( } } + protected AsyncKeyedStateBackend asyncKeyedStatedBackend( + TypeSerializer keySerializer, + String operatorIdentifierText, + MetricGroup metricGroup, + double managedMemoryFraction) + throws Exception { + if (keySerializer == null || !stateBackend.supportsAsyncKeyedStateBackend()) { + return null; + } + + TaskInfo taskInfo = environment.getTaskInfo(); + + final KeyGroupRange keyGroupRange = + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getIndexOfThisSubtask()); + + CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry(); + // TODO: exclude unused parameters. + KeyedStateBackendParametersImpl parameters = + new KeyedStateBackendParametersImpl<>( + environment, + environment.getJobID(), + operatorIdentifierText, + keySerializer, + taskInfo.getMaxNumberOfParallelSubtasks(), + keyGroupRange, + environment.getTaskKvStateRegistry(), + ttlTimeProvider, + metricGroup, + initializationMetrics::addDurationMetric, + null, + cancelStreamRegistryForRestore, + managedMemoryFraction); + return stateBackend.createAsyncKeyedStateBackend(parameters); + } + protected CloseableIterable rawOperatorStateInputs( @Nonnull Iterator> restoreStateAlternatives, @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) { @@ -704,6 +753,9 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta private final OperatorStateBackend operatorStateBackend; private final CheckpointableKeyedStateBackend keyedStateBackend; + + private final AsyncKeyedStateBackend asyncKeyedStateBackend; + private final InternalTimeServiceManager internalTimeServiceManager; private final CloseableIterable rawOperatorStateInputs; @@ -713,6 +765,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta @Nullable Long restoredCheckpointId, OperatorStateBackend operatorStateBackend, CheckpointableKeyedStateBackend keyedStateBackend, + AsyncKeyedStateBackend asyncKeyedStateBackend, InternalTimeServiceManager internalTimeServiceManager, CloseableIterable rawOperatorStateInputs, CloseableIterable rawKeyedStateInputs) { @@ -720,6 +773,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta this.restoredCheckpointId = restoredCheckpointId; this.operatorStateBackend = operatorStateBackend; this.keyedStateBackend = keyedStateBackend; + this.asyncKeyedStateBackend = asyncKeyedStateBackend; this.internalTimeServiceManager = internalTimeServiceManager; this.rawOperatorStateInputs = rawOperatorStateInputs; this.rawKeyedStateInputs = rawKeyedStateInputs; @@ -737,6 +791,11 @@ public CheckpointableKeyedStateBackend keyedStateBackend() { return keyedStateBackend; } + @Override + public AsyncKeyedStateBackend asyncKeyedStateBackend() { + return asyncKeyedStateBackend; + } + @Override public OperatorStateBackend operatorStateBackend() { return operatorStateBackend; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 75bed241a4c07..5a373d2db3bc1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.state.v2.StateDescriptorConversionUtil; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -69,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final String operatorUniqueID; private final ProcessingTimeService processingTimeService; private @Nullable KeyedStateStore keyedStateStore; + private @Nullable org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStoreV2; private final ExternalResourceInfoProvider externalResourceInfoProvider; @VisibleForTesting @@ -114,6 +116,11 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) { this.keyedStateStore = keyedStateStore; } + public void setKeyedStateStoreV2( + @Nullable org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore) { + this.keyedStateStoreV2 = keyedStateStore; + } + // ------------------------------------------------------------------------ /** @@ -202,6 +209,14 @@ public ValueState getState(ValueStateDescriptor stateProperties) { return keyedStateStore.getState(stateProperties); } + @Override + public org.apache.flink.api.common.state.v2.ValueState getStateV2( + ValueStateDescriptor stateProperties) { + org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = + checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + return keyedStateStore.getState(StateDescriptorConversionUtil.convert(stateProperties)); + } + @Override public ListState getListState(ListStateDescriptor stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); @@ -242,6 +257,19 @@ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( return keyedStateStore; } + private org.apache.flink.runtime.state.v2.KeyedStateStore + checkPreconditionsAndGetKeyedStateStoreV2(StateDescriptor stateDescriptor) { + checkNotNull(stateDescriptor, "The state properties must not be null"); + checkNotNull( + keyedStateStoreV2, + String.format( + "Keyed state '%s' with type %s can not be created." + + " This should be used on a 'keyed stream', i.e., after a 'keyBy()' operation." + + " And the specified state backend should support async state (state v2)", + stateDescriptor.getName(), stateDescriptor.getType())); + return keyedStateStoreV2; + } + // ------------------ expose (read only) relevant information from the stream config -------- // /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index d7608be1f57b7..819d1bb599baf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -24,16 +24,18 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -51,17 +53,17 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre private RecordContext currentProcessingContext; - /** Initialize necessary state components for {@link AbstractStreamOperator}. */ @Override - public void setup( - StreamTask containingTask, - StreamConfig config, - Output> output) { - super.setup(containingTask, config, output); - // TODO: properly read config and setup + public void initializeState(StreamTaskStateInitializer streamTaskStateManager) + throws Exception { + super.initializeState(streamTaskStateManager); + final StreamTask containingTask = Preconditions.checkNotNull(getContainingTask()); final MailboxExecutor mailboxExecutor = containingTask.getEnvironment().getMainMailboxExecutor(); - this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null); + final AsyncKeyedStateBackend keyedStateBackend = stateHandler.getAsyncKeyedStateBackend(); + final StateExecutor stateExecutor = keyedStateBackend.createStateExecutor(); + this.asyncExecutionController = + new AsyncExecutionController(mailboxExecutor, stateExecutor); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index a4a36bb6f7aba..ea8d0cd0c6977 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -24,7 +24,9 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; @@ -64,6 +66,10 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana super.initializeState(streamTaskStateManager); // TODO: Read config and properly set. this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null); + final AsyncKeyedStateBackend keyedStateBackend = stateHandler.getAsyncKeyedStateBackend(); + final StateExecutor stateExecutor = keyedStateBackend.createStateExecutor(); + this.asyncExecutionController = + new AsyncExecutionController(mailboxExecutor, stateExecutor); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java index b57a0c0146e7a..81f034469b9cd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.asyncprocessing.AbstractStateExecutor; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; -import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequest; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -152,7 +152,7 @@ void testCheckpointDrain() throws Exception { ((AbstractAsyncStateStreamOperator) testHarness.getOperator()) .getAsyncExecutionController(); asyncExecutionController.setStateExecutor( - new StateExecutor() { + new AbstractStateExecutor() { @Override public CompletableFuture executeBatchRequests( Iterable> processingRequests) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java index fbb5a4fd70185..adf5d838b91b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.asyncprocessing.AbstractStateExecutor; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; -import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequest; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -164,7 +164,7 @@ void testCheckpointDrain() throws Exception { AsyncExecutionController asyncExecutionController = testOperator.getAsyncExecutionController(); asyncExecutionController.setStateExecutor( - new StateExecutor() { + new AbstractStateExecutor() { @Override public CompletableFuture executeBatchRequests( Iterable> processingRequests) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ccf6ab98311b7..70f8d46fa96aa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -76,6 +76,7 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DoneFuture; @@ -2353,6 +2354,11 @@ public CheckpointableKeyedStateBackend keyedStateBackend() { return controller.keyedStateBackend(); } + @Override + public AsyncKeyedStateBackend asyncKeyedStateBackend() { + return controller.asyncKeyedStateBackend(); + } + @Override public InternalTimeServiceManager internalTimerServiceManager() { InternalTimeServiceManager timeServiceManager =