Skip to content

Commit

Permalink
[FLINK-36944][API] Introduce enableAsyncState API
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Dec 20, 2024
1 parent 1523f2c commit 5d00028
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,15 @@ public final List<Transformation<?>> getTransitivePredecessors() {
*/
public abstract List<Transformation<?>> getInputs();

/** Enabling the async state for this transformation. */
public void enableAsyncState() {
// Subclass should override this method if they support async state processing.
throw new UnsupportedOperationException(
"The transformation does not support async state, "
+ "or you are enabling the async state without a keyed context "
+ "(not behind a keyBy()).");
}

@Override
public String toString() {
return getClass().getSimpleName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
Expand Down Expand Up @@ -95,6 +96,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
/** The type of the key by which the stream is partitioned. */
private final TypeInformation<KEY> keyType;

/** Whether the async state has been enabled. */
private boolean isEnableAsyncState = false;

/**
* Creates a new {@link KeyedStream} using the given {@link KeySelector} to partition operator
* state by key.
Expand Down Expand Up @@ -285,6 +289,9 @@ protected <R> SingleOutputStreamOperator<R> doTransform(
(OneInputTransformation<T, R>) returnStream.getTransformation();
transform.setStateKeySelector(keySelector);
transform.setStateKeyType(keyType);
if (isEnableAsyncState) {
transform.enableAsyncState();
}

return returnStream;
}
Expand Down Expand Up @@ -438,8 +445,19 @@ public IntervalJoined<T1, T2, KEY> between(Duration lowerBound, Duration upperBo
checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

return new IntervalJoined<>(
streamOne, streamTwo, lowerBound.toMillis(), upperBound.toMillis(), true, true);
IntervalJoined<T1, T2, KEY> intervalJoined =
new IntervalJoined<>(
streamOne,
streamTwo,
lowerBound.toMillis(),
upperBound.toMillis(),
true,
true);
if (streamOne.isEnableAsyncState() || streamTwo.isEnableAsyncState()) {
// enable async state if any stream enabled the async state.
intervalJoined = intervalJoined.enableAsyncState();
}
return intervalJoined;
}
}

Expand Down Expand Up @@ -469,6 +487,8 @@ public static class IntervalJoined<IN1, IN2, KEY> {
private OutputTag<IN1> leftLateDataOutputTag;
private OutputTag<IN2> rightLateDataOutputTag;

private boolean isEnableAsyncState = false;

public IntervalJoined(
KeyedStream<IN1, KEY> left,
KeyedStream<IN2, KEY> right,
Expand Down Expand Up @@ -595,6 +615,18 @@ public <OUT> SingleOutputStreamOperator<OUT> process(
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}

/**
* Enable the async state processing for following keyed processing function. This also
* requires only State V2 APIs are used in the function.
*
* @return the configured IntervalJoin itself.
*/
@Experimental
public IntervalJoined<IN1, IN2, KEY> enableAsyncState() {
isEnableAsyncState = true;
return this;
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -663,6 +695,9 @@ public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
keySelector,
getKeyType(),
false);
if (isEnableAsyncState) {
reduce.enableAsyncState();
}

getExecutionEnvironment().addOperator(reduce);

Expand Down Expand Up @@ -1018,4 +1053,21 @@ public QueryableStateStream<KEY, T> asQueryableState(
stateDescriptor,
getKeyType().createSerializer(getExecutionConfig().getSerializerConfig()));
}

/**
* Enable the async state processing for following keyed processing function. This also requires
* only State V2 APIs are used in the function.
*
* @return the configured KeyedStream itself.
*/
@Experimental
public KeyedStream<T, KEY> enableAsyncState() {
isEnableAsyncState = true;
return this;
}

@Internal
boolean isEnableAsyncState() {
return isEnableAsyncState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
Expand Down Expand Up @@ -455,4 +456,18 @@ public CachedDataStream<T> cache() {

return new CachedDataStream<>(this.environment, this.transformation);
}

/**
* Enable the async state processing for following previous transformation. This also requires
* only State V2 APIs are used in the user function.
*
* @return the configured SingleOutputStreamOperator itself.
* @throws UnsupportedOperationException when the transformation does not support the async
* state processing.
*/
@Experimental
public SingleOutputStreamOperator<T> enableAsyncState() {
transformation.enableAsyncState();
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
Expand Down Expand Up @@ -851,6 +852,18 @@ private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregato
return reduce(aggregator);
}

/**
* Enable the async state processing for following keyed processing function. This also requires
* only State V2 APIs are used in the function.
*
* @return the configured WindowedStream itself.
*/
@Experimental
public WindowedStream<T, K, W> enableAsyncState() {
input.enableAsyncState();
return this;
}

public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
Expand Down

0 comments on commit 5d00028

Please sign in to comment.