From a0437b6c9319e81a41a6d408b745e364696a51bb Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 24 Jul 2024 18:47:04 +0530 Subject: [PATCH] MSQ window functions: Fix partition boundary issues for arrays (#16780) * MSQ window functions: Fix partition boundary issues for arrays * Address review comments * Cache type strategies * Trigger Build * Convert typeStrategies from list to array --- .../WindowOperatorQueryFrameProcessor.java | 15 ++++++++++--- .../sql/calcite/DrillWindowQueryTest.java | 21 +++++++++++++++++++ .../partition_by_array/wikipedia_query_1.e | 13 ++++++++++++ .../partition_by_array/wikipedia_query_1.q | 6 ++++++ .../partition_by_array/wikipedia_query_2.e | 13 ++++++++++++ .../partition_by_array/wikipedia_query_2.q | 6 ++++++ .../partition_by_array/wikipedia_query_3.e | 13 ++++++++++++ .../partition_by_array/wikipedia_query_3.q | 6 ++++++ 8 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index e21cae36d0f5..5fbfd3119d03 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -59,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private ResultRow outputRow = null; private FrameWriter frameWriter = null; + // Array of type strategies to compare the partition columns across rows. 
+ // Type strategies are pushed in the same order as column types in frameReader.signature() + private final NullableTypeStrategy[] typeStrategies; + public WindowOperatorQueryFrameProcessor( WindowOperatorQuery query, ReadableFrameChannel inputChannel, @@ -103,13 +107,18 @@ public WindowOperatorQueryFrameProcessor( this.frameWriterFactory = frameWriterFactory; this.operatorFactoryList = operatorFactoryList; this.jsonMapper = jsonMapper; - this.frameReader = frameReader; this.query = query; this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); this.objectsOfASingleRac = new ArrayList<>(); this.maxRowsMaterialized = maxRowsMaterializedInWindow; this.partitionColumnNames = partitionColumnNames; + + this.frameReader = frameReader; + this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()]; + for (int i = 0; i < frameReader.signature().size(); i++) { + typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy(); + } } @Override @@ -499,7 +508,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List