diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index 804638c30fd4b4..bce2b749e7a6ff 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -615,6 +615,15 @@ public final List> getTransitivePredecessors() { */ public abstract List> getInputs(); + /** Enabling the async state for this transformation. */ + public void enableAsyncState() { + // Subclass should override this method if they support async state processing. + throw new UnsupportedOperationException( + "The transformation does not support async state, " + + "or you are enabling the async state without a keyed context " + + "(not behind a keyBy())."); + } + @Override public String toString() { return getClass().getSimpleName() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 82b0a0f4014f3b..d840eac666a594 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -95,6 +96,9 @@ public class KeyedStream extends DataStream { /** The type of the key by which the stream is partitioned. */ private final TypeInformation keyType; + /** Whether the async state has been enabled. */ + private boolean isEnableAsyncState = false; + /** * Creates a new {@link KeyedStream} using the given {@link KeySelector} to partition operator * state by key. @@ -285,6 +289,9 @@ protected SingleOutputStreamOperator doTransform( (OneInputTransformation) returnStream.getTransformation(); transform.setStateKeySelector(keySelector); transform.setStateKeyType(keyType); + if (isEnableAsyncState) { + transform.enableAsyncState(); + } return returnStream; } @@ -438,8 +445,19 @@ public IntervalJoined between(Duration lowerBound, Duration upperBo checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); - return new IntervalJoined<>( - streamOne, streamTwo, lowerBound.toMillis(), upperBound.toMillis(), true, true); + IntervalJoined intervalJoined = + new IntervalJoined<>( + streamOne, + streamTwo, + lowerBound.toMillis(), + upperBound.toMillis(), + true, + true); + if (streamOne.isEnableAsyncState() || streamTwo.isEnableAsyncState()) { + // enable async state if any stream enabled the async state. + intervalJoined = intervalJoined.enableAsyncState(); + } + return intervalJoined; } } @@ -469,6 +487,8 @@ public static class IntervalJoined { private OutputTag leftLateDataOutputTag; private OutputTag rightLateDataOutputTag; + private boolean isEnableAsyncState = false; + public IntervalJoined( KeyedStream left, KeyedStream right, @@ -595,6 +615,18 @@ public SingleOutputStreamOperator process( .keyBy(keySelector1, keySelector2) .transform("Interval Join", outputType, operator); } + + /** + * Enable the async state processing for following keyed processing function. This also + * requires only State V2 APIs are used in the function. + * + * @return the configured IntervalJoin itself. + */ + @Experimental + public IntervalJoined enableAsyncState() { + isEnableAsyncState = true; + return this; + } } // ------------------------------------------------------------------------ @@ -663,6 +695,9 @@ public SingleOutputStreamOperator reduce(ReduceFunction reducer) { keySelector, getKeyType(), false); + if (isEnableAsyncState) { + reduce.enableAsyncState(); + } getExecutionEnvironment().addOperator(reduce); @@ -1018,4 +1053,21 @@ public QueryableStateStream asQueryableState( stateDescriptor, getKeyType().createSerializer(getExecutionConfig().getSerializerConfig())); } + + /** + * Enable the async state processing for following keyed processing function. This also requires + * only State V2 APIs are used in the function. + * + * @return the configured KeyedStream itself. + */ + @Experimental + public KeyedStream enableAsyncState() { + isEnableAsyncState = true; + return this; + } + + @Internal + boolean isEnableAsyncState() { + return isEnableAsyncState; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 4141a31d85ca82..42bb8f30efebce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; @@ -455,4 +456,18 @@ public CachedDataStream cache() { return new CachedDataStream<>(this.environment, this.transformation); } + + /** + * Enable the async state processing for following previous transformation. This also requires + * only State V2 APIs are used in the user function. + * + * @return the configured SingleOutputStreamOperator itself. + * @throws UnsupportedOperationException when the transformation does not support the async + * state processing. + */ + @Experimental + public SingleOutputStreamOperator enableAsyncState() { + transformation.enableAsyncState(); + return this; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index eca4c046e5acc7..1c694b4b81b6b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -851,6 +852,18 @@ private SingleOutputStreamOperator aggregate(AggregationFunction aggregato return reduce(aggregator); } + /** + * Enable the async state processing for following keyed processing function. This also requires + * only State V2 APIs are used in the function. + * + * @return the configured WindowedStream itself. + */ + @Experimental + public WindowedStream enableAsyncState() { + input.enableAsyncState(); + return this; + } + public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); }