Skip to content

Commit

Permalink
[FLINK-36892][Runtime] Properly handle the watermark status within as…
Browse files Browse the repository at this point in the history
…ync state processing
  • Loading branch information
Zakelly committed Dec 13, 2024
1 parent 43c50b9 commit a12d280
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -332,17 +333,33 @@ public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
return;
}
asyncExecutionController.processNonRecord(
() -> {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> output.emitWatermark(mark));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
output.emitWatermark(mark);
}
});
asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
}

/**
* Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
* after the watermark is emitted.
*
* @param mark The watermark.
* @param postAction The runnable for post action.
*/
protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
throws Exception {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> emitWatermark(mark, postAction));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
emitWatermark(mark, postAction);
}
}

private void emitWatermark(Watermark mark, @Nullable Runnable postAction) {
output.emitWatermark(mark);
if (postAction != null) {
postAction.run();
}
}

@Override
Expand All @@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
boolean wasIdle = combinedWatermark.isIdle();
// index is 0-based
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
super.processWatermark(
new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
doProcessWatermark(
new Watermark(combinedWatermark.getCombinedWatermark()),
wasIdle == combinedWatermark.isIdle()
? null
: () -> output.emitWatermarkStatus(watermarkStatus));
} else if (wasIdle != combinedWatermark.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -151,6 +152,11 @@ public final <T> void setAsyncKeyedContextElement(
asyncExecutionController.setCurrentContext(currentProcessingContext);
}

@Override
public Object getCurrentKey() {
return currentProcessingContext.getKey();
}

@Override
public final void postProcessElement() {
// The processElement will be treated as a callback for dummy request. So reference
Expand Down Expand Up @@ -259,17 +265,33 @@ public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
return;
}
asyncExecutionController.processNonRecord(
() -> {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> output.emitWatermark(mark));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
output.emitWatermark(mark);
}
});
asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
}

/**
* Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
* after the watermark is emitted.
*
* @param mark The watermark.
* @param postAction The runnable for post action.
*/
protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
throws Exception {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> emitWatermark(mark, postAction));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
emitWatermark(mark, postAction);
}
}

private void emitWatermark(Watermark mark, @Nullable Runnable postAction) {
output.emitWatermark(mark);
if (postAction != null) {
postAction.run();
}
}

@Override
Expand All @@ -283,10 +305,12 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
() -> {
boolean wasIdle = combinedWatermark.isIdle();
if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
super.processWatermark(
new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
doProcessWatermark(
new Watermark(combinedWatermark.getCombinedWatermark()),
wasIdle == combinedWatermark.isIdle()
? null
: () -> output.emitWatermarkStatus(watermarkStatus));
} else if (wasIdle != combinedWatermark.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
}
});
Expand Down Expand Up @@ -316,12 +340,16 @@ public RecordContext getCurrentProcessingContext() {
@Override
public void finish() throws Exception {
super.finish();
asyncExecutionController.drainInflightRecords(0);
if (isAsyncStateProcessingEnabled()) {
asyncExecutionController.drainInflightRecords(0);
}
}

@Override
public void close() throws Exception {
super.close();
asyncExecutionController.close();
if (isAsyncStateProcessingEnabled()) {
asyncExecutionController.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
Expand Down Expand Up @@ -292,7 +294,9 @@ void testWatermarkStatus() throws Exception {
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processWatermarkStatus(new WatermarkStatus(0), 1);
testOperator.processWatermark1(new Watermark(205L));
testOperator.processWatermark2(new Watermark(105L));
testOperator.processWatermarkStatus(WatermarkStatus.IDLE, 1);
} catch (Exception e) {
}
});
Expand All @@ -309,8 +313,12 @@ void testWatermarkStatus() throws Exception {
anotherThread.shutdown();
Thread.sleep(1000);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.watermarkStatus.isActive()).isTrue();
assertThat(testOperator.watermarkIndex).isEqualTo(1);
assertThat(testOperator.watermarkStatus.isActive()).isFalse();
assertThat(testHarness.getOutput())
.containsExactly(
new StreamRecord<>("EventTimer-5-105"),
new Watermark(105L),
new Watermark(205L));
}
}

Expand All @@ -332,13 +340,18 @@ private static class TestOperator extends AbstractAsyncStateStreamOperator<Strin
private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
private int watermarkIndex = -1;

InternalTimerService<VoidNamespace> timerService;

TestOperator(ElementOrder elementOrder) {
this.elementOrder = elementOrder;
}

@Override
public void open() throws Exception {
super.open();
this.timerService =
getInternalTimerService(
"processing timer", VoidNamespaceSerializer.INSTANCE, this);
}

@Override
Expand All @@ -352,6 +365,8 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
synchronized (objectToWait) {
objectToWait.wait();
}
timerService.registerEventTimeTimer(
VoidNamespace.INSTANCE, element.getValue().f0 + 100L);
}

@Override
Expand All @@ -369,11 +384,18 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {}
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
throws Exception {}
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(
new StreamRecord<>(
"ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

public int getProcessed() {
return processed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
Expand Down Expand Up @@ -293,7 +294,11 @@ void testWatermarkStatus() throws Exception {
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processWatermarkStatus(new WatermarkStatus(0), 1);
testOperator.getInputs().get(0).processWatermark(new Watermark(205L));
testOperator
.getInputs()
.get(0)
.processWatermarkStatus(WatermarkStatus.IDLE);
} catch (Exception e) {
}
});
Expand All @@ -308,8 +313,12 @@ void testWatermarkStatus() throws Exception {
Thread.sleep(3000);
anotherThread.shutdown();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.watermarkStatus.isActive()).isTrue();
assertThat(testOperator.watermarkIndex).isEqualTo(1);
assertThat(testOperator.watermarkStatus.isActive()).isFalse();
assertThat(testHarness.getOutput())
.containsExactly(
new StreamRecord<>("EventTimer-5-105"),
new Watermark(205L),
WatermarkStatus.IDLE);
}
}

Expand Down Expand Up @@ -372,6 +381,8 @@ private static class SingleInputTestOperator extends AbstractAsyncStateStreamOpe
private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
private int watermarkIndex = -1;

InternalTimerService<VoidNamespace> timerService;

public SingleInputTestOperator(
StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
super(parameters, 1);
Expand All @@ -385,13 +396,18 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element)
synchronized (objectToWait) {
objectToWait.wait();
}
timerService.registerEventTimeTimer(
VoidNamespace.INSTANCE, element.getValue().f0 + 100L);
}
};
}

@Override
public void open() throws Exception {
super.open();
this.timerService =
getInternalTimerService(
"processing timer", VoidNamespaceSerializer.INSTANCE, this);
}

@Override
Expand All @@ -405,11 +421,18 @@ public ElementOrder getElementOrder() {
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {}
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
throws Exception {}
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(
new StreamRecord<>(
"ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes, int inputId)
Expand Down

0 comments on commit a12d280

Please sign in to comment.