diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 6e65b95416fc6..262c12d72cdd6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -503,6 +503,6 @@ public void close() throws IOException { /** A listener listens the key context switch. */ public interface SwitchContextListener { - void switchContext(RecordContext context); + void switchContext(@Nullable RecordContext context); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 8374668889061..2afb68bf565fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -77,6 +77,7 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre private AsyncExecutionController asyncExecutionController; + /** Act as a cache for {@link #setAsyncKeyedContextElement} and {@link #postProcessElement}. */ private RecordContext currentProcessingContext; private Environment environment; @@ -175,20 +176,18 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable proc @Override @SuppressWarnings("unchecked") public void asyncProcessWithKey(K key, ThrowingRunnable processing) { - RecordContext previousContext = currentProcessingContext; - + RecordContext oldContext = asyncExecutionController.getCurrentContext(); // build a context and switch to the new context - currentProcessingContext = asyncExecutionController.buildContext(null, key, true); - currentProcessingContext.retain(); - asyncExecutionController.setCurrentContext(currentProcessingContext); + RecordContext newContext = asyncExecutionController.buildContext(null, key, true); + newContext.retain(); + asyncExecutionController.setCurrentContext(newContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key // pass the same key in. preserveRecordOrderAndProcess(processing); - postProcessElement(); + newContext.release(); // switch to original context - asyncExecutionController.setCurrentContext(previousContext); - currentProcessingContext = previousContext; + asyncExecutionController.setCurrentContext(oldContext); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 0a4d630c8a76e..06b8315526e44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -73,6 +73,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2 extends AbstractSt private final Environment environment; private AsyncExecutionController asyncExecutionController; + /** Act as a cache for {@link #setAsyncKeyedContextElement} and {@link #postProcessElement}. */ private RecordContext currentProcessingContext; public AbstractAsyncStateStreamOperatorV2( @@ -182,20 +183,18 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable proc @Override @SuppressWarnings("unchecked") public void asyncProcessWithKey(K key, ThrowingRunnable processing) { - RecordContext previousContext = currentProcessingContext; - + RecordContext oldContext = asyncExecutionController.getCurrentContext(); // build a context and switch to the new context - currentProcessingContext = asyncExecutionController.buildContext(null, key, true); - currentProcessingContext.retain(); - asyncExecutionController.setCurrentContext(currentProcessingContext); + RecordContext newContext = asyncExecutionController.buildContext(null, key, true); + newContext.retain(); + asyncExecutionController.setCurrentContext(newContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key // pass the same key in. preserveRecordOrderAndProcess(processing); - postProcessElement(); + newContext.release(); // switch to original context - asyncExecutionController.setCurrentContext(previousContext); - currentProcessingContext = previousContext; + asyncExecutionController.setCurrentContext(oldContext); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index 133e107e76864..bdd1571fbce9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -32,6 +32,7 @@ import org.apache.flink.util.Disposable; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Closeable; @@ -110,7 +111,7 @@ S createStateInternal( /** By default, a state backend does nothing when a key is switched in async processing. */ @Override - default void switchContext(RecordContext context) {} + default void switchContext(@Nullable RecordContext context) {} // TODO remove this once heap-based timers are working with ForSt incremental snapshots! /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java index 79b98491bac34..d0ec7188fd4a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.RunnableFuture; @@ -119,8 +120,10 @@ public KeyGroupRange getKeyGroupRange() { } @Override - public void switchContext(RecordContext context) { - keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup()); + public void switchContext(@Nullable RecordContext context) { + if (context != null) { + keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup()); + } } @Override