Skip to content

Commit

Permalink
[FLINK-36944][Datastream] Implement async reduce operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Dec 22, 2024
1 parent ec3a9c3 commit 202c145
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link StreamOperator} that applies a {@link ReduceFunction} to the records of a {@link
 * org.apache.flink.streaming.api.datastream.KeyedStream}, keeping the running result in a {@link
 * ValueState} that is read and written through its asynchronous API.
 *
 * <p>For each record the stored value is fetched with {@code asyncValue()}. If nothing is stored
 * yet, the record's value itself becomes the new state; otherwise the stored value and the
 * record's value are combined by the user function. The outcome is written back with {@code
 * asyncUpdate(...)} and emitted downstream from the callback chained on that update.
 *
 * @param <IN> type of the records being reduced; also the output type
 */
@Internal
public class StreamGroupedReduceAsyncStateOperator<IN>
        extends AbstractAsyncStateUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    /** Name under which the running reduce result is registered as state. */
    private static final String STATE_NAME = "_op_state";

    /** Handle to the running reduce result; created in {@link #open()}. */
    private transient ValueState<IN> values;

    /** Serializer handed to the state descriptor for the stored value. */
    private final TypeSerializer<IN> serializer;

    /**
     * Creates the operator.
     *
     * @param reducer user function that combines the stored value with each incoming value
     * @param serializer serializer for values of type {@code IN}, used for the state descriptor
     */
    public StreamGroupedReduceAsyncStateOperator(
            ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    /**
     * Opens the parent operator, then obtains the value state from the runtime context.
     *
     * @throws Exception if the parent {@code open()} or the state lookup fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> descriptor = new ValueStateDescriptor<>(STATE_NAME, serializer);
        values = getRuntimeContext().getValueState(descriptor);
    }

    /**
     * Folds the incoming record into the stored value and forwards the new result.
     *
     * @param element the record to reduce; its instance is reused (via {@code replace}) to carry
     *     the emitted result
     * @throws Exception declared by the operator interface
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final IN incoming = element.getValue();
        values.asyncValue()
                .thenAccept(
                        stored -> {
                            // A null read means no value is stored yet, so the incoming value
                            // is used as-is instead of calling the user function.
                            final IN next =
                                    stored == null
                                            ? incoming
                                            : userFunction.reduce(stored, incoming);
                            // Emit only from the callback chained on the state update.
                            values.asyncUpdate(next)
                                    .thenAccept(
                                            ignored -> output.collect(element.replace(next)));
                        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class ReduceTransformation<IN, K> extends PhysicalTransformation<IN
private final KeySelector<IN, K> keySelector;
private final TypeInformation<K> keyTypeInfo;
private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
private boolean isEnableAsyncState;

public ReduceTransformation(
String name,
Expand Down Expand Up @@ -100,4 +101,13 @@ protected List<Transformation<?>> getTransitivePredecessorsInternal() {
public List<Transformation<?>> getInputs() {
// Wraps the single `input` reference in a one-element, immutable list.
return Collections.singletonList(input);
}

/** Sets this transformation's async-state flag to {@code true}. */
@Override
public void enableAsyncState() {
    this.isEnableAsyncState = true;
}

/**
 * Reports the async-state flag, which {@link #enableAsyncState()} sets to {@code true}.
 *
 * @return the current value of the async-state flag
 */
public boolean isEnableAsyncState() {
    return this.isEnableAsyncState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceAsyncStateOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;

import java.util.Collection;
Expand Down Expand Up @@ -66,15 +68,24 @@ public Collection<Integer> translateForBatchInternal(
@Override
public Collection<Integer> translateForStreamingInternal(
final ReduceTransformation<IN, KEY> transformation, final Context context) {
StreamGroupedReduceOperator<IN> groupedReduce =
new StreamGroupedReduceOperator<>(
transformation.getReducer(),
transformation
.getInputType()
.createSerializer(
context.getStreamGraph()
.getExecutionConfig()
.getSerializerConfig()));
StreamOperator<IN> groupedReduce =
transformation.isEnableAsyncState()
? new StreamGroupedReduceAsyncStateOperator<>(
transformation.getReducer(),
transformation
.getInputType()
.createSerializer(
context.getStreamGraph()
.getExecutionConfig()
.getSerializerConfig()))
: new StreamGroupedReduceOperator<>(
transformation.getReducer(),
transformation
.getInputType()
.createSerializer(
context.getStreamGraph()
.getExecutionConfig()
.getSerializerConfig()));

SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
Expand Down

0 comments on commit 202c145

Please sign in to comment.