diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 27cec546bb27a7..2e785121a7eb69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -216,7 +216,7 @@ public RecordContext buildContext(Object record, K key, boolean inheritEpoch) key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), - inheritEpoch + inheritEpoch && currentContext != null ? epochManager.onEpoch(currentContext.getEpoch()) : epochManager.onRecord()); } @@ -225,7 +225,7 @@ public RecordContext buildContext(Object record, K key, boolean inheritEpoch) key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), - inheritEpoch + inheritEpoch && currentContext != null ? epochManager.onEpoch(currentContext.getEpoch()) : epochManager.onRecord()); } @@ -456,7 +456,11 @@ public void processNonRecord( ? null : () -> { try { + // We clear the current context since this is a non-record context. + RecordContext previousContext = currentContext; + currentContext = null; triggerAction.run(); + currentContext = previousContext; } catch (Exception e) { exceptionHandler.handleException( "Failed to process non-record.", e); @@ -466,7 +470,10 @@ public void processNonRecord( ? null : () -> { try { + RecordContext previousContext = currentContext; + currentContext = null; finalAction.run(); + currentContext = previousContext; } catch (Exception e) { exceptionHandler.handleException( "Failed to process non-record.", e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index c8b70819e7aa92..91af56155daa16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -327,16 +327,65 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) { // Watermark handling // ------------------------------------------------------------------------ + /** + * A hook that will be triggered when receiving a watermark. Some async state can safely go + * within this method. Return the watermark that should be normally processed. + * + * @param watermark the receiving watermark. + * @return the watermark that should be processed. Null if there is no need for following + * processing. + */ + public Watermark preProcessWatermark(Watermark watermark) throws Exception { + return watermark; + } + + /** + * A hook that will be invoked after finishing advancing the watermark. It is not recommended to + * perform async state here. Only some synchronous logic is suggested. + * + * @param watermark the advanced watermark. + */ + public void postProcessWatermark(Watermark watermark) throws Exception {} + + /** + * Process a watermark when receiving it. Do not override this method since the async processing + * is difficult to write. Please override the hooks, see {@link #preProcessWatermark(Watermark)} + * and {@link #postProcessWatermark(Watermark)}. The basic logic of processWatermark with hooks + * in sync form would be: + * + *
+     *             Watermark watermark = preProcessWatermark(mark);
+     *             if (watermark != null) {
+     *                 super.processWatermark(watermark);
+     *                 postProcessWatermark(watermark);
+     *             }
+     * 
+ */ @Override - public void processWatermark(Watermark mark) throws Exception { + public final void processWatermark(Watermark mark) throws Exception { if (!isAsyncStateProcessingEnabled()) { // If async state processing is disabled, fallback to the super class. - super.processWatermark(mark); + Watermark watermark = preProcessWatermark(mark); + if (watermark != null) { + super.processWatermark(watermark); + postProcessWatermark(watermark); + } return; } + AtomicReference watermarkRef = new AtomicReference<>(null); asyncExecutionController.processNonRecord( - timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark), - () -> output.emitWatermark(mark)); + () -> { + watermarkRef.set(preProcessWatermark(mark)); + if (timeServiceManager != null && watermarkRef.get() != null) { + timeServiceManager.advanceWatermark(watermarkRef.get()); + } + }, + () -> { + if (watermarkRef.get() != null) { + output.emitWatermark(watermarkRef.get()); + postProcessWatermark(watermarkRef.get()); + } + }); } @Override @@ -364,8 +413,10 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index wasIdle.set(combinedWatermark.isIdle()); // index is 0-based if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) { - watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark())); - if (timeServiceManager != null) { + watermarkRef.set( + preProcessWatermark( + new Watermark(combinedWatermark.getCombinedWatermark()))); + if (timeServiceManager != null && watermarkRef.get() != null) { timeServiceManager.advanceWatermark(watermarkRef.get()); } } @@ -373,6 +424,7 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index () -> { if (watermarkRef.get() != null) { output.emitWatermark(watermarkRef.get()); + postProcessWatermark(watermarkRef.get()); } if (wasIdle.get() != combinedWatermark.isIdle()) { output.emitWatermarkStatus(watermarkStatus); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 6c90e4f997f9eb..47a0fede32cc5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -260,15 +260,66 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) { // ------------------------------------------------------------------------ // Watermark handling // ------------------------------------------------------------------------ + + /** + * A hook that will be triggered when receiving a watermark. Some async state can safely go + * within this method. Return the watermark that should be normally processed. + * + * @param watermark the receiving watermark. + * @return the watermark that should be processed. Null if there is no need for following + * processing. + */ + public Watermark preProcessWatermark(Watermark watermark) throws Exception { + return watermark; + } + + /** + * A hook that will be invoked after finishing advancing the watermark. It is not recommended to + * perform async state here. Only some synchronous logic is suggested. + * + * @param watermark the advanced watermark. + */ + public void postProcessWatermark(Watermark watermark) throws Exception {} + + /** + * Process a watermark when receiving it. Do not override this method since the async processing + * is difficult to write. Please override the hooks, see {@link #preProcessWatermark(Watermark)} + * and {@link #postProcessWatermark(Watermark)}. The basic logic of processWatermark with hooks + * in sync form would be: + * + *
+     *             Watermark watermark = preProcessWatermark(mark);
+     *             if (watermark != null) {
+     *                 super.processWatermark(watermark);
+     *                 postProcessWatermark(watermark);
+     *             }
+     * 
+ */ @Override public void processWatermark(Watermark mark) throws Exception { if (!isAsyncStateProcessingEnabled()) { - super.processWatermark(mark); + // If async state processing is disabled, fallback to the super class. + Watermark watermark = preProcessWatermark(mark); + if (watermark != null) { + super.processWatermark(watermark); + postProcessWatermark(watermark); + } return; } + AtomicReference watermarkRef = new AtomicReference<>(null); asyncExecutionController.processNonRecord( - timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark), - () -> output.emitWatermark(mark)); + () -> { + watermarkRef.set(preProcessWatermark(mark)); + if (timeServiceManager != null && watermarkRef.get() != null) { + timeServiceManager.advanceWatermark(watermarkRef.get()); + } + }, + () -> { + if (watermarkRef.get() != null) { + output.emitWatermark(watermarkRef.get()); + postProcessWatermark(watermarkRef.get()); + } + }); } @Override @@ -284,8 +335,10 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) () -> { wasIdle.set(combinedWatermark.isIdle()); if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) { - watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark())); - if (timeServiceManager != null) { + watermarkRef.set( + preProcessWatermark( + new Watermark(combinedWatermark.getCombinedWatermark()))); + if (timeServiceManager != null && watermarkRef.get() != null) { timeServiceManager.advanceWatermark(watermarkRef.get()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 496d46e3f34994..0892b401bcded5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; @@ -282,6 +283,68 @@ void testWatermark() throws Exception { } } + @Test + void testWatermarkHooks() throws Exception { + final WatermarkTestingOperator testOperator = new WatermarkTestingOperator(); + + AtomicInteger counter = new AtomicInteger(0); + testOperator.setPreProcessFunction( + (watermark) -> { + testOperator.asyncProcessWithKey( + 1L, + () -> { + testOperator.output(watermark.getTimestamp() + 1000L); + }); + if (counter.incrementAndGet() % 2 == 0) { + return null; + } else { + return new Watermark(watermark.getTimestamp() + 1L); + } + }); + + testOperator.setPostProcessFunction( + (watermark) -> { + testOperator.output(watermark.getTimestamp() + 100L); + }); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + testOperator, + dummyKeySelector, + dummyKeySelector, + BasicTypeInfo.INT_TYPE_INFO, + 1, + 1, + 0)) { + testHarness.setup(); + testHarness.open(); + testHarness.processElement1(1L, 1L); + testHarness.processElement1(3L, 3L); + testHarness.processElement1(4L, 4L); + testHarness.processWatermark1(new Watermark(2L)); + testHarness.processWatermark2(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>(1002L)); + expectedOutput.add(new StreamRecord<>(1L)); + expectedOutput.add(new StreamRecord<>(3L)); + expectedOutput.add(new Watermark(3L)); + expectedOutput.add(new StreamRecord<>(103L)); + testHarness.processWatermark1(new Watermark(4L)); + testHarness.processWatermark2(new Watermark(4L)); + expectedOutput.add(new StreamRecord<>(1004L)); + testHarness.processWatermark1(new Watermark(5L)); + testHarness.processWatermark2(new Watermark(5L)); + expectedOutput.add(new StreamRecord<>(1005L)); + expectedOutput.add(new StreamRecord<>(4L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>(106L)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + @Test void testWatermarkStatus() throws Exception { try (AsyncKeyedOneInputStreamOperatorTestHarness, String> @@ -498,6 +561,24 @@ private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOp private transient InternalTimerService timerService; + private FunctionWithException preProcessFunction; + + private ThrowingConsumer postProcessFunction; + + public void setPreProcessFunction( + FunctionWithException preProcessFunction) { + this.preProcessFunction = preProcessFunction; + } + + public void setPostProcessFunction( + ThrowingConsumer postProcessFunction) { + this.postProcessFunction = postProcessFunction; + } + + public void output(Long o) { + output.collect(new StreamRecord<>(o)); + } + @Override public void open() throws Exception { super.open(); @@ -506,6 +587,18 @@ public void open() throws Exception { getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this); } + @Override + public Watermark preProcessWatermark(Watermark watermark) throws Exception { + return preProcessFunction == null ? watermark : preProcessFunction.apply(watermark); + } + + @Override + public void postProcessWatermark(Watermark watermark) throws Exception { + if (postProcessFunction != null) { + postProcessFunction.accept(watermark); + } + } + @Override public void onEventTime(InternalTimer timer) throws Exception { output.collect(new StreamRecord<>(timer.getTimestamp())); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 0a9521dcb5af52..671df723761854 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -38,6 +38,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedMultiInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.BiFunctionWithException; import org.junit.jupiter.api.Test; @@ -274,6 +276,66 @@ void testWatermark() throws Exception { } } + @Test + void testWatermarkHooks() throws Exception { + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + List> keySelectors = + Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector); + + WatermarkTestingOperatorFactory factory = new WatermarkTestingOperatorFactory(); + AtomicInteger counter = new AtomicInteger(0); + factory.setPreProcessFunction( + (operator, watermark) -> { + operator.asyncProcessWithKey( + 1L, + () -> { + operator.output(watermark.getTimestamp() + 1000L); + }); + if (counter.incrementAndGet() % 2 == 0) { + return null; + } else { + return new Watermark(watermark.getTimestamp() + 1L); + } + }); + + factory.setPostProcessFunction( + (operator, watermark) -> { + operator.output(watermark.getTimestamp() + 100L); + }); + try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = + AsyncKeyedMultiInputStreamOperatorTestHarness.create( + factory, BasicTypeInfo.INT_TYPE_INFO, keySelectors, 1, 1, 0)) { + testHarness.setup(); + testHarness.open(); + testHarness.processElement(0, new StreamRecord<>(1L, 1L)); + testHarness.processElement(0, new StreamRecord<>(3L, 3L)); + testHarness.processElement(0, new StreamRecord<>(4L, 4L)); + testHarness.processWatermark(0, new Watermark(2L)); + testHarness.processWatermark(1, new Watermark(2L)); + testHarness.processWatermark(2, new Watermark(2L)); + expectedOutput.add(new StreamRecord<>(1002L)); + expectedOutput.add(new StreamRecord<>(1L)); + expectedOutput.add(new StreamRecord<>(3L)); + expectedOutput.add(new Watermark(3L)); + expectedOutput.add(new StreamRecord<>(103L)); + testHarness.processWatermark(0, new Watermark(4L)); + testHarness.processWatermark(1, new Watermark(4L)); + testHarness.processWatermark(2, new Watermark(4L)); + expectedOutput.add(new StreamRecord<>(1004L)); + testHarness.processWatermark(0, new Watermark(5L)); + testHarness.processWatermark(1, new Watermark(5L)); + testHarness.processWatermark(2, new Watermark(5L)); + expectedOutput.add(new StreamRecord<>(1005L)); + expectedOutput.add(new StreamRecord<>(4L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>(106L)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + @Test void testWatermarkStatus() throws Exception { try (KeyedOneInputStreamOperatorV2TestHarness, String> @@ -697,10 +759,31 @@ public void onProcessingTime(InternalTimer timer) throws private static class WatermarkTestingOperatorFactory extends AbstractStreamOperatorFactory { + + private BiFunctionWithException + preProcessFunction; + + private BiConsumerWithException + postProcessFunction; + + public void setPreProcessFunction( + BiFunctionWithException + preProcessFunction) { + this.preProcessFunction = preProcessFunction; + } + + public void setPostProcessFunction( + BiConsumerWithException + postProcessFunction) { + this.postProcessFunction = postProcessFunction; + } + @Override public > T createStreamOperator( StreamOperatorParameters parameters) { - return (T) new WatermarkTestingOperator(parameters); + return (T) + new WatermarkTestingOperator( + parameters, preProcessFunction, postProcessFunction); } @Override @@ -714,8 +797,25 @@ private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOp private transient InternalTimerService timerService; - public WatermarkTestingOperator(StreamOperatorParameters parameters) { + private BiFunctionWithException + preProcessFunction; + + private BiConsumerWithException + postProcessFunction; + + public WatermarkTestingOperator( + StreamOperatorParameters parameters, + BiFunctionWithException + preProcessFunction, + BiConsumerWithException + postProcessFunction) { super(parameters, 3); + this.preProcessFunction = preProcessFunction; + this.postProcessFunction = postProcessFunction; + } + + public void output(Long o) { + output.collect(new StreamRecord<>(o)); } @Override @@ -726,6 +826,20 @@ public void open() throws Exception { getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this); } + @Override + public Watermark preProcessWatermark(Watermark watermark) throws Exception { + return preProcessFunction == null + ? watermark + : preProcessFunction.apply(this, watermark); + } + + @Override + public void postProcessWatermark(Watermark watermark) throws Exception { + if (postProcessFunction != null) { + postProcessFunction.accept(this, watermark); + } + } + @Override public void onEventTime(InternalTimer timer) throws Exception { output.collect(new StreamRecord<>(timer.getTimestamp()));