diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 049db3144b2c8..5c3929f79dc64 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -57,8 +57,11 @@ import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; @@ -527,6 +530,16 @@ && isCleanupTime(triggerContext.window, timer.getTimestamp())) { } } + @Override + public OperatorAttributes getOperatorAttributes() { + boolean isOutputOnlyAfterEndOfStream = + windowAssigner instanceof GlobalWindows + && trigger instanceof GlobalWindows.EndOfStreamTrigger; + return new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(isOutputOnlyAfterEndOfStream) + .build(); + } + /** * Drops all state for the given window and calls {@link Trigger#clear(Window, * Trigger.TriggerContext)}. diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java index d462b16e0ea66..636d2563684bb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -153,4 +155,9 @@ public void finish() throws Exception { } super.finish(); } + + @Override + public OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build(); + } }