Skip to content

Commit

Permalink
[FLINK-35904][test] Enhance test harness for async state processing w…
Browse files Browse the repository at this point in the history
…ith fast fail
  • Loading branch information
Zakelly committed Dec 26, 2024
1 parent f74e937 commit 9a2cc3e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.List;
Expand All @@ -40,6 +41,7 @@

import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
import static org.assertj.core.api.Assertions.fail;

/**
* A test harness for testing a {@link MultipleInputStreamOperator}.
Expand Down Expand Up @@ -100,6 +102,8 @@ private AsyncKeyedMultiInputStreamOperatorTestHarness(
setKeySelector(i, keySelectors.get(i));
}
this.executor = executor;
// Make environment record any failure
getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}

public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
Expand All @@ -114,38 +118,56 @@ public void processElement(int idx, StreamRecord<?> element) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
RecordProcessorUtils.getRecordProcessor(input);
execute(executor, (ignore) -> inputProcessor.accept(element)).get();
executeAndGet(() -> inputProcessor.accept(element));
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermark(int idx, Watermark mark) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermark(mark)).get();
executeAndGet(() -> input.processWatermark(mark));
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermarkStatus(watermarkStatus)).get();
executeAndGet(() -> input.processWatermarkStatus(watermarkStatus));
}

@Override
@SuppressWarnings("rawtypes")
public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processRecordAttributes(recordAttributes)).get();
executeAndGet(() -> input.processRecordAttributes(recordAttributes));
}

public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
executeAndGet(() -> drain(operator));
}

@Override
public void close() throws Exception {
execute(executor, (ignore) -> super.close()).get();
executeAndGet(super::close);
executor.shutdown();
}

private void executeAndGet(RunnableWithException runnable) throws Exception {
execute(
executor,
() -> {
checkEnvState();
runnable.run();
})
.get();
}

private void checkEnvState() {
if (getEnvironment().getActualExternalFailureCause().isPresent()) {
fail(
"There is an error on other threads",
getEnvironment().getActualExternalFailureCause().get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.ArrayList;
Expand All @@ -45,8 +46,8 @@
import java.util.concurrent.Executors;

import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.fail;

/**
* A test harness for testing a {@link OneInputStreamOperator} which uses async state.
Expand Down Expand Up @@ -120,6 +121,8 @@ protected AsyncKeyedOneInputStreamOperatorTestHarness(
keyType.createSerializer(executionConfig.getSerializerConfig()));
config.serializeAllConfigs();
this.executor = executor;
// Make environment record any failure
getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}

@Override
Expand All @@ -146,16 +149,14 @@ public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element)
throws Exception {
if (inputs.isEmpty()) {
return execute(
executor,
(ignore) ->
() ->
RecordProcessorUtils.getRecordProcessor(getOneInputOperator())
.accept(element));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(
executor,
(ignore) ->
() ->
((ThrowingConsumer<StreamRecord, Exception>)
RecordProcessorUtils.getRecordProcessor(input))
.accept(element));
Expand All @@ -181,12 +182,11 @@ public void processWatermarkStatus(WatermarkStatus status) throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) {
if (inputs.isEmpty()) {
return execute(
executor, (ignore) -> getOneInputOperator().processWatermarkStatus(status));
return execute(() -> getOneInputOperator().processWatermarkStatus(status));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(executor, (ignore) -> input.processWatermarkStatus(status));
return execute(() -> input.processWatermarkStatus(status));
}
}

Expand All @@ -198,7 +198,7 @@ public void processWatermark(Watermark mark) throws Exception {
@Override
public void endInput() throws Exception {
if (operator instanceof BoundedOneInput) {
execute(executor, (ignore) -> ((BoundedOneInput) operator).endInput()).get();
executeAndGet(() -> ((BoundedOneInput) operator).endInput());
}
}

Expand All @@ -207,11 +207,11 @@ public void endInput() throws Exception {
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {
return execute(executor, (ignore) -> getOneInputOperator().processWatermark(mark));
return execute(() -> getOneInputOperator().processWatermark(mark));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(executor, (ignore) -> input.processWatermark(mark));
return execute(() -> input.processWatermark(mark));
}
}

Expand All @@ -223,12 +223,11 @@ public void processLatencyMarker(LatencyMarker marker) throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker) {
if (inputs.isEmpty()) {
return execute(
executor, (ignore) -> getOneInputOperator().processLatencyMarker(marker));
return execute(() -> getOneInputOperator().processLatencyMarker(marker));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(executor, (ignore) -> input.processLatencyMarker(marker));
return execute(() -> input.processLatencyMarker(marker));
}
}

Expand All @@ -247,28 +246,47 @@ public long getCurrentWatermark() {
public CompletableFuture<Void> processRecordAttributesInternal(
RecordAttributes recordAttributes) {
if (inputs.isEmpty()) {
return execute(
executor,
(ignore) -> getOneInputOperator().processRecordAttributes(recordAttributes));
return execute(() -> getOneInputOperator().processRecordAttributes(recordAttributes));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(executor, (ignore) -> input.processRecordAttributes(recordAttributes));
return execute(() -> input.processRecordAttributes(recordAttributes));
}
}

public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
executeAndGet(() -> drain(operator));
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
execute(executor, (ignore) -> operator.prepareSnapshotPreBarrier(checkpointId)).get();
executeAndGet(() -> operator.prepareSnapshotPreBarrier(checkpointId));
}

@Override
public void close() throws Exception {
execute(executor, (ignore) -> super.close()).get();
executeAndGet(super::close);
executor.shutdown();
}

private CompletableFuture<Void> execute(RunnableWithException runnable) {
return AsyncProcessingTestUtil.execute(
executor,
() -> {
checkEnvState();
runnable.run();
});
}

private void executeAndGet(RunnableWithException runnable) throws Exception {
execute(runnable).get();
}

private void checkEnvState() {
if (getEnvironment().getActualExternalFailureCause().isPresent()) {
fail(
"There is an error on other threads",
getEnvironment().getActualExternalFailureCause().get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.concurrent.CompletableFuture;
Expand All @@ -41,6 +42,7 @@

import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
import static org.assertj.core.api.Assertions.fail;

/**
* A test harness for testing a {@link OneInputStreamOperator} which uses async state.
Expand Down Expand Up @@ -117,6 +119,8 @@ public AsyncKeyedTwoInputStreamOperatorTestHarness(
"Operator is not an AsyncStateProcessingOperator");
this.twoInputOperator = operator;
this.executor = executor;
// Make environment record any failure
getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}

private ThrowingConsumer<StreamRecord<IN1>, Exception> getRecordProcessor1() {
Expand All @@ -135,7 +139,7 @@ private ThrowingConsumer<StreamRecord<IN2>, Exception> getRecordProcessor2() {

@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor1().accept(element)).get();
executeAndGet(() -> getRecordProcessor1().accept(element));
}

@Override
Expand All @@ -145,7 +149,7 @@ public void processElement1(IN1 value, long timestamp) throws Exception {

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor2().accept(element)).get();
executeAndGet(() -> getRecordProcessor2().accept(element));
}

@Override
Expand All @@ -155,63 +159,77 @@ public void processElement2(IN2 value, long timestamp) throws Exception {

@Override
public void processWatermark1(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
executeAndGet(() -> twoInputOperator.processWatermark1(mark));
}

@Override
public void processWatermark2(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
executeAndGet(() -> twoInputOperator.processWatermark2(mark));
}

@Override
public void processBothWatermarks(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
executeAndGet(() -> twoInputOperator.processWatermark1(mark));
executeAndGet(() -> twoInputOperator.processWatermark2(mark));
}

@Override
public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus))
.get();
executeAndGet(() -> twoInputOperator.processWatermarkStatus1(watermarkStatus));
}

@Override
public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus))
.get();
executeAndGet(() -> twoInputOperator.processWatermarkStatus2(watermarkStatus));
}

@Override
public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes))
.get();
executeAndGet(() -> twoInputOperator.processRecordAttributes1(recordAttributes));
}

@Override
public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes))
.get();
executeAndGet(() -> twoInputOperator.processRecordAttributes2(recordAttributes));
}

public void endInput1() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(1)).get();
executeAndGet(() -> ((BoundedMultiInput) operator).endInput(1));
}
}

public void endInput2() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(2)).get();
executeAndGet(() -> ((BoundedMultiInput) operator).endInput(2));
}
}

public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
executeAndGet(() -> drain(operator));
}

@Override
public void close() throws Exception {
execute(executor, (ignore) -> super.close()).get();
executeAndGet(super::close);
executor.shutdown();
}

private void executeAndGet(RunnableWithException runnable) throws Exception {
execute(
executor,
() -> {
checkEnvState();
runnable.run();
})
.get();
}

private void checkEnvState() {
if (getEnvironment().getActualExternalFailureCause().isPresent()) {
fail(
"There is an error on other threads",
getEnvironment().getActualExternalFailureCause().get());
}
}
}
Loading

0 comments on commit 9a2cc3e

Please sign in to comment.