Skip to content

Commit

Permalink
fixup! [FLINK-36949][Runtime] Make getCurrentKey of async state opera…
Browse files Browse the repository at this point in the history
…tors return the ground truth
  • Loading branch information
Zakelly committed Dec 25, 2024
1 parent 56ba88b commit 64ad410
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre

private AsyncExecutionController asyncExecutionController;

/** Act as a cache for {@link #setAsyncKeyedContextElement} and {@link #postProcessElement}. */
private RecordContext currentProcessingContext;

private Environment environment;
Expand Down Expand Up @@ -175,20 +176,18 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> proc
@Override
@SuppressWarnings("unchecked")
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
RecordContext<K> previousContext = currentProcessingContext;

RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
// build a context and switch to the new context
currentProcessingContext = asyncExecutionController.buildContext(null, key, true);
currentProcessingContext.retain();
asyncExecutionController.setCurrentContext(currentProcessingContext);
RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
newContext.retain();
asyncExecutionController.setCurrentContext(newContext);
// Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
// pass the same key in.
preserveRecordOrderAndProcess(processing);
postProcessElement();
newContext.release();

// switch to original context
asyncExecutionController.setCurrentContext(previousContext);
currentProcessingContext = previousContext;
asyncExecutionController.setCurrentContext(oldContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
private final Environment environment;
private AsyncExecutionController asyncExecutionController;

/** Act as a cache for {@link #setAsyncKeyedContextElement} and {@link #postProcessElement}. */
private RecordContext currentProcessingContext;

public AbstractAsyncStateStreamOperatorV2(
Expand Down Expand Up @@ -182,20 +183,18 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> proc
@Override
@SuppressWarnings("unchecked")
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
RecordContext<K> previousContext = currentProcessingContext;

RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
// build a context and switch to the new context
currentProcessingContext = asyncExecutionController.buildContext(null, key, true);
currentProcessingContext.retain();
asyncExecutionController.setCurrentContext(currentProcessingContext);
RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
newContext.retain();
asyncExecutionController.setCurrentContext(newContext);
// Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
// pass the same key in.
preserveRecordOrderAndProcess(processing);
postProcessElement();
newContext.release();

// switch to original context
asyncExecutionController.setCurrentContext(previousContext);
currentProcessingContext = previousContext;
asyncExecutionController.setCurrentContext(oldContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.ArrayList;
Expand All @@ -54,7 +54,7 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends OneInputStreamOperatorTestHarness<IN, OUT> {

/** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */
private final List<Input> inputs = new ArrayList<>();
Expand Down

0 comments on commit 64ad410

Please sign in to comment.