diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index e21cae36d0f52..5fbfd3119d030 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -59,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private ResultRow outputRow = null; private FrameWriter frameWriter = null; + // List of type strategies to compare the partition columns across rows. + // Type strategies are pushed in the same order as column types in frameReader.signature() + private final NullableTypeStrategy[] typeStrategies; + public WindowOperatorQueryFrameProcessor( WindowOperatorQuery query, ReadableFrameChannel inputChannel, @@ -103,13 +107,18 @@ public WindowOperatorQueryFrameProcessor( this.frameWriterFactory = frameWriterFactory; this.operatorFactoryList = operatorFactoryList; this.jsonMapper = jsonMapper; - this.frameReader = frameReader; this.query = query; this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); this.objectsOfASingleRac = new ArrayList<>(); this.maxRowsMaterialized = maxRowsMaterializedInWindow; this.partitionColumnNames = partitionColumnNames; + + this.frameReader = frameReader; + this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()]; + for (int i = 0; i < frameReader.signature().size(); i++) { + typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy(); + } } @Override @@ -499,7 +508,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List