Skip to content

Commit

Permalink
[FLINK-36589][state/runtime] Decouple the initialization of sync and …
Browse files Browse the repository at this point in the history
…async keyed statebackend
  • Loading branch information
Zakelly committed Oct 24, 2024
1 parent 28ae3ae commit ee18251
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ StreamOperatorStateContext build(Logger logger) throws IOException {
registry,
ctx.getMetricGroup(),
1.0,
false,
false);
} catch (Exception e) {
throw new IOException("Failed to restore state backend", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand All @@ -60,8 +56,6 @@

import javax.annotation.Nonnull;

import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -89,7 +83,6 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
final StreamTask<?, ?> containingTask = checkNotNull(getContainingTask());
environment = containingTask.getEnvironment();
Expand Down Expand Up @@ -239,25 +232,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
}
}

@Override
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory)
throws Exception {
return stateHandler.snapshotState(
this,
Optional.ofNullable(timeServiceManager),
getOperatorName(),
checkpointId,
timestamp,
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
true);
}

/**
* Returns a {@link InternalTimerService} that can be used to query current processing time and
* event time and to set timers. An operator can have several timer services, where each has its
Expand Down Expand Up @@ -293,13 +267,13 @@ public <K, N> InternalTimerService<N> getInternalTimerService(

InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keyedStateBackend.getKeySerializer(),
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
Expand All @@ -56,8 +52,6 @@

import javax.annotation.Nonnull;

import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -89,7 +83,6 @@ public AbstractAsyncStateStreamOperatorV2(
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));

final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
Expand Down Expand Up @@ -213,25 +206,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
}
}

@Override
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory)
throws Exception {
return stateHandler.snapshotState(
this,
Optional.ofNullable(timeServiceManager),
getOperatorName(),
checkpointId,
timestamp,
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
true);
}

@SuppressWarnings("unchecked")
public <K, N> InternalTimerService<N> getInternalTimerService(
String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
Expand All @@ -246,11 +220,13 @@ public <K, N> InternalTimerService<N> getInternalTimerService(

InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keyedStateBackend.getKeySerializer(),
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,15 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());

isUsingCustomRawKeyedState(),
isAsyncStateProcessingEnabled());
stateHandler =
new StreamOperatorStateHandler(
context, getExecutionConfig(), streamTaskCloseableRegistry);
timeServiceManager = context.internalTimerServiceManager();
timeServiceManager =
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
Expand Down Expand Up @@ -320,6 +323,14 @@ protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
* Indicates whether this operator is enabling the async state. Can be overridden by subclasses.
*/
@Internal
public boolean isAsyncStateProcessingEnabled() {
return false;
}

@Internal
@Override
public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
Expand Down Expand Up @@ -402,7 +413,7 @@ public OperatorSnapshotFutures snapshotState(
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
false);
isAsyncStateProcessingEnabled());
}

/**
Expand Down Expand Up @@ -650,10 +661,10 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
@SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
return keyedTimeServiceHandler.getInternalTimerService(
name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
name, keySerializer, namespaceSerializer, triggerable);
}

public void processWatermark(Watermark mark) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());
isUsingCustomRawKeyedState(),
isAsyncStateProcessingEnabled());

stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
timeServiceManager =
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);

if (useSplittableTimers()
Expand Down Expand Up @@ -275,6 +279,14 @@ protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
* Indicates whether this operator is enabling the async state. Can be overridden by subclasses.
*/
@Internal
public boolean isAsyncStateProcessingEnabled() {
return false;
}

/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
Expand Down Expand Up @@ -318,7 +330,7 @@ public OperatorSnapshotFutures snapshotState(
checkpointOptions,
factory,
isUsingCustomRawKeyedState(),
false);
isAsyncStateProcessingEnabled());
}

/**
Expand Down Expand Up @@ -510,10 +522,10 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
@SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
return keyedTimeServiceHandler.getInternalTimerService(
name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
name, keySerializer, namespaceSerializer, triggerable);
}

public void processWatermark(Watermark mark) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -49,6 +50,9 @@ default boolean isRestored() {
/** Returns the operator state backend for the stream operator. */
OperatorStateBackend operatorStateBackend();

/** Returns the key serializer for keyed state backends. */
TypeSerializer<?> keySerializer();

/**
* Returns the keyed state backend for the stream operator. This method returns null for
* non-keyed operators.
Expand All @@ -59,7 +63,7 @@ default boolean isRestored() {
* Returns the async keyed state backend for the stream operator. This method returns null for
* operators which don't support async keyed state backend.
*/
AsyncKeyedStateBackend asyncKeyedStateBackend();
AsyncKeyedStateBackend<?> asyncKeyedStateBackend();

/**
* Returns the internal timer service manager for the stream operator. This method returns null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class StreamOperatorStateHandler {

protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);

@Nullable private final TypeSerializer<?> keySerializer;

@Nullable private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;

@Nullable private final KeyedStateStoreV2 keyedStateStoreV2;
Expand All @@ -100,8 +102,9 @@ public StreamOperatorStateHandler(
ExecutionConfig executionConfig,
CloseableRegistry closeableRegistry) {
this.context = context;
operatorStateBackend = context.operatorStateBackend();
keyedStateBackend = context.keyedStateBackend();
this.keySerializer = context.keySerializer();
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
this.closeableRegistry = closeableRegistry;

if (keyedStateBackend != null) {
Expand Down Expand Up @@ -234,22 +237,28 @@ void snapshotState(
throws CheckpointException {
try {
if (timeServiceManager.isPresent()) {
checkState(
keyedStateBackend != null,
"keyedStateBackend should be available with timeServiceManager");
final InternalTimeServiceManager<?> manager = timeServiceManager.get();

boolean requiresLegacyRawKeyedStateSnapshots =
keyedStateBackend instanceof AbstractKeyedStateBackend
&& ((AbstractKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
requiresLegacyRawKeyedStateSnapshots |=
keyedStateBackend instanceof AsyncKeyedStateBackend
&& ((AsyncKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());

boolean requiresLegacyRawKeyedStateSnapshots;
final InternalTimeServiceManager<?> manager;
if (useAsyncState) {
checkState(
asyncKeyedStateBackend != null,
"keyedStateBackend should be available with timeServiceManager");
manager = timeServiceManager.get();
requiresLegacyRawKeyedStateSnapshots =
asyncKeyedStateBackend.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
} else {
checkState(
keyedStateBackend != null,
"keyedStateBackend should be available with timeServiceManager");
manager = timeServiceManager.get();

requiresLegacyRawKeyedStateSnapshots =
keyedStateBackend instanceof AbstractKeyedStateBackend
&& ((AbstractKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
}
if (requiresLegacyRawKeyedStateSnapshots) {
checkState(
!isUsingCustomRawKeyedState,
Expand Down Expand Up @@ -365,14 +374,20 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
}
}

@SuppressWarnings("unchecked")
public <K> TypeSerializer<K> getKeySerializer() {
return (TypeSerializer<K>) keySerializer;
}

@SuppressWarnings("unchecked")
public <K> KeyedStateBackend<K> getKeyedStateBackend() {
return (KeyedStateBackend<K>) keyedStateBackend;
}

@Nullable
public AsyncKeyedStateBackend getAsyncKeyedStateBackend() {
return asyncKeyedStateBackend;
@SuppressWarnings("unchecked")
public <K> AsyncKeyedStateBackend<K> getAsyncKeyedStateBackend() {
return (AsyncKeyedStateBackend<K>) asyncKeyedStateBackend;
}

public OperatorStateBackend getOperatorStateBackend() {
Expand Down
Loading

0 comments on commit ee18251

Please sign in to comment.