From 202c14556eb1b9b90d60c5dc8a058edba74b40ad Mon Sep 17 00:00:00 2001
From: Zakelly
Date: Wed, 11 Dec 2024 20:09:01 +0800
Subject: [PATCH] [FLINK-36944][Datastream] Implement async reduce operator

---
 ...StreamGroupedReduceAsyncStateOperator.java | 99 +++++++++++++++++++
 .../transformations/ReduceTransformation.java | 10 ++
 .../ReduceTransformationTranslator.java       | 29 ++++--
 3 files changed, 129 insertions(+), 9 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
new file mode 100644
index 0000000000000..189769d11cbcd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link StreamOperator} for executing a {@link ReduceFunction} on a {@link
+ * org.apache.flink.streaming.api.datastream.KeyedStream}.
+ *
+ * <p>The latest reduce result is kept in a {@link ValueState} that is read and written through
+ * its asynchronous methods. For every input record the operator emits the value it has just
+ * written to that state.
+ *
+ * @param <IN> type of the records that are reduced and emitted
+ */
+@Internal
+public class StreamGroupedReduceAsyncStateOperator<IN>
+        extends AbstractAsyncStateUdfStreamOperator<IN, ReduceFunction<IN>>
+        implements OneInputStreamOperator<IN, IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Name under which the reduce result is registered as state. */
+    private static final String STATE_NAME = "_op_state";
+
+    /** State holding the latest reduce result; created in {@link #open()}. */
+    private transient ValueState<IN> values;
+
+    /** Serializer handed to the descriptor of {@link #values}. */
+    private final TypeSerializer<IN> serializer;
+
+    /**
+     * Creates the operator.
+     *
+     * @param reducer user function that combines the stored value with each incoming value
+     * @param serializer serializer for the values kept in state
+     */
+    public StreamGroupedReduceAsyncStateOperator(
+            ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
+        super(reducer);
+        this.serializer = serializer;
+    }
+
+    /** Opens the parent operator and then obtains the value state from the runtime context. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
+        values = getRuntimeContext().getValueState(stateId);
+    }
+
+    /**
+     * Folds the incoming value into the stored value and emits the result.
+     *
+     * <p>State read, state write and emission are chained as callbacks; the result is collected
+     * only inside the callback of {@code asyncUpdate}.
+     *
+     * @param element input record; its instance is reused via {@code replace} for the output
+     */
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        IN value = element.getValue();
+        values.asyncValue()
+                .thenAccept(
+                        currentValue -> {
+                            // A null state value means nothing is stored yet: the incoming value
+                            // is stored and emitted as is.
+                            IN reduced =
+                                    currentValue == null
+                                            ? value
+                                            : userFunction.reduce(currentValue, value);
+                            values.asyncUpdate(reduced)
+                                    .thenAccept(
+                                            ignored -> output.collect(element.replace(reduced)));
+                        });
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java index 3b0f7564a7576..10e8fb4c34512 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java @@ -44,6 +44,7 @@ public final class ReduceTransformation extends PhysicalTransformation keySelector; private final TypeInformation keyTypeInfo; private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY; + private boolean isEnableAsyncState; public ReduceTransformation( String name, @@ -100,4 +101,13 @@ protected List> getTransitivePredecessorsInternal() { public List> getInputs() { return Collections.singletonList(input); } + + @Override + public void enableAsyncState() { + isEnableAsyncState = true; + } + + public boolean isEnableAsyncState() { + return isEnableAsyncState; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java index 030e4adcad6ba..00e5f7227b265 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java @@ -22,7 +22,9 @@ import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamGroupedReduceAsyncStateOperator; import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; import 
org.apache.flink.streaming.api.transformations.ReduceTransformation; import java.util.Collection; @@ -66,15 +68,24 @@ public Collection translateForBatchInternal( @Override public Collection translateForStreamingInternal( final ReduceTransformation transformation, final Context context) { - StreamGroupedReduceOperator groupedReduce = - new StreamGroupedReduceOperator<>( - transformation.getReducer(), - transformation - .getInputType() - .createSerializer( - context.getStreamGraph() - .getExecutionConfig() - .getSerializerConfig())); + StreamOperator groupedReduce = + transformation.isEnableAsyncState() + ? new StreamGroupedReduceAsyncStateOperator<>( + transformation.getReducer(), + transformation + .getInputType() + .createSerializer( + context.getStreamGraph() + .getExecutionConfig() + .getSerializerConfig())) + : new StreamGroupedReduceOperator<>( + transformation.getReducer(), + transformation + .getInputType() + .createSerializer( + context.getStreamGraph() + .getExecutionConfig() + .getSerializerConfig())); SimpleOperatorFactory operatorFactory = SimpleOperatorFactory.of(groupedReduce); operatorFactory.setChainingStrategy(transformation.getChainingStrategy());