Skip to content

Commit

Permalink
[FLINK-36892][Test] Use record processor in operator test harness
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Dec 12, 2024
1 parent 58de480 commit d962832
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Base class for broadcast stream operator test harnesses. */
Expand All @@ -38,17 +39,15 @@ public TwoInputStreamOperator<IN1, IN2, OUT> getTwoInputOperator() {
}

public void processElement(StreamRecord<IN1> element) throws Exception {
getTwoInputOperator().setKeyContextElement1(element);
getTwoInputOperator().processElement1(element);
RecordProcessorUtils.getRecordProcessor1(getTwoInputOperator()).accept(element);
}

public void processElement(IN1 value, long timestamp) throws Exception {
processElement(new StreamRecord<>(value, timestamp));
}

public void processBroadcastElement(StreamRecord<IN2> element) throws Exception {
getTwoInputOperator().setKeyContextElement2(element);
getTwoInputOperator().processElement2(element);
RecordProcessorUtils.getRecordProcessor2(getTwoInputOperator()).accept(element);
}

public void processBroadcastElement(IN2 value, long timestamp) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* A test harness for testing a {@link MultipleInputStreamOperator}.
Expand Down Expand Up @@ -52,8 +54,8 @@ public MultiInputStreamOperatorTestHarness(

public void processElement(int idx, StreamRecord<?> element) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
input.setKeyContextElement(element);
input.processElement(element);
((ThrowingConsumer<StreamRecord, Exception>) RecordProcessorUtils.getRecordProcessor(input))
.accept(element);
}

public void processWatermark(int idx, Watermark mark) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -213,13 +215,13 @@ public void processElement(IN value, long timestamp) throws Exception {

public void processElement(StreamRecord<IN> element) throws Exception {
if (inputs.isEmpty()) {
operator.setKeyContextElement1(element);
getOneInputOperator().processElement(element);
RecordProcessorUtils.getRecordProcessor(getOneInputOperator()).accept(element);
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
input.setKeyContextElement(element);
input.processElement(element);
((ThrowingConsumer<StreamRecord, Exception>)
RecordProcessorUtils.getRecordProcessor(input))
.accept(element);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* A test harness for testing a {@link TwoInputStreamOperator}.
Expand All @@ -37,6 +39,9 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>

private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;

private final ThrowingConsumer<StreamRecord<IN1>, Exception> processor1;
private final ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;

public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator)
throws Exception {
this(operator, 1, 1, 0);
Expand All @@ -51,20 +56,20 @@ public TwoInputStreamOperatorTestHarness(
super(operator, maxParallelism, numSubtasks, subtaskIndex);

this.twoInputOperator = operator;
processor1 = RecordProcessorUtils.getRecordProcessor1(twoInputOperator);
processor2 = RecordProcessorUtils.getRecordProcessor2(twoInputOperator);
}

public void processElement1(StreamRecord<IN1> element) throws Exception {
twoInputOperator.setKeyContextElement1(element);
twoInputOperator.processElement1(element);
processor1.accept(element);
}

public void processElement1(IN1 value, long timestamp) throws Exception {
processElement1(new StreamRecord<>(value, timestamp));
}

public void processElement2(StreamRecord<IN2> element) throws Exception {
twoInputOperator.setKeyContextElement2(element);
twoInputOperator.processElement2(element);
processor2.accept(element);
}

public void processElement2(IN2 value, long timestamp) throws Exception {
Expand Down

0 comments on commit d962832

Please sign in to comment.