From 52fb78d3c41b568de9b42709e3bc381d7f486531 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 3 Oct 2023 14:53:30 -0700 Subject: [PATCH 1/7] Updating plans when using joins with unnest on the left --- .../apache/druid/query/JoinDataSource.java | 18 ++- .../druid/sql/calcite/rel/DruidRels.java | 2 +- .../sql/calcite/CalciteJoinQueryTest.java | 107 ++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index a7af13ed76cf..c3689fd38646 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -401,9 +401,13 @@ public Function createSegmentMapFn( ) ); + final Function baseMapFn = left.createSegmentMapFunction( + query, + cpuTimeAccumulator + ); return baseSegment -> new HashJoinSegment( - baseSegment, + baseMapFn.apply(baseSegment), baseFilterToUse, GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()), joinFilterPreAnalysis @@ -518,6 +522,18 @@ private static Triple> flattenJoi ); } + // Handling for cases when the left side of a join is an UnnestDataSource or a FilteredDataSource + // the base data source for the analysis needs to look deeper into the base of the datasources + while (current instanceof UnnestDataSource) { + final UnnestDataSource unnestDataSource = (UnnestDataSource) current; + current = unnestDataSource.getBase(); + } + + while (current instanceof FilteredDataSource) { + final FilteredDataSource filteredDataSource = (FilteredDataSource) current; + current = filteredDataSource.getBase(); + } + // Join clauses were added in the order we saw them while traversing down, but we need to apply them in the // going-up order. So reverse them. 
Collections.reverse(preJoinableClauses); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java index c35c872544f1..1627329c75e4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java @@ -66,7 +66,7 @@ public static boolean isScanOrMapping(final DruidRel druidRel, final boolean */ public static boolean isScanOrProject(final DruidRel druidRel, final boolean canBeJoinOrUnion) { - if (druidRel instanceof DruidQueryRel || (canBeJoinOrUnion && (druidRel instanceof DruidJoinQueryRel + if (druidRel instanceof DruidQueryRel || (canBeJoinOrUnion && (druidRel instanceof DruidJoinQueryRel || druidRel instanceof DruidCorrelateUnnestRel || druidRel instanceof DruidUnionDataSourceRel))) { final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery(); final PartialDruidQuery.Stage stage = partialQuery.stage(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index bb1660f856a9..1a23902e89bc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -38,6 +38,7 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.JoinDataSource; @@ -49,6 +50,7 @@ import org.apache.druid.query.QueryException; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; @@ -5914,4 +5916,109 @@ public void testJoinWithInputRefCondition() ) ); } + + @Test + public void testJoinsWithUnnestOnLeft() + { + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "with t1 as (\n" + + "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3)\n" + + ")\n" + + "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n" + + "ON t1.d3 = t2.\"dim2\"", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + ), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE3) + .columns("dim2") + .legacy(false) + .context(context) + .build() + ), + "_j0.", + "(\"j0.unnest\" == \"_j0.dim2\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("_j0.dim2", "dim3", "j0.unnest") + .context(context) + .build() + ), + useDefault ? 
+ ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"} + ) : ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"", "", ""} + ) + ); + } + + @Test + public void testJoinsWithUnnestOverFilteredDSOnLeft() + { + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "with t1 as (\n" + + "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3) where dim2='a'\n" + + ")\n" + + "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n" + + "ON t1.d3 = t2.\"dim2\"", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + UnnestDataSource.create( + FilteredDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + equality("dim2", "a", ColumnType.STRING) + ), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + ), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE3) + .columns("dim2") + .legacy(false) + .context(context) + .build() + ), + "_j0.", + "(\"j0.unnest\" == \"_j0.dim2\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("_j0.dim2", "dim3", "j0.unnest") + .context(context) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"} + ) : ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"", "", ""} + ) + ); + } } From e2b132960ce96e7f73a0763d248034bdfea04cd9 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 3 Oct 2023 17:57:13 -0700 Subject: [PATCH 2/7] Correcting segment map function for hashJoin --- .../apache/druid/query/JoinDataSource.java | 61 +++++++------- .../sql/calcite/CalciteJoinQueryTest.java | 83 +++++++++++++++++++ 2 files changed, 116 insertions(+), 28 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index c3689fd38646..038980ceba2c 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -400,11 +400,15 @@ public Function createSegmentMapFn( .orElse(null) ) ); - - final Function baseMapFn = left.createSegmentMapFunction( - query, - cpuTimeAccumulator - ); + final Function baseMapFn; + if (left instanceof JoinDataSource) { + baseMapFn = Function.identity(); + } else { + baseMapFn = left.createSegmentMapFunction( + query, + cpuTimeAccumulator + ); + } return baseSegment -> new HashJoinSegment( baseMapFn.apply(baseSegment), @@ -508,31 +512,32 @@ private static Triple> flattenJoi DimFilter currentDimFilter = null; final List preJoinableClauses = new ArrayList<>(); - while (current instanceof JoinDataSource) { - final JoinDataSource joinDataSource = (JoinDataSource) current; - current = joinDataSource.getLeft(); - currentDimFilter = validateLeftFilter(current, joinDataSource.getLeftFilter()); - preJoinableClauses.add( - new PreJoinableClause( - joinDataSource.getRightPrefix(), - joinDataSource.getRight(), - joinDataSource.getJoinType(), - joinDataSource.getConditionAnalysis() - ) - ); - } - - // Handling for cases when the left side of a join is an UnnestDataSource or a FilteredDataSource - // the base data source for the analysis needs to look deeper into the base of the datasources - while (current 
instanceof UnnestDataSource) { - final UnnestDataSource unnestDataSource = (UnnestDataSource) current; - current = unnestDataSource.getBase(); + // There can be queries like + // Join of Unnest of Join of Unnest of Filter + // so these checks are needed to be ORed + // to get the base + while (current instanceof JoinDataSource || current instanceof UnnestDataSource || current instanceof FilteredDataSource) { + if (current instanceof JoinDataSource) { + final JoinDataSource joinDataSource = (JoinDataSource) current; + current = joinDataSource.getLeft(); + currentDimFilter = validateLeftFilter(current, joinDataSource.getLeftFilter()); + preJoinableClauses.add( + new PreJoinableClause( + joinDataSource.getRightPrefix(), + joinDataSource.getRight(), + joinDataSource.getJoinType(), + joinDataSource.getConditionAnalysis() + ) + ); + } else if (current instanceof UnnestDataSource) { + final UnnestDataSource unnestDataSource = (UnnestDataSource) current; + current = unnestDataSource.getBase(); + } else { + final FilteredDataSource filteredDataSource = (FilteredDataSource) current; + current = filteredDataSource.getBase(); + } } - while (current instanceof FilteredDataSource) { - final FilteredDataSource filteredDataSource = (FilteredDataSource) current; - current = filteredDataSource.getBase(); - } // Join clauses were added in the order we saw them while traversing down, but we need to apply them in the // going-up order. So reverse them. diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 1a23902e89bc..f926788bf7f4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -66,6 +66,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; import org.apache.druid.query.extraction.SubstringDimExtractionFn; +import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.query.filter.LikeDimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; @@ -6021,4 +6022,86 @@ public void testJoinsWithUnnestOverFilteredDSOnLeft() ) ); } + + @Test + public void testJoinsWithUnnestOverJoin() + { + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "with t1 as (\n" + + "select * from (SELECT * from foo JOIN (select dim2 as t from foo where dim2 IN ('a','b','ab','abc')) ON dim2=t), " + + " unnest(MV_TO_ARRAY(\"dim3\")) as u(d3) \n" + + ")\n" + + "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n" + + "ON t1.d3 = t2.\"dim2\"", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + UnnestDataSource.create( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE1) + .filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null)) + .legacy(false) + .context(context) + .columns("dim2") + .build() + ), + "j0.", + "(\"dim2\" == \"j0.dim2\")", + JoinType.INNER + ), + expressionVirtualColumn("_j0.unnest", "\"dim3\"", ColumnType.STRING), + null + ), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE3) + .columns("dim2") + .legacy(false) + .context(context) + .build() + ), + 
"__j0.", + "(\"_j0.unnest\" == \"__j0.dim2\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__j0.dim2", "_j0.unnest", "dim3") + .context(context) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"} + ) : ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"", "", ""}, + new Object[]{"", "", ""}, + new Object[]{"", "", ""}, + new Object[]{"", "", ""} + ) + ); + } } From 7a06d1b4085aff250a9159d3c8f49bdca79bd8ec Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 3 Oct 2023 19:03:25 -0700 Subject: [PATCH 3/7] The changes done here are not reflected into MSQ yet so these tests might not run in MSQ --- .../druid/sql/calcite/CalciteJoinQueryTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index f926788bf7f4..2bf0cc799b87 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -5921,6 +5921,10 @@ public void testJoinWithInputRefCondition() @Test public void testJoinsWithUnnestOnLeft() { + // Segment map function of MSQ needs some work + // To handle these nested cases + // Remove this when that's handled + notMsqCompatible(); Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); testQuery( "with t1 as (\n" @@ -5972,6 +5976,10 @@ public void testJoinsWithUnnestOnLeft() @Test public void testJoinsWithUnnestOverFilteredDSOnLeft() { + // Segment map function of MSQ needs some work + // To handle these nested cases + // Remove this when that's handled + notMsqCompatible(); Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); testQuery( "with t1 as (\n" @@ -6026,6 +6034,10 @@ public void testJoinsWithUnnestOverFilteredDSOnLeft() @Test public void testJoinsWithUnnestOverJoin() { + // Segment map function of MSQ needs some work + // To handle these nested cases + // Remove this when that's handled + notMsqCompatible(); Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); testQuery( "with t1 as (\n" From 0a4892d5b376c544b644f24c48f9696b59ca5c5c Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 4 Oct 2023 18:33:24 -0700 Subject: [PATCH 4/7] native tests --- .../apache/druid/query/JoinDataSource.java | 4 ++ .../druid/query/JoinDataSourceTest.java | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index 038980ceba2c..1f16f0f69ca9 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -516,6 +516,10 @@ private static Triple> flattenJoi // Join of Unnest of Join of 
Unnest of Filter // so these checks are needed to be ORed // to get the base + // This also means that an addition of a new datasource + // Will need an instanceof check here + // A future work should look into if the flattenJoin + // can be refactored to omit these instanceof checks while (current instanceof JoinDataSource || current instanceof UnnestDataSource || current instanceof FilteredDataSource) { if (current instanceof JoinDataSource) { final JoinDataSource joinDataSource = (JoinDataSource) current; diff --git a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java index b23c0b92dbbd..b821bc49c4e7 100644 --- a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java @@ -29,11 +29,14 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.query.filter.TrueDimFilter; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.easymock.Mock; import org.junit.Assert; import org.junit.Rule; @@ -433,6 +436,51 @@ public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix() Assert.assertFalse(Arrays.equals(cacheKey1, cacheKey2)); } + @Test + public void testGetAnalysisWithUnnestDS() + { + JoinDataSource dataSource = JoinDataSource.create( + UnnestDataSource.create( + new TableDataSource("table1"), + new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()), + null + ), + new TableDataSource("table2"), + "j.", + "x == \"j.x\"", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + null + ); + DataSourceAnalysis analysis = dataSource.getAnalysis(); + Assert.assertEquals("table1", analysis.getBaseDataSource().getTableNames().iterator().next()); + } + + @Test + public void testGetAnalysisWithFilteredDS() + { + JoinDataSource dataSource = JoinDataSource.create( + UnnestDataSource.create( + FilteredDataSource.create( + new TableDataSource("table1"), + TrueDimFilter.instance() + ), + new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()), + null + ), + new TableDataSource("table2"), + "j.", + "x == \"j.x\"", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + null + ); + DataSourceAnalysis analysis = dataSource.getAnalysis(); + Assert.assertEquals("table1", analysis.getBaseDataSource().getTableNames().iterator().next()); + } + @Test public void test_computeJoinDataSourceCacheKey_keyChangesWithBaseFilter() { From 78f0090767f29a544499e797d8aa0c4b9d4dd21d Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 5 Oct 2023 14:07:10 -0700 Subject: [PATCH 5/7] Self joins with unnest data source --- .../sql/calcite/CalciteJoinQueryTest.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 5a43980f5bf7..306457055c7f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -6116,4 +6116,73 @@ public void testJoinsWithUnnestOverJoin() ) ); } + + @Test + public void testSelfJoinsWithUnnestOnLeftAndRight() + { + // Segment map function of MSQ needs some work + // To handle these nested cases + // Remove this when that's handled + msqIncompatible(); + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "with t1 as (\n" + + "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3)\n" + + ")\n" + + "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN t1 as t2\n" + + "ON t1.d3 = t2.d3", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + ), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .columns("dim2", "j0.unnest") + .legacy(false) + .context(context) + .build() + ), + "_j0.", + "(\"j0.unnest\" == \"_j0.j0.unnest\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("_j0.dim2", "dim3", "j0.unnest") + .context(context) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "b", "a"}, + new Object[]{"[\"a\",\"b\"]", "b", ""}, + new Object[]{"[\"b\",\"c\"]", "b", "a"}, + new Object[]{"[\"b\",\"c\"]", "b", ""}, + new Object[]{"[\"b\",\"c\"]", "c", ""}, + new Object[]{"d", "d", ""} + ) : ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a"}, + new Object[]{"[\"a\",\"b\"]", "b", "a"}, + new Object[]{"[\"a\",\"b\"]", "b", null}, + new Object[]{"[\"b\",\"c\"]", "b", "a"}, + new Object[]{"[\"b\",\"c\"]", "b", null}, + new Object[]{"[\"b\",\"c\"]", "c", null}, + new Object[]{"d", "d", ""}, + new Object[]{"", "", "a"} + ) + ); + } } From 31153f71d55e1773d02515b064a0c75d881a0f1f Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 5 Oct 2023 14:28:05 -0700 Subject: [PATCH 6/7] Making this pass --- .../org/apache/druid/sql/calcite/CalciteSysQueryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java index 5a66383194cf..d67084ea27a5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java @@ -33,7 +33,7 @@ public class CalciteSysQueryTest extends BaseCalciteQueryTest @Test public void testTasksSum() { - notMsqCompatible(); + msqIncompatible(); testBuilder() .sql("select datasource, sum(duration) from sys.tasks group by datasource") @@ -50,7 +50,7 @@ public void testTasksSum() @Test public void testTasksSumOver() { - notMsqCompatible(); + msqIncompatible(); testBuilder() .sql("select datasource, sum(duration) over () from sys.tasks group by datasource") From b6c2e03c989aee41ccd456d97496b857d01326c9 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 6 Oct 2023 13:49:54 -0700 Subject: [PATCH 7/7] Addressing comments by adding explanation and new test --- .../apache/druid/query/JoinDataSource.java | 13 ++- .../sql/calcite/CalciteJoinQueryTest.java | 84 +++++++++++++++++++ 2 files changed, 96 
insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index ab099f1ac42d..220f18a94855 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -477,6 +477,13 @@ private Function createSegmentMapFunctionInt ) ); final Function baseMapFn; + // A join data source is not concrete + // And isConcrete() of an unnest datasource delegates to its base + // Hence, in the case of a Join -> Unnest -> Join + // if we just use isConcrete on the left + // the segment map function for the unnest would never get called + // This calls us to delegate to the segmentMapFunction of the left + // only when it is not a JoinDataSource if (left instanceof JoinDataSource) { baseMapFn = Function.identity(); } else { @@ -513,6 +520,11 @@ private static Triple> flattenJoi // Join of Unnest of Join of Unnest of Filter // so these checks are needed to be ORed // to get the base + // This method is called to get the analysis for the join data source + // Since the analysis of an UnnestDS or FilteredDS always delegates to its base + // To obtain the base data source underneath a Join + // we also iterate through the base of the FilterDS and UnnestDS in its path + // the base of which can be a concrete data source // This also means that an addition of a new datasource // Will need an instanceof check here // A future work should look into if the flattenJoin @@ -539,7 +551,6 @@ private static Triple> flattenJoi } } - // Join clauses were added in the order we saw them while traversing down, but we need to apply them in the // going-up order. So reverse them. Collections.reverse(preJoinableClauses); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 306457055c7f..e8a728339605 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -6185,4 +6185,88 @@ public void testSelfJoinsWithUnnestOnLeftAndRight() ) ); } + + @Test + public void testJoinsOverUnnestOverFilterDSOverJoin() + { + // Segment map function of MSQ needs some work + // To handle these nested cases + // Remove this when that's handled + msqIncompatible(); + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "with t1 as (\n" + + "select * from (SELECT * from foo JOIN (select dim2 as t from foo where dim2 IN ('a','b','ab','abc')) ON dim2=t),\n" + + "unnest(MV_TO_ARRAY(\"dim3\")) as u(d3) where m1 IN (1,4) and d3='a'\n" + + ")\n" + + "select t1.dim3, t1.d3, t2.dim2, t1.m1 from t1 JOIN numfoo as t2\n" + + "ON t1.d3 = t2.\"dim2\"", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + UnnestDataSource.create( + FilteredDataSource.create( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE1) + .columns("dim2") + .filters(new InDimFilter( + "dim2", + ImmutableList.of("a", "ab", "abc", "b"), + null + )) + .legacy(false) + .context(context) + .build() + ), + "j0.", + "(\"dim2\" == \"j0.dim2\")", + JoinType.INNER + ), + useDefault ? 
+ new InDimFilter("m1", ImmutableList.of("1", "4"), null) : + or( + equality("m1", 1.0, ColumnType.FLOAT), + equality("m1", 4.0, ColumnType.FLOAT) + ) + ), + expressionVirtualColumn("_j0.unnest", "\"dim3\"", ColumnType.STRING), + equality("_j0.unnest", "a", ColumnType.STRING) + ), + new QueryDataSource( + newScanQueryBuilder() + .intervals(querySegmentSpec(Filtration.eternity())) + .dataSource(CalciteTests.DATASOURCE3) + .columns("dim2") + .legacy(false) + .context(context) + .build() + ), + "__j0.", + "(\"_j0.unnest\" == \"__j0.dim2\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__j0.dim2", "_j0.unnest", "dim3", "m1") + .context(context) + .build() + ), + ImmutableList.of( + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f}, + new Object[]{"[\"a\",\"b\"]", "a", "a", 1.0f} + ) + ); + } }