Skip to content

Commit

Permalink
[FLINK-35186][Runtime/State] Create State V2 from new StateDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Apr 22, 2024
1 parent 9336760 commit e9029b4
Show file tree
Hide file tree
Showing 25 changed files with 334 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
Expand Down Expand Up @@ -548,4 +549,8 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
*/
@PublicEvolving
TaskInfo getTaskInfo();

@Experimental
<T> org.apache.flink.api.common.state.v2.ValueState<T> getStateV2(
ValueStateDescriptor<T> stateProperties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions.util;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down Expand Up @@ -243,4 +244,12 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
public String getAllocationIDAsString() {
return taskInfo.getAllocationIDAsString();
}

@Override
@Experimental
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getStateV2(
ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,10 @@ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getStateV2(
final ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
return keyedStateStore.getMapState(stateProperties);
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getStateV2(
ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State is not supported in rich async functions.");
}

public List<StateDescriptor<?, ?>> getStateDescriptors() {
if (registeredDescriptors.isEmpty()) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.annotation.Internal;

/** Basic implementation of {@link StateExecutor}. */
@Internal
public abstract class AbstractStateExecutor implements StateExecutor {

protected AsyncExecutionController<?> asyncExecutionController;

/** Bind a {@link AsyncExecutionController} with this executor. */
@Override
public void bindAsyncExecutionController(AsyncExecutionController<?> asyncExecutionController) {
this.asyncExecutionController = asyncExecutionController;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public AsyncExecutionController(
this.maxInFlightRecordNum = maxInFlightRecords;
this.stateRequestsBuffer = new StateRequestBuffer<>();
this.inFlightRecordNum = new AtomicInteger(0);
stateExecutor.bindAsyncExecutionController(this);
LOG.info(
"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ public interface StateExecutor {
*/
CompletableFuture<Boolean> executeBatchRequests(
Iterable<StateRequest<?, ?, ?>> processingRequests);

/** Bind an {@link AsyncExecutionController} with this executor. */
void bindAsyncExecutionController(AsyncExecutionController<?> asyncExecutionController);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.util.Disposable;

/**
Expand All @@ -37,4 +39,13 @@ public interface AsyncKeyedStateBackend extends Disposable {
* asynchronously.
*/
StateExecutor createStateExecutor();

/**
* Create a state by a given state descriptor.
*
* @param stateDescriptor the given state descriptor.
* @return the created state.
* @param <S> The type of state.
*/
<S extends State> S getState(StateDescriptor<?> stateDescriptor) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;

import static java.util.Objects.requireNonNull;

/** This contains methods for registering keyed state with a managed store. */
public class KeyedStateStore {

private final AsyncKeyedStateBackend asyncKeyedStateBackend;

public KeyedStateStore(AsyncKeyedStateBackend asyncKeyedStateBackend) {
this.asyncKeyedStateBackend = asyncKeyedStateBackend;
}

public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
return asyncKeyedStateBackend.getState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ protected StateDescriptor(
this.typeSerializer = typeInfo.createSerializer(serializerConfig);
}

protected StateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) {
this.stateId = checkNotNull(stateId, "stateId must not be null");
checkNotNull(serializer, "type serializer must not be null");
this.typeSerializer = serializer;
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

/**
* A util class that convert the {@link org.apache.flink.api.common.state.StateDescriptor} to the
* new {@link StateDescriptor}.
*/
public class StateDescriptorConversionUtil {

public static <T> ValueStateDescriptor<T> convert(
org.apache.flink.api.common.state.ValueStateDescriptor<T> oldDescriptor) {
return new ValueStateDescriptor<>(oldDescriptor.getName(), oldDescriptor.getSerializer());
}

private StateDescriptorConversionUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
* {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned value
Expand Down Expand Up @@ -53,6 +54,16 @@ public ValueStateDescriptor(
super(stateId, typeInfo, serializerConfig);
}

/**
* Creates a new {@code ValueStateDescriptor} with the given stateId and type.
*
* @param stateId The (unique) stateId for the state.
* @param typeSerializer The serializer to serialize the value.
*/
public ValueStateDescriptor(String stateId, TypeSerializer<T> typeSerializer) {
super(stateId, typeSerializer);
}

@Override
public Type getType() {
return Type.VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.state.StateFutureUtils;
Expand All @@ -27,6 +28,7 @@
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.v2.InternalValueState;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -414,6 +416,11 @@ public StateExecutor createStateExecutor() {
return new TestStateExecutor();
}

@Override
public <S extends State> S getState(StateDescriptor<?> stateDescriptor) {
return null;
}

@Override
public void dispose() {
// do nothing
Expand All @@ -426,7 +433,7 @@ public void dispose() {
* A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC
* and StateExecutor.
*/
static class TestStateExecutor implements StateExecutor {
static class TestStateExecutor extends AbstractStateExecutor {

public TestStateExecutor() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ public JobInfo getJobInfo() {
public TaskInfo getTaskInfo() {
return runtimeContext.getTaskInfo();
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getStateV2(
ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}
}

private static class RichAsyncFunctionIterationRuntimeContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public abstract class AbstractStreamOperator<OUT>
*/
protected transient KeySelector<?, ?> stateKeySelector2;

private transient StreamOperatorStateHandler stateHandler;
protected transient StreamOperatorStateHandler stateHandler;

private transient InternalTimeServiceManager<?> timeServiceManager;

Expand Down Expand Up @@ -253,7 +253,7 @@ public OperatorMetricGroup getMetricGroup() {
}

@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {

final TypeSerializer<?> keySerializer =
Expand Down Expand Up @@ -285,6 +285,7 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
timeServiceManager = context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
runtimeContext.setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
protected final ProcessingTimeService processingTimeService;
protected final RecordAttributes[] lastRecordAttributes;

private StreamOperatorStateHandler stateHandler;
protected StreamOperatorStateHandler stateHandler;
private InternalTimeServiceManager<?> timeServiceManager;

public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
Expand Down Expand Up @@ -71,4 +72,10 @@ default boolean isRestored() {
* are assigned to this operator. This method returns null for non-keyed operators.
*/
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs();

/**
* Returns the async keyed state backend for the stream operator. This method returns null for
* non-keyed operators. Also returns null if the user-specified
*/
AsyncKeyedStateBackend asyncKeyedStateBackend();
}
Loading

0 comments on commit e9029b4

Please sign in to comment.