From bb882727c03e2a386a895d394e7e807d45a7826f Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 5 Mar 2024 10:35:31 +0100 Subject: [PATCH] Fix Windowing/scanAndSort query issues on top of Joins. (#15996) allow a hashjoin result to be converted to RowsAndColumns added StorageAdapterRowsAndColumns fix incorrect isConcrete() return values during early phase of planning --- .../StorageAdapterRowsAndColumns.java | 144 ++++++++++++++++++ .../druid/segment/join/HashJoinSegment.java | 13 ++ .../operator/window/RowsAndColumnsHelper.java | 7 + .../rowsandcols/RowsAndColumnsTestBase.java | 4 +- .../StorageAdapterRowsAndColumnsTest.java | 44 ++++++ .../concrete/FrameRowsAndColumnsTest.java | 2 +- .../ClusteredGroupPartitionerTest.java | 15 ++ .../calcite/rel/DruidCorrelateUnnestRel.java | 9 +- .../sql/calcite/rel/DruidJoinQueryRel.java | 9 +- .../druid/sql/calcite/CalciteQueryTest.java | 74 ++++++++- 10 files changed, 314 insertions(+), 7 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java create mode 100644 processing/src/test/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumnsTest.java diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java new file mode 100644 index 000000000000..dbe1bdbb7ddd --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nonnull; +import java.util.Collection; +import java.util.Collections; + +/** + * Provides {@link RowsAndColumns} on top of a {@link StorageAdapter}. 
+ */ +public class StorageAdapterRowsAndColumns implements CloseableShapeshifter, RowsAndColumns +{ + private final StorageAdapter storageAdapter; + private RowsAndColumns materialized; + + public StorageAdapterRowsAndColumns(StorageAdapter storageAdapter) + { + this.storageAdapter = storageAdapter; + } + + @SuppressWarnings("unchecked") + @Override + public T as(Class clazz) + { + if (StorageAdapter.class == clazz) { + return (T) storageAdapter; + } + return null; + } + + @Override + public Collection getColumnNames() + { + return storageAdapter.getRowSignature().getColumnNames(); + } + + @Override + public int numRows() + { + return storageAdapter.getNumRows(); + } + + @Override + public Column findColumn(String name) + { + return getRealRAC().findColumn(name); + } + + @Override + public void close() + { + } + + protected RowsAndColumns getRealRAC() + { + if (materialized == null) { + materialized = materialize(storageAdapter); + } + return materialized; + } + + @Nonnull + private static RowsAndColumns materialize(StorageAdapter as) + { + final Sequence cursors = as.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + RowSignature rowSignature = as.getRowSignature(); + + FrameWriter writer = cursors.accumulate(null, (accumulated, in) -> { + if (accumulated != null) { + // We should not get multiple cursors because we set the granularity to ALL. So, this should never + // actually happen, but it doesn't hurt us to be defensive here, so we test against it. + throw new ISE("accumulated[%s] non-null, why did we get multiple cursors?", accumulated); + } + + final ColumnSelectorFactory columnSelectorFactory = in.getColumnSelectorFactory(); + + final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( + FrameType.COLUMNAR, + new ArenaMemoryAllocatorFactory(200 << 20), // 200 MB, because, why not? 
+ rowSignature, + Collections.emptyList() + ); + + final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(columnSelectorFactory); + while (!in.isDoneOrInterrupted()) { + frameWriter.addSelection(); + in.advance(); + } + return frameWriter; + }); + + if (writer == null) { + return new EmptyRowsAndColumns(); + } else { + final byte[] bytes = writer.toByteArray(); + return new FrameRowsAndColumns(Frame.wrap(bytes), rowSignature); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 55dd5ebdc01e..cdcc3ad8d6b9 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -23,6 +23,8 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.rowsandcols.StorageAdapterRowsAndColumns; +import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; @@ -32,6 +34,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -148,4 +151,14 @@ public Optional acquireReferences() return Optional.empty(); } } + + @SuppressWarnings("unchecked") + @Override + public T as(Class clazz) + { + if (CloseableShapeshifter.class.equals(clazz)) { + return (T) new StorageAdapterRowsAndColumns(this.asStorageAdapter()); + } + return SegmentReference.super.as(clazz); + } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index b9781b12d74b..2636156b53cc 
100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -99,6 +99,13 @@ public RowsAndColumnsHelper expectColumn(String col, double[] expectedVals) return this; } + public RowsAndColumnsHelper expectColumn(String col, float[] expectedVals) + { + final ColumnHelper helper = columnHelper(col, expectedVals.length, ColumnType.FLOAT); + helper.setExpectation(expectedVals); + return this; + } + public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals) { return expectColumn(col, expectedVals, type); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java index 16b84f82728f..0983805ee1d4 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java @@ -28,6 +28,7 @@ import org.junit.Test; import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -66,7 +67,8 @@ private static ArrayList getMakers() new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER}, new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER}, new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER}, - new Object[]{FrameRowsAndColumns.class, FrameRowsAndColumnsTest.MAKER} + new Object[]{FrameRowsAndColumns.class, FrameRowsAndColumnsTest.MAKER}, + new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER} ); } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumnsTest.java 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumnsTest.java new file mode 100644 index 000000000000..080c3d74099d --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumnsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumnsTest; +import org.apache.druid.segment.StorageAdapter; + +import java.util.function.Function; + +public class StorageAdapterRowsAndColumnsTest extends RowsAndColumnsTestBase +{ + public StorageAdapterRowsAndColumnsTest() + { + super(StorageAdapterRowsAndColumns.class); + } + + public static Function MAKER = input -> { + return buildFrame(input); + }; + + private static StorageAdapterRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) + { + FrameRowsAndColumns fRAC = FrameRowsAndColumnsTest.buildFrame(input); + return new StorageAdapterRowsAndColumns(fRAC.as(StorageAdapter.class)); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java index 9bd529b195f8..5f4179abdd99 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java @@ -37,7 +37,7 @@ public FrameRowsAndColumnsTest() return buildFrame(input); }; - private static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) + public static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java index d9851b175308..dd62fcb04c73 100644 --- 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.rowsandcols.semantic; import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; @@ -36,6 +37,8 @@ import java.util.function.BiFunction; import java.util.function.Function; +import static org.junit.Assume.assumeTrue; + public class ClusteredGroupPartitionerTest extends SemanticTestBase { public ClusteredGroupPartitionerTest( @@ -132,9 +135,13 @@ public void testDefaultClusteredGroupPartitioner() @Test public void testDefaultClusteredGroupPartitionerWithNulls() { + assumeTrue("testcase only enabled in sqlCompatible mode", NullHandling.sqlCompatible()); + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( ImmutableMap.of( "sorted", new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG), + "col_d", new ObjectArrayColumn(new Object[]{null, null, null, 1.0, 1.0, 2.0, 4.0, 4.0, 4.0}, ColumnType.DOUBLE), + "col_f", new ObjectArrayColumn(new Object[]{null, null, null, 1.0f, 1.0f, 2.0f, 4.0f, 4.0f, 4.0f}, ColumnType.FLOAT), "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) ) )); @@ -146,18 +153,26 @@ public void testDefaultClusteredGroupPartitionerWithNulls() List expectations = Arrays.asList( new RowsAndColumnsHelper() .expectColumn("sorted", new Object[]{null, null, null}, ColumnType.LONG) + .expectColumn("col_d", new Object[]{null, null, null}, ColumnType.DOUBLE) + .expectColumn("col_f", new Object[]{null, null, null}, ColumnType.FLOAT) .expectColumn("unsorted", new int[]{3, 54, 21}) .allColumnsRegistered(), new 
RowsAndColumnsHelper() .expectColumn("sorted", new int[]{1, 1}) + .expectColumn("col_d", new double[]{1.0, 1.0}) + .expectColumn("col_f", new float[]{1.0f, 1.0f}) .expectColumn("unsorted", new int[]{1, 5}) .allColumnsRegistered(), new RowsAndColumnsHelper() .expectColumn("sorted", new int[]{2}) + .expectColumn("col_d", new double[]{2.0}) + .expectColumn("col_f", new float[]{2.0f}) .expectColumn("unsorted", new int[]{54}) .allColumnsRegistered(), new RowsAndColumnsHelper() .expectColumn("sorted", new int[]{4, 4, 4}) + .expectColumn("col_d", new double[]{4.0, 4.0, 4.0}) + .expectColumn("col_f", new float[]{4.0f, 4.0f, 4.0f}) .expectColumn("unsorted", new int[]{2, 3, 92}) .allColumnsRegistered() ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java index 116bf1da3a6d..e9abd16f461f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java @@ -78,7 +78,14 @@ */ public class DruidCorrelateUnnestRel extends DruidRel { - private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__correlate_unnest__"); + private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__correlate_unnest__") + { + @Override + public boolean isConcrete() + { + return false; + } + }; private static final String BASE_UNNEST_OUTPUT_COLUMN = "unnest"; private final Correlate correlateRel; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 0020ef2b2716..6a8f15299661 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -70,7 +70,14 @@ */ public class DruidJoinQueryRel extends DruidRel { - 
private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__"); + static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__") + { + @Override + public boolean isConcrete() + { + return false; + } + }; private final Filter leftFilter; private final PartialDruidQuery partialQuery; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 96e696c0c907..d513387cfe9a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -15126,7 +15126,6 @@ public void testInGroupByOrderByLimitOutGroupByOrderByLimit() .run(); } - @Test public void testScanAndSortCanGetSchemaFromScanQuery() { @@ -15145,7 +15144,6 @@ public void testScanAndSortCanGetSchemaFromScanQuery() .run(); } - @DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.SLIGHTLY_WORSE_PLAN) @Test public void testWindowingWithScanAndSort() @@ -15249,7 +15247,6 @@ public void testWindowingWithScanAndSort() @Test public void testWindowingWithOrderBy() { - skipVectorize(); msqIncompatible(); testBuilder() .sql( @@ -15310,6 +15307,77 @@ public void testWindowingWithOrderBy() .run(); } + @NotYetSupported(Modes.MISSING_JOIN_CONVERSION) + @Test + public void testScanAndSortOnJoin() + { + msqIncompatible(); + testBuilder() + .sql("with " + + "main as " + + "(select dim1 as pickup,count(*) as cnt from foo group by 1 order by 2 desc limit 200)," + + "compare0 as " + + "(select dim1 as pickup,count(*) as cnt from numfoo group by 1 order by 2 desc limit 200) " + + "SELECT " + + " main.pickup," + + " main.cnt," + + " coalesce(compare0.cnt,0) as prevCount," + + " safe_divide(100.0 * (main.cnt - compare0.cnt), compare0.cnt) as delta " + + "from main " + + "left join compare0 on main.pickup is not distinct from compare0.pickup " + + "order by delta desc" + ) + .expectedResults( + 
ImmutableList.of( + new Object[] {"", 1L, 1L, 0.0D}, + new Object[] {"1", 1L, 1L, 0.0D}, + new Object[] {"10.1", 1L, 1L, 0.0D}, + new Object[] {"2", 1L, 1L, 0.0D}, + new Object[] {"abc", 1L, 1L, 0.0D}, + new Object[] {"def", 1L, 1L, 0.0D} + ) + ) + .run(); + } + + @NotYetSupported(Modes.MISSING_JOIN_CONVERSION) + @Test + public void testWindowingOverJoin() + { + msqIncompatible(); + testBuilder() + .sql("with " + + "main as " + + "(select dim1 as pickup,count(*) as cnt from foo group by 1 order by 2 desc limit 200)," + + "compare0 as " + + "(select dim1 as pickup,count(*) as cnt from numfoo group by 1 order by 2 desc limit 200) " + + "SELECT " + + " main.pickup," + + " main.cnt," + + " compare0.cnt," + + " SUM(main.cnt) OVER (ORDER BY main.pickup)" + + "from main " + + "left join compare0 on main.pickup is not distinct from compare0.pickup " + ) + .queryContext( + ImmutableMap.of( + PlannerContext.CTX_ENABLE_WINDOW_FNS, true, + QueryContexts.ENABLE_DEBUG, true + ) + ) + .expectedResults( + ImmutableList.of( + new Object[]{"", 1L, 1L, 1L}, + new Object[]{"1", 1L, 1L, 2L}, + new Object[]{"10.1", 1L, 1L, 3L}, + new Object[]{"2", 1L, 1L, 4L}, + new Object[]{"abc", 1L, 1L, 5L}, + new Object[]{"def", 1L, 1L, 6L} + ) + ) + .run(); + } + @Test public void testCastCharToVarcharInFlattenConcat() {