Skip to content

Commit

Permalink
[FLINK-36589][state/runtime] Decouple the initialization of sync and …
Browse files Browse the repository at this point in the history
…async keyed statebackend
  • Loading branch information
Zakelly committed Oct 23, 2024
1 parent 039a07d commit b5d4ade
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ StreamOperatorStateContext build(Logger logger) throws IOException {
registry,
ctx.getMetricGroup(),
1.0,
false,
false);
} catch (Exception e) {
throw new IOException("Failed to restore state backend", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand All @@ -60,8 +56,6 @@

import javax.annotation.Nonnull;

import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -89,7 +83,6 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
final StreamTask<?, ?> containingTask = checkNotNull(getContainingTask());
environment = containingTask.getEnvironment();
Expand Down Expand Up @@ -239,25 +232,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
}
}

/**
 * Snapshots this operator's state by delegating to {@code stateHandler.snapshotState(...)}.
 *
 * <p>Besides the checkpoint arguments, the call forwards the operator itself, the timer service
 * manager wrapped in an {@link Optional}, the operator name and the result of {@code
 * isUsingCustomRawKeyedState()}.
 *
 * @param checkpointId forwarded unchanged to the state handler
 * @param timestamp forwarded unchanged to the state handler
 * @param checkpointOptions forwarded unchanged to the state handler
 * @param factory forwarded unchanged to the state handler
 * @return the snapshot futures returned by the state handler
 * @throws Exception if the delegated snapshot call throws
 */
@Override
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory)
throws Exception {
return stateHandler.snapshotState(
this,
// The manager may be null, so it is handed over as an Optional.
Optional.ofNullable(timeServiceManager),
getOperatorName(),
checkpointId,
timestamp,
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
// NOTE(review): presumably the async-state flag, hard-coded to true here — confirm
// against the parameter list of StreamOperatorStateHandler#snapshotState.
true);
}

/**
* Returns a {@link InternalTimerService} that can be used to query current processing time and
* event time and to set timers. An operator can have several timer services, where each has its
Expand Down Expand Up @@ -293,13 +267,13 @@ public <K, N> InternalTimerService<N> getInternalTimerService(

InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keyedStateBackend.getKeySerializer(),
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
Expand All @@ -56,8 +52,6 @@

import javax.annotation.Nonnull;

import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -89,7 +83,6 @@ public AbstractAsyncStateStreamOperatorV2(
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));

final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
Expand Down Expand Up @@ -213,25 +206,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
}
}

/**
 * Takes a snapshot of this operator's state by handing the request to {@code
 * stateHandler.snapshotState(...)}.
 *
 * <p>The operator instance, the timer service manager (wrapped in an {@link Optional}), the
 * operator name and the value of {@code isUsingCustomRawKeyedState()} are passed along with
 * the checkpoint arguments.
 *
 * @param checkpointId passed through to the state handler
 * @param timestamp passed through to the state handler
 * @param checkpointOptions passed through to the state handler
 * @param factory passed through to the state handler
 * @return the snapshot futures returned by the state handler
 * @throws Exception if the delegated snapshot call throws
 */
@Override
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory)
throws Exception {
return stateHandler.snapshotState(
this,
// A null manager becomes an empty Optional.
Optional.ofNullable(timeServiceManager),
getOperatorName(),
checkpointId,
timestamp,
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
// NOTE(review): looks like the async-state flag, fixed to true for this operator —
// confirm against the parameter list of StreamOperatorStateHandler#snapshotState.
true);
}

@SuppressWarnings("unchecked")
public <K, N> InternalTimerService<N> getInternalTimerService(
String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
Expand All @@ -246,11 +220,13 @@ public <K, N> InternalTimerService<N> getInternalTimerService(

InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keyedStateBackend.getKeySerializer(),
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,15 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());

isUsingCustomRawKeyedState(),
isAsyncStateProcessingEnabled());
stateHandler =
new StreamOperatorStateHandler(
context, getExecutionConfig(), streamTaskCloseableRegistry);
timeServiceManager = context.internalTimerServiceManager();
timeServiceManager =
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
Expand Down Expand Up @@ -320,6 +323,14 @@ protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
 * Indicates whether async state processing is enabled for this operator. This base
 * implementation always returns {@code false}; subclasses can override it.
 *
 * @return {@code true} if async state processing is enabled, {@code false} otherwise
 */
@Internal
public boolean isAsyncStateProcessingEnabled() {
return false;
}

@Internal
@Override
public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
Expand Down Expand Up @@ -402,7 +413,7 @@ public OperatorSnapshotFutures snapshotState(
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
false);
isAsyncStateProcessingEnabled());
}

/**
Expand Down Expand Up @@ -650,10 +661,10 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
@SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
return keyedTimeServiceHandler.getInternalTimerService(
name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
name, keySerializer, namespaceSerializer, triggerable);
}

public void processWatermark(Watermark mark) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());
isUsingCustomRawKeyedState(),
isAsyncStateProcessingEnabled());

stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
timeServiceManager =
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);

if (useSplittableTimers()
Expand Down Expand Up @@ -275,6 +279,14 @@ protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
 * Reports whether this operator has async state processing enabled. The default
 * implementation here always answers {@code false}; subclasses can override it.
 *
 * @return {@code true} when async state processing is enabled, {@code false} otherwise
 */
@Internal
public boolean isAsyncStateProcessingEnabled() {
return false;
}

/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
Expand Down Expand Up @@ -318,7 +330,7 @@ public OperatorSnapshotFutures snapshotState(
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
false);
isAsyncStateProcessingEnabled());
}

/**
Expand Down Expand Up @@ -510,10 +522,10 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
@SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
return keyedTimeServiceHandler.getInternalTimerService(
name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
name, keySerializer, namespaceSerializer, triggerable);
}

public void processWatermark(Watermark mark) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -49,6 +50,9 @@ default boolean isRestored() {
/** Returns the operator state backend for the stream operator. */
OperatorStateBackend operatorStateBackend();

/**
 * Returns the key serializer for keyed state backends.
 *
 * <p>NOTE(review): presumably {@code null} for non-keyed operators, since consumers of this
 * value null-check it before using it for timers — confirm against the implementations.
 */
TypeSerializer<?> keySerializer();

/**
* Returns the keyed state backend for the stream operator. This method returns null for
* non-keyed operators.
Expand All @@ -59,7 +63,7 @@ default boolean isRestored() {
* Returns the async keyed state backend for the stream operator. This method returns null for
* operators which don't support async keyed state backend.
*/
AsyncKeyedStateBackend asyncKeyedStateBackend();
AsyncKeyedStateBackend<?> asyncKeyedStateBackend();

/**
* Returns the internal timer service manager for the stream operator. This method returns null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class StreamOperatorStateHandler {

protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);

@Nullable private final TypeSerializer<?> keySerializer;

@Nullable private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;

@Nullable private final KeyedStateStoreV2 keyedStateStoreV2;
Expand All @@ -100,8 +102,9 @@ public StreamOperatorStateHandler(
ExecutionConfig executionConfig,
CloseableRegistry closeableRegistry) {
this.context = context;
operatorStateBackend = context.operatorStateBackend();
keyedStateBackend = context.keyedStateBackend();
this.keySerializer = context.keySerializer();
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
this.closeableRegistry = closeableRegistry;

if (keyedStateBackend != null) {
Expand Down Expand Up @@ -365,14 +368,20 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
}
}

/**
 * Returns the key serializer held by this handler, cast to the key type requested by the
 * caller.
 *
 * <p>The cast is unchecked: nothing here verifies that {@code K} matches the serializer's
 * actual key type. The backing field is nullable, so the result may be {@code null}.
 *
 * @param <K> the key type the caller expects
 * @return the key serializer, or {@code null} if none is set
 */
@SuppressWarnings("unchecked")
public <K> TypeSerializer<K> getKeySerializer() {
return (TypeSerializer<K>) keySerializer;
}

/**
 * Returns the keyed state backend held by this handler, cast to the key type requested by the
 * caller.
 *
 * <p>The cast is unchecked: nothing here verifies that {@code K} matches the backend's actual
 * key type. The result may be {@code null} when no keyed state backend is present.
 *
 * @param <K> the key type the caller expects
 * @return the keyed state backend, or {@code null} if there is none
 */
@SuppressWarnings("unchecked")
public <K> KeyedStateBackend<K> getKeyedStateBackend() {
return (KeyedStateBackend<K>) keyedStateBackend;
}

@Nullable
public AsyncKeyedStateBackend getAsyncKeyedStateBackend() {
return asyncKeyedStateBackend;
@SuppressWarnings("unchecked")
public <K> AsyncKeyedStateBackend<K> getAsyncKeyedStateBackend() {
return (AsyncKeyedStateBackend<K>) asyncKeyedStateBackend;
}

public OperatorStateBackend getOperatorStateBackend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ StreamOperatorStateContext streamOperatorStateContext(
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup,
double managedMemoryFraction,
boolean isUsingCustomRawKeyedState)
boolean isUsingCustomRawKeyedState,
boolean isAsyncState)
throws Exception;
}
Loading

0 comments on commit b5d4ade

Please sign in to comment.