From 416a139446f0f372eda79e697df10d666b3c2cff Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 14 Aug 2023 19:21:57 -0700 Subject: [PATCH 01/22] Adding unnest to MSQ --- .../msq/querykit/BaseLeafFrameProcessor.java | 2 +- .../druid/msq/querykit/DataSourcePlan.java | 49 ++++- .../druid/msq/sql/MSQTaskSqlEngine.java | 1 - .../apache/druid/msq/exec/MSQSelectTest.java | 188 ++++++++++++++++++ .../test/CalciteArraysSelectQueryMSQTest.java | 99 +++++++++ .../apache/druid/msq/test/MSQTestBase.java | 7 + .../sql/calcite/CalciteArraysQueryTest.java | 42 ++++ 7 files changed, 385 insertions(+), 3 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index d5b31328b0b8..ffdec8afd087 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -196,7 +196,7 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs) if (segmentMapFn != null) { return true; } else if (broadcastJoinHelper == null) { - segmentMapFn = Function.identity(); + segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator); return true; } else { final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 477c3e0e1982..ce19ec5f13d0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -50,6 +50,7 @@ import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; @@ -137,7 +138,13 @@ public static DataSourcePlan forDataSource( } else if (dataSource instanceof LookupDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); - } else if (dataSource instanceof QueryDataSource) { + } else if (dataSource instanceof UnnestDataSource) { + return forUnnest( + queryKit, queryId, queryContext, + (UnnestDataSource) dataSource, querySegmentSpec, maxWorkerCount, minStageNumber, broadcast + ); + } + else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( queryKit, @@ -347,6 +354,46 @@ private static DataSourcePlan forQuery( ); } + /** + * Build a plan for Unnest data source + */ + private static DataSourcePlan forUnnest( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final UnnestDataSource dataSource, + final QuerySegmentSpec querySegmentSpec, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); + final DataSourcePlan basePlan = forDataSource( + queryKit, + queryId, + queryContext, + dataSource.getBase(), + querySegmentSpec, + null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. + maxWorkerCount, + Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), + broadcast + ); + DataSource newDataSource = basePlan.getNewDataSource(); + basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); + + final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); + basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); + + int shift = basePlan.getInputSpecs().size(); + newDataSource = UnnestDataSource.create(shiftInputNumbers(newDataSource, shift ), dataSource.getVirtualColumn(), dataSource.getUnnestFilter()); + return new DataSourcePlan(newDataSource, + inputSpecs, + broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + subQueryDefBuilder); + } + /** * Build a plan for broadcast hash-join. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 37b8692cb4df..56c945fb381f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -113,7 +113,6 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case GROUPING_SETS: case WINDOW_FUNCTIONS: case UNNEST: - return false; case CAN_SELECT: case CAN_INSERT: case CAN_REPLACE: diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index ae0bfa71f1e1..c1845080382b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; @@ -128,6 +129,7 @@ public class MSQSelectTest extends MSQTestBase @Parameterized.Parameters(name = "{index}:with context {0}") public static Collection data() { + /* Object[][] data = new Object[][]{ {DEFAULT, DEFAULT_MSQ_CONTEXT}, {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, @@ -135,6 +137,10 @@ public static Collection data() {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT}, {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT} + };*/ + + Object[][] data = new Object[][]{ + {DEFAULT, UNNEST_CONTEXT} }; return Arrays.asList(data); } @@ -2119,6 +2125,188 @@ public void testJoinUsesDifferentAlgorithm() .verifyResults(); } + @Test + public void testSelectOnFoo3() + { + RowSignature resultSignature = RowSignature.builder() + .add("EXPR$0", ColumnType.LONG) + .build(); + RowSignature outputSignature = RowSignature.builder() + .add("d", ColumnType.LONG) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("EXPR$0", "d") + ) + ); + + testSelectQuery() + .setSql("select d from UNNEST(ARRAY[1,2,3]) as unnested(d)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{1L}, + new Object[]{2L}, + new Object[]{3L} + ), + resultSignature + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("EXPR$0") + .context(defaultScanQueryContext( + context, + resultSignature + )) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1}, + new Object[]{2}, + new Object[]{3} + )) + .verifyResults(); + } + + + @Test + public void testSelectOnFoo4() + { + RowSignature resultSignature = RowSignature.builder() + .add("j0.unnest", ColumnType.STRING) + .build(); + + RowSignature outputSignature = RowSignature.builder() + .add("d3", ColumnType.STRING) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("j0.unnest", "d3") + ) + ); + + testSelectQuery() + .setSql("SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + resultSignature + )) + .columns(ImmutableList.of("j0.unnest")) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + )) + .verifyResults(); + } + + @Test + public void testSelectOnFoo5() + { + RowSignature resultSignature = RowSignature.builder() + .add("j0.unnest", ColumnType.STRING) + .build(); + + RowSignature outputSignature = RowSignature.builder() + .add("d3", ColumnType.STRING) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("j0.unnest", "d3") + ) + ); + + testSelectQuery() + .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a'), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + new TableDataSource(CalciteTests.DATASOURCE1) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .filters(equality("dim2", "a", ColumnType.STRING)) + .columns("dim3") + .context(defaultScanQueryContext( + context, + resultSignature + )) + .build() + ), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + resultSignature + )) + .columns(ImmutableList.of("j0.unnest")) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""} + )) + .verifyResults(); + } + @Nonnull private List expectedMultiValueFooRowsGroup() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java new file mode 100644 index 000000000000..c75d3843ff21 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.msq.exec.WorkerMemoryParameters; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.sql.calcite.CalciteArraysQueryTest; +import org.apache.druid.sql.calcite.CalciteQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.junit.After; +import org.junit.Before; + +/** + * Runs {@link CalciteArraysQueryTest} but with MSQ engine + */ +public class CalciteArraysSelectQueryMSQTest extends CalciteArraysQueryTest +{ + private TestGroupByBuffers groupByBuffers; + + @Before + public void setup2() + { + groupByBuffers = TestGroupByBuffers.createDefault(); + } + + @After + public void teardown2() + { + groupByBuffers.close(); + } + + @Override + public void configureGuice(DruidInjectorBuilder builder) + { + super.configureGuice(builder); + builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0])); + } + + + @Override + public SqlEngine createEngine( + QueryLifecycleFactory qlf, + ObjectMapper queryJsonMapper, + Injector injector + ) + { + final WorkerMemoryParameters workerMemoryParameters = + WorkerMemoryParameters.createInstance( + WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50, + 2, + 10, + 2, + 0, + 0 + ); + final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient( + queryJsonMapper, + injector, + new MSQTestTaskActionClient(queryJsonMapper), + workerMemoryParameters + ); + return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper); + } + + @Override + protected QueryTestBuilder testBuilder() + { + return new QueryTestBuilder(new CalciteTestConfig(true)) + .addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient())) + .skipVectorize(true) + .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate()) + .msqCompatible(msqCompatible); + } +} + diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 4e00fd657ac5..4af11fb34478 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -154,6 +154,7 @@ import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.calcite.run.SqlEngine; @@ -268,6 +269,12 @@ public class MSQTestBase extends BaseCalciteQueryTest .put(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false) .build(); + public static final Map UNNEST_CONTEXT = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put(PlannerContext.CTX_ENABLE_UNNEST, true) + .build(); + public static final String FAULT_TOLERANCE = "fault_tolerance"; public static final String DURABLE_STORAGE = "durable_storage"; public static final String DEFAULT = "default"; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index f5ea4ea60980..e5217e9084f3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3823,6 +3823,48 @@ public void testUnnestWithINFiltersWithNoLeftRewrite() ); } + @Test + public void testUnnestVirtualWithColumns1() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT strings FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (strings) where (strings='a' or (m1=2 and strings='b'))", + QUERY_CONTEXT_UNNEST, + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_UNNEST) + .columns(ImmutableList.of("j0.unnest")) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"aa"}, + new Object[]{"a"}, + new Object[]{"ab"}, + new Object[]{"a"}, + new Object[]{"ba"}, + new Object[]{"b"}, + new Object[]{"ad"}, + new Object[]{"b"}, + new Object[]{"aa"}, + new Object[]{"b"}, + new Object[]{"ab"} + ) + ); + } @Test public void testUnnestWithInvalidINFiltersOnUnnestedColumn() { From eb024d00a5bdb81cf4ddb7c2c7607feb4a8b7074 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 23 Aug 2023 20:37:37 -0700 Subject: [PATCH 02/22] Adding filtered data source + nested querying on unnest works in MSQ --- .../druid/msq/querykit/DataSourcePlan.java | 43 ++++++++++++++++++- .../apache/druid/msq/exec/MSQSelectTest.java | 13 ++++-- .../sql/calcite/CalciteArraysQueryTest.java | 19 +++----- 3 files changed, 58 insertions(+), 17 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index ce19ec5f13d0..77e0b8dd3790 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -44,6 +44,7 @@ import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory; import org.apache.druid.query.DataSource; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.LookupDataSource; @@ -138,7 +139,11 @@ public static DataSourcePlan forDataSource( } else if (dataSource instanceof LookupDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); + } else if (dataSource instanceof FilteredDataSource) { + return forFilteredDataSource(queryKit, queryId, queryContext, + (FilteredDataSource) dataSource, querySegmentSpec, maxWorkerCount, minStageNumber, broadcast); } else if (dataSource instanceof UnnestDataSource) { + checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forUnnest( queryKit, queryId, queryContext, (UnnestDataSource) dataSource, querySegmentSpec, maxWorkerCount, minStageNumber, broadcast @@ -354,6 +359,43 @@ private static DataSourcePlan forQuery( ); } + private static DataSourcePlan forFilteredDataSource( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final FilteredDataSource dataSource, + final QuerySegmentSpec querySegmentSpec, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); + final DataSourcePlan basePlan = forDataSource( + queryKit, + queryId, + queryContext, + dataSource.getBase(), + querySegmentSpec, + null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. + maxWorkerCount, + Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), + broadcast + ); + + DataSource newDataSource = basePlan.getNewDataSource(); + basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); + + final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); + + int shift = basePlan.getInputSpecs().size(); + newDataSource = FilteredDataSource.create(shiftInputNumbers(newDataSource, shift), dataSource.getFilter()); + return new DataSourcePlan(newDataSource, + inputSpecs, + broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + subQueryDefBuilder); + + } /** * Build a plan for Unnest data source */ @@ -384,7 +426,6 @@ private static DataSourcePlan forUnnest( basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); - basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); int shift = basePlan.getInputSpecs().size(); newDataSource = UnnestDataSource.create(shiftInputNumbers(newDataSource, shift ), dataSource.getVirtualColumn(), dataSource.getUnnestFilter()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index c1845080382b..1bb810fe22b7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -2233,8 +2233,8 @@ public void testSelectOnFoo4() new Object[]{"c"}, new Object[]{"d"}, new Object[]{""}, - new Object[]{""}, - new Object[]{""} + new Object[]{null}, + new Object[]{null} )) .verifyResults(); } @@ -2246,6 +2246,10 @@ public void testSelectOnFoo5() .add("j0.unnest", ColumnType.STRING) .build(); + RowSignature resultSignature1 = RowSignature.builder() + .add("dim3", ColumnType.STRING) + .build(); + RowSignature outputSignature = RowSignature.builder() .add("d3", ColumnType.STRING) .build(); @@ -2257,7 +2261,7 @@ public void testSelectOnFoo5() ); testSelectQuery() - .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a'), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") + .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a' LIMIT 10), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") .setExpectedMSQSpec( MSQSpec.builder() .query(newScanQueryBuilder() @@ -2274,8 +2278,9 @@ public void testSelectOnFoo5() .columns("dim3") .context(defaultScanQueryContext( context, - resultSignature + resultSignature1 )) + .limit(10) .build() ), expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index a6f4f829e84d..a1ad45365170 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -4160,7 +4160,7 @@ public void testUnnestWithINFiltersWithNoLeftRewrite() } @Test - public void testUnnestVirtualWithColumns1() + public void testUnnestVirtualWithColumns3() { // This tells the test to skip generating (vectorize = force) path // Generates only 1 native query with vectorize = false @@ -4169,7 +4169,7 @@ public void testUnnestVirtualWithColumns1() // Generates 2 native queries with 2 different values of vectorize cannotVectorize(); testQuery( - "SELECT strings FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (strings) where (strings='a' or (m1=2 and strings='b'))", + "SELECT strings FROM druid.numfoo, UNNEST(ARRAY[dim4,dim5]) as unnested (strings) where (strings='a' or (m1=2 and strings='b'))", QUERY_CONTEXT_UNNEST, ImmutableList.of( Druids.newScanQueryBuilder() @@ -4183,21 +4183,16 @@ public void testUnnestVirtualWithColumns1() .legacy(false) .context(QUERY_CONTEXT_UNNEST) .columns(ImmutableList.of("j0.unnest")) + .filters(or( + equality("j0.unnest", "a", ColumnType.STRING), + and(equality("m1", 2.0, ColumnType.FLOAT), equality("j0.unnest", "b", ColumnType.STRING)) + )) .build() ), ImmutableList.of( new Object[]{"a"}, - new Object[]{"aa"}, - new Object[]{"a"}, - new Object[]{"ab"}, new Object[]{"a"}, - new Object[]{"ba"}, - new Object[]{"b"}, - new Object[]{"ad"}, - new Object[]{"b"}, - new Object[]{"aa"}, - new Object[]{"b"}, - new Object[]{"ab"} + new Object[]{"a"} ) ); } From b4a8c62b02c7f01ea279196f0863fe918ce36de6 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 24 Aug 2023 14:13:00 -0700 Subject: [PATCH 03/22] Removing Unnest feature --- .../druid/msq/querykit/DataSourcePlan.java | 52 +++++++++++----- .../druid/msq/sql/MSQTaskSqlEngine.java | 2 +- .../apache/druid/msq/exec/MSQSelectTest.java | 60 ++++++++++++++++++- .../test/CalciteArraysSelectQueryMSQTest.java | 1 - .../apache/druid/msq/test/MSQTestBase.java | 7 --- .../sql/calcite/planner/PlannerContext.java | 9 +-- .../druid/sql/calcite/rule/DruidRules.java | 14 ++--- .../sql/calcite/run/NativeSqlEngine.java | 1 - .../druid/sql/calcite/view/ViewSqlEngine.java | 3 - .../sql/calcite/BaseCalciteQueryTest.java | 1 - .../sql/calcite/CalciteArraysQueryTest.java | 21 ++++++- 11 files changed, 125 insertions(+), 46 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 77e0b8dd3790..11ef7bfafae1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -140,16 +140,29 @@ public static DataSourcePlan forDataSource( checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { - return forFilteredDataSource(queryKit, queryId, queryContext, - (FilteredDataSource) dataSource, querySegmentSpec, maxWorkerCount, minStageNumber, broadcast); + return forFilteredDataSource( + queryKit, + queryId, + queryContext, + (FilteredDataSource) dataSource, + querySegmentSpec, + maxWorkerCount, + minStageNumber, + broadcast + ); } else if (dataSource instanceof UnnestDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forUnnest( - queryKit, queryId, queryContext, - (UnnestDataSource) dataSource, querySegmentSpec, maxWorkerCount, minStageNumber, broadcast + queryKit, + queryId, + queryContext, + (UnnestDataSource) dataSource, + querySegmentSpec, + maxWorkerCount, + minStageNumber, + broadcast ); - } - else if (dataSource instanceof QueryDataSource) { + } else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( queryKit, @@ -390,12 +403,15 @@ private static DataSourcePlan forFilteredDataSource( int shift = basePlan.getInputSpecs().size(); newDataSource = FilteredDataSource.create(shiftInputNumbers(newDataSource, shift), dataSource.getFilter()); - return new DataSourcePlan(newDataSource, - inputSpecs, - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), - subQueryDefBuilder); + return new DataSourcePlan( + newDataSource, + inputSpecs, + broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + subQueryDefBuilder + ); } + /** * Build a plan for Unnest data source */ @@ -428,11 +444,17 @@ private static DataSourcePlan forUnnest( final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); int shift = basePlan.getInputSpecs().size(); - newDataSource = UnnestDataSource.create(shiftInputNumbers(newDataSource, shift ), dataSource.getVirtualColumn(), dataSource.getUnnestFilter()); - return new DataSourcePlan(newDataSource, - inputSpecs, - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), - subQueryDefBuilder); + newDataSource = UnnestDataSource.create( + shiftInputNumbers(newDataSource, shift), + dataSource.getVirtualColumn(), + dataSource.getUnnestFilter() + ); + return new DataSourcePlan( + newDataSource, + inputSpecs, + broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + subQueryDefBuilder + ); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 6dad97f19086..2518b3eb3326 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -114,7 +114,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case TIME_BOUNDARY_QUERY: case GROUPING_SETS: case WINDOW_FUNCTIONS: - case UNNEST: + return false; case CAN_SELECT: case CAN_INSERT: case CAN_REPLACE: diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 1bb810fe22b7..2a8bf5b64c93 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -140,7 +140,7 @@ public static Collection data() };*/ Object[][] data = new Object[][]{ - {DEFAULT, UNNEST_CONTEXT} + {DEFAULT, DEFAULT_MSQ_CONTEXT} }; return Arrays.asList(data); } @@ -2312,6 +2312,64 @@ public void testSelectOnFoo5() .verifyResults(); } + @Test + public void testSelectOnFoo6() + { + RowSignature resultSignature = RowSignature.builder() + .add("j0.unnest", ColumnType.STRING) + .build(); + + RowSignature outputSignature = RowSignature.builder() + .add("d3", ColumnType.STRING) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("j0.unnest", "d3") + ) + ); + + testSelectQuery() + .setSql("SELECT COUNT(*) FROM foo") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + resultSignature + )) + .columns(ImmutableList.of("j0.unnest")) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{null}, + new Object[]{null} + )) + .verifyResults(); + } + @Nonnull private List expectedMultiValueFooRowsGroup() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java index c75d3843ff21..16aef6bdcaaa 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java @@ -28,7 +28,6 @@ import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.sql.calcite.CalciteArraysQueryTest; -import org.apache.druid.sql.calcite.CalciteQueryTest; import org.apache.druid.sql.calcite.QueryTestBuilder; import org.apache.druid.sql.calcite.run.SqlEngine; import org.junit.After; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 8d2c304c4945..3ce2d18e40de 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -154,7 +154,6 @@ import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.calcite.run.SqlEngine; @@ -269,12 +268,6 @@ public class MSQTestBase extends BaseCalciteQueryTest .put(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false) .build(); - public static final Map UNNEST_CONTEXT = - ImmutableMap.builder() - .putAll(DEFAULT_MSQ_CONTEXT) - .put(PlannerContext.CTX_ENABLE_UNNEST, true) - .build(); - public static final String FAULT_TOLERANCE = "fault_tolerance"; public static final String DURABLE_STORAGE = "durable_storage"; public static final String DEFAULT = "default"; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index d72b577ef3fa..4988d848baf3 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -82,11 +82,6 @@ public class PlannerContext */ public static final String CTX_ENABLE_WINDOW_FNS = "windowsAreForClosers"; - /** - * Undocumented context key, used to enable {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#UNNEST}. - */ - public static final String CTX_ENABLE_UNNEST = "enableUnnest"; - public static final String CTX_SQL_USE_BOUNDS_AND_SELECTORS = "sqlUseBoundAndSelectors"; public static final boolean DEFAULT_SQL_USE_BOUNDS_AND_SELECTORS = NullHandling.replaceWithDefault(); @@ -528,11 +523,11 @@ public boolean featureAvailable(final EngineFeature feature) return false; } - if (feature == EngineFeature.UNNEST && + /*if (feature == EngineFeature.UNNEST && !QueryContexts.getAsBoolean(CTX_ENABLE_UNNEST, queryContext.get(CTX_ENABLE_UNNEST), false)) { // Short-circuit: feature requires context flag. return false; - } + }*/ return engine.featureAvailable(feature, this); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 526eb6a976c0..9b4df6f7233b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -115,13 +115,13 @@ public static List rules(PlannerContext plannerContext) retVal.add(DruidOuterQueryRule.WINDOW); } - if (plannerContext.featureAvailable(EngineFeature.UNNEST)) { - retVal.add(new DruidUnnestRule(plannerContext)); - retVal.add(new DruidCorrelateUnnestRule(plannerContext)); - retVal.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE); - retVal.add(DruidFilterUnnestRule.instance()); - retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance()); - } + // Adding unnest specific rules + retVal.add(new DruidUnnestRule(plannerContext)); + retVal.add(new DruidCorrelateUnnestRule(plannerContext)); + retVal.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE); + retVal.add(DruidFilterUnnestRule.instance()); + retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance()); + return retVal; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index d7fc7d043b6f..ce7296d10aec 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -103,7 +103,6 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case TOPN_QUERY: case GROUPING_SETS: case WINDOW_FUNCTIONS: - case UNNEST: case ALLOW_BROADCAST_RIGHTY_JOIN: return true; case TIME_BOUNDARY_QUERY: diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 47dae5c4d97f..21aee6b6e72a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -62,9 +62,6 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_ORDER_BY_NON_TIME: case GROUPING_SETS: case WINDOW_FUNCTIONS: - case UNNEST: - return true; - // Views can't sit on top of INSERT or REPLACE. case CAN_INSERT: case CAN_REPLACE: diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 3c73319898e4..419405a0cbdf 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -203,7 +203,6 @@ public static void setupNullValues() ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) - .put(PlannerContext.CTX_ENABLE_UNNEST, true) .build(); public static final Map QUERY_CONTEXT_NO_STRINGIFY_ARRAY_USE_EQUALITY = diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index a1ad45365170..86b3dd9cc11e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -60,7 +60,6 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.util.CalciteTests; import org.junit.Assert; import org.junit.Test; @@ -78,7 +77,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest private static final Map QUERY_CONTEXT_UNNEST = ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) - .put(PlannerContext.CTX_ENABLE_UNNEST, true) .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) .build(); @@ -87,6 +85,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testSelectConstantArrayExpressionFromTable() { + notMsqCompatible(); testQuery( "SELECT ARRAY[1,2] as arr, dim1 FROM foo LIMIT 1", ImmutableList.of( @@ -168,6 +167,7 @@ public void testSelectNonConstantArrayExpressionFromTable() @Test public void testSelectNonConstantArrayExpressionFromTableForMultival() { + notMsqCompatible(); final String sql = "SELECT ARRAY[CONCAT(dim3, 'word'),'up'] as arr, dim1 FROM foo LIMIT 5"; final Query scanQuery = newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -207,6 +207,7 @@ public void testSomeArrayFunctionsWithScanQuery() // Yes these outputs are strange sometimes, arrays are in a partial state of existence so end up a bit // stringy for now this is because virtual column selectors are coercing values back to stringish so that // multi-valued string dimensions can be grouped on. + notMsqCompatible(); List expectedResults; if (useDefault) { expectedResults = ImmutableList.of( @@ -386,6 +387,7 @@ public void testSomeArrayFunctionsWithScanQueryNoStringify() // which will still always be stringified to ultimately adhere to the varchar type // as array support increases in the engine this will likely change since using explict array functions should // probably kick it into an array + notMsqCompatible(); List expectedResults; if (useDefault) { expectedResults = ImmutableList.of( @@ -1017,6 +1019,7 @@ public void testArrayOffset() @Test public void testArrayGroupAsLongArray() { + notMsqCompatible(); // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays cannotVectorize(); testQuery( @@ -1068,6 +1071,7 @@ public void testArrayGroupAsDoubleArray() { // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys cannotVectorize(); + notMsqCompatible(); testQuery( "SELECT ARRAY[d1], SUM(cnt) FROM druid.numfoo GROUP BY 1 ORDER BY 2 DESC", QUERY_CONTEXT_NO_STRINGIFY_ARRAY, @@ -1115,6 +1119,7 @@ public void testArrayGroupAsDoubleArray() @Test public void testArrayGroupAsFloatArray() { + notMsqCompatible(); // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys cannotVectorize(); testQuery( @@ -1605,6 +1610,7 @@ public void testArrayAggMultiValue() @Test public void testArrayAggNumeric() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_AGG(l1), ARRAY_AGG(DISTINCT l1), ARRAY_AGG(d1), ARRAY_AGG(DISTINCT d1), ARRAY_AGG(f1), ARRAY_AGG(DISTINCT f1) FROM numfoo", @@ -1741,6 +1747,7 @@ public void testArrayAggNumeric() @Test public void testArrayAggQuantile() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_QUANTILE(ARRAY_AGG(l1), 0.9) FROM numfoo", @@ -1784,6 +1791,7 @@ public void testArrayAggQuantile() @Test public void testArrayAggArrays() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_AGG(ARRAY[l1, l2]), ARRAY_AGG(DISTINCT ARRAY[l1, l2]) FROM numfoo", @@ -1880,6 +1888,7 @@ public void testArrayAggArrays() @Test public void testArrayConcatAggArrays() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_CONCAT_AGG(ARRAY[l1, l2]), ARRAY_CONCAT_AGG(DISTINCT ARRAY[l1, l2]) FROM numfoo", @@ -2028,6 +2037,7 @@ public void testArrayAggExpression() public void testArrayAggMaxBytes() { cannotVectorize(); + notMsqCompatible(); testQuery( "SELECT ARRAY_AGG(l1, 128), ARRAY_AGG(DISTINCT l1, 128) FROM numfoo", ImmutableList.of( @@ -2227,6 +2237,7 @@ public void testArrayAggGroupByArrayAggFromSubquery() @Test public void testArrayAggGroupByArrayAggOfLongsFromSubquery() { + notMsqCompatible(); requireMergeBuffers(3); cannotVectorize(); testQuery( @@ -2366,6 +2377,7 @@ public void testArrayAggGroupByArrayAggOfStringsFromSubquery() @Test public void testArrayAggGroupByArrayAggOfDoubleFromSubquery() { + notMsqCompatible(); requireMergeBuffers(3); cannotVectorize(); testQuery( @@ -2883,6 +2895,7 @@ public void testUnnestTwiceWithFiltersAndExpressions() @Test public void testUnnestThriceWithFiltersOnDimAndUnnestCol() { + notMsqCompatible(); cannotVectorize(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" @@ -2981,6 +2994,7 @@ public void testUnnestThriceWithFiltersOnDimAndUnnestCol() @Test public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumns() { + notMsqCompatible(); cannotVectorize(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" @@ -3052,6 +3066,7 @@ public void testUnnestThriceWithFiltersOnDimAndUnnestColumnsORCombinations() { cannotVectorize(); skipVectorize(); + notMsqCompatible(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" + " ( SELECT * FROM lotsocolumns, UNNEST(MV_TO_ARRAY(dimMultivalEnumerated)) as ut(dim3_unnest1) )" @@ -3930,6 +3945,8 @@ public void testUnnestWithJoinOnTheLeft() { skipVectorize(); cannotVectorize(); + notMsqCompatible(); + // @todo check this back testQuery( "SELECT d3 from (SELECT * from druid.numfoo JOIN (select dim2 as t from druid.numfoo where dim2 IN ('a','b','ab','abc')) ON dim2=t), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", QUERY_CONTEXT_UNNEST, From 96bbaa4197202682a5f0801507cd2798cebb6fec Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 24 Aug 2023 16:06:38 -0700 Subject: [PATCH 04/22] Cleaning up code --- .../druid/msq/querykit/DataSourcePlan.java | 4 +- .../apache/druid/msq/exec/MSQSelectTest.java | 70 ++----------------- .../sql/calcite/planner/PlannerContext.java | 6 -- .../druid/sql/calcite/view/ViewSqlEngine.java | 1 + 4 files changed, 7 insertions(+), 74 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 11ef7bfafae1..32d77bc9c361 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -390,7 +390,7 @@ private static DataSourcePlan forFilteredDataSource( queryContext, dataSource.getBase(), querySegmentSpec, - null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. + null, maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast @@ -433,7 +433,7 @@ private static DataSourcePlan forUnnest( queryContext, dataSource.getBase(), querySegmentSpec, - null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. + null, maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 2a8bf5b64c93..c3ab9a6ad244 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -129,7 +129,6 @@ public class MSQSelectTest extends MSQTestBase @Parameterized.Parameters(name = "{index}:with context {0}") public static Collection data() { - /* Object[][] data = new Object[][]{ {DEFAULT, DEFAULT_MSQ_CONTEXT}, {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, @@ -137,11 +136,8 @@ public static Collection data() {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT}, {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT} - };*/ - - Object[][] data = new Object[][]{ - {DEFAULT, DEFAULT_MSQ_CONTEXT} }; + return Arrays.asList(data); } @@ -2126,7 +2122,7 @@ public void testJoinUsesDifferentAlgorithm() } @Test - public void testSelectOnFoo3() + public void testSelectUnnestOnInlineFoo() { RowSignature resultSignature = RowSignature.builder() .add("EXPR$0", ColumnType.LONG) @@ -2182,7 +2178,7 @@ public void testSelectOnFoo3() @Test - public void testSelectOnFoo4() + public void testSelectUnnestOnFoo() { RowSignature resultSignature = RowSignature.builder() .add("j0.unnest", ColumnType.STRING) @@ -2240,7 +2236,7 @@ public void testSelectOnFoo4() } @Test - public void testSelectOnFoo5() + public void testSelectUnnestOnQueryFoo() { RowSignature resultSignature = RowSignature.builder() .add("j0.unnest", ColumnType.STRING) @@ -2312,64 +2308,6 @@ public void testSelectOnFoo5() .verifyResults(); } - @Test - public void testSelectOnFoo6() - { - RowSignature resultSignature = RowSignature.builder() - .add("j0.unnest", ColumnType.STRING) - .build(); - - RowSignature outputSignature = RowSignature.builder() - .add("d3", ColumnType.STRING) - .build(); - - final ColumnMappings expectedColumnMappings = new ColumnMappings( - ImmutableList.of( - new ColumnMapping("j0.unnest", "d3") - ) - ); - - testSelectQuery() - .setSql("SELECT COUNT(*) FROM foo") - .setExpectedMSQSpec( - MSQSpec.builder() - .query(newScanQueryBuilder() - .dataSource(UnnestDataSource.create( - new TableDataSource(CalciteTests.DATASOURCE1), - expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), - null - )) - .intervals(querySegmentSpec(Filtration.eternity())) - .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .legacy(false) - .context(defaultScanQueryContext( - context, - resultSignature - )) - .columns(ImmutableList.of("j0.unnest")) - .build()) - .columnMappings(expectedColumnMappings) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .destination(isDurableStorageDestination() - ? DurableStorageMSQDestination.INSTANCE - : TaskReportMSQDestination.INSTANCE) - .build() - ) - .setExpectedRowSignature(outputSignature) - .setQueryContext(context) - .setExpectedResultRows(ImmutableList.of( - new Object[]{"a"}, - new Object[]{"b"}, - new Object[]{"b"}, - new Object[]{"c"}, - new Object[]{"d"}, - new Object[]{""}, - new Object[]{null}, - new Object[]{null} - )) - .verifyResults(); - } - @Nonnull private List expectedMultiValueFooRowsGroup() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 4988d848baf3..d71704433987 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -523,12 +523,6 @@ public boolean featureAvailable(final EngineFeature feature) return false; } - /*if (feature == EngineFeature.UNNEST && - !QueryContexts.getAsBoolean(CTX_ENABLE_UNNEST, queryContext.get(CTX_ENABLE_UNNEST), false)) { - // Short-circuit: feature requires context flag. - return false; - }*/ - return engine.featureAvailable(feature, this); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 21aee6b6e72a..95483c649b57 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -62,6 +62,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_ORDER_BY_NON_TIME: case GROUPING_SETS: case WINDOW_FUNCTIONS: + return true; // Views can't sit on top of INSERT or REPLACE. case CAN_INSERT: case CAN_REPLACE: From a3599b7249e1ba1889d07bad2f24aecaa02679f0 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 24 Aug 2023 16:40:03 -0700 Subject: [PATCH 05/22] removing a test added by mistake --- .../sql/calcite/CalciteArraysQueryTest.java | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index 86b3dd9cc11e..a32479d293f0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -4176,43 +4176,6 @@ public void testUnnestWithINFiltersWithNoLeftRewrite() ); } - @Test - public void testUnnestVirtualWithColumns3() - { - // This tells the test to skip generating (vectorize = force) path - // Generates only 1 native query with vectorize = false - skipVectorize(); - // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization - // Generates 2 native queries with 2 different values of vectorize - cannotVectorize(); - testQuery( - "SELECT strings FROM druid.numfoo, UNNEST(ARRAY[dim4,dim5]) as unnested (strings) where (strings='a' or (m1=2 and strings='b'))", - QUERY_CONTEXT_UNNEST, - ImmutableList.of( - Druids.newScanQueryBuilder() - .dataSource(UnnestDataSource.create( - new TableDataSource(CalciteTests.DATASOURCE3), - expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY), - null - )) - .intervals(querySegmentSpec(Filtration.eternity())) - .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .legacy(false) - .context(QUERY_CONTEXT_UNNEST) - .columns(ImmutableList.of("j0.unnest")) - .filters(or( - equality("j0.unnest", "a", ColumnType.STRING), - and(equality("m1", 2.0, ColumnType.FLOAT), equality("j0.unnest", "b", ColumnType.STRING)) - )) - .build() - ), - ImmutableList.of( - new Object[]{"a"}, - new Object[]{"a"}, - new Object[]{"a"} - ) - ); - } @Test public void testUnnestWithInvalidINFiltersOnUnnestedColumn() { From fab04253f34421c6f36651881cdd0a7654397ed1 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 24 Aug 2023 17:47:38 -0700 Subject: [PATCH 06/22] Handling failed CI for useDefault=false --- .../apache/druid/msq/exec/MSQSelectTest.java | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index c3ab9a6ad244..6b6f8ff356e8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -2222,16 +2222,26 @@ public void testSelectUnnestOnFoo() ) .setExpectedRowSignature(outputSignature) .setQueryContext(context) - .setExpectedResultRows(ImmutableList.of( - new Object[]{"a"}, - new Object[]{"b"}, - new Object[]{"b"}, - new Object[]{"c"}, - new Object[]{"d"}, - new Object[]{""}, - new Object[]{null}, - new Object[]{null} - )) + .setExpectedResultRows( + useDefault ? ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + ) : ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{null}, + new Object[]{null} + )) .verifyResults(); } @@ -2300,11 +2310,15 @@ public void testSelectUnnestOnQueryFoo() ) .setExpectedRowSignature(outputSignature) .setQueryContext(context) - .setExpectedResultRows(ImmutableList.of( - new Object[]{"a"}, - new Object[]{"b"}, - new Object[]{""} - )) + .setExpectedResultRows( + useDefault ? ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"} + ) : ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""} + )) .verifyResults(); } From b17186e6490eab0f154b72dc8e70876ab3913bbb Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 25 Aug 2023 06:49:52 -0700 Subject: [PATCH 07/22] tmp --- .../org/apache/druid/sql/calcite/CalciteArraysQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index a32479d293f0..7fff348497aa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3945,7 +3945,7 @@ public void testUnnestWithJoinOnTheLeft() { skipVectorize(); cannotVectorize(); - notMsqCompatible(); + //notMsqCompatible(); // @todo check this back testQuery( "SELECT d3 from (SELECT * from druid.numfoo JOIN (select dim2 as t from druid.numfoo where dim2 IN ('a','b','ab','abc')) ON dim2=t), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", From 31f283bec915c1a36c6dfe335e500592cbec66f8 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 25 Aug 2023 16:30:08 -0700 Subject: [PATCH 08/22] Do not need the shift --- .../java/org/apache/druid/msq/querykit/DataSourcePlan.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 32d77bc9c361..7a17e83e2a87 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -400,9 +400,7 @@ private static DataSourcePlan forFilteredDataSource( basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); - - int shift = basePlan.getInputSpecs().size(); - newDataSource = FilteredDataSource.create(shiftInputNumbers(newDataSource, shift), dataSource.getFilter()); + newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter()); return new DataSourcePlan( newDataSource, inputSpecs, @@ -443,9 +441,8 @@ private static DataSourcePlan forUnnest( final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); - int shift = basePlan.getInputSpecs().size(); newDataSource = UnnestDataSource.create( - shiftInputNumbers(newDataSource, shift), + newDataSource, dataSource.getVirtualColumn(), dataSource.getUnnestFilter() ); From be06d3570424734c5984812cc6def8914e6cf453 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 28 Aug 2023 23:09:04 -0700 Subject: [PATCH 09/22] Updating to accept JOIN under unnest --- .../apache/druid/msq/querykit/BaseLeafFrameProcessor.java | 6 ++++-- .../java/org/apache/druid/msq/querykit/DataSourcePlan.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index ffdec8afd087..74a9f567077b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -38,6 +38,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference; @@ -96,7 +97,7 @@ private static Pair, BroadcastJoinHelper> makeInputCh final long memoryReservedForBroadcastJoin ) { - if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { + if (!(dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource) && !sideChannels.isEmpty()) { throw new ISE("Did not expect side channels for dataSource [%s]", dataSource); } @@ -107,7 +108,8 @@ private static Pair, BroadcastJoinHelper> makeInputCh inputChannels.add(baseInput.getChannel()); } - if (dataSource instanceof JoinDataSource) { + + if (dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource) { final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap(); final List channelReaders = new ArrayList<>(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 7a17e83e2a87..f60684650475 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -449,7 +449,7 @@ private static DataSourcePlan forUnnest( return new DataSourcePlan( newDataSource, inputSpecs, - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + basePlan.getBroadcastInputs(), subQueryDefBuilder ); } From 5096be333cd0b92d9c4b73ae3612a7bad6883c58 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 29 Aug 2023 08:27:00 -0700 Subject: [PATCH 10/22] FilteredDataSource and unnestdata source can have joins underneath and can have non empty side channels --- .../druid/msq/querykit/BaseLeafFrameProcessor.java | 9 +++++++-- .../org/apache/druid/msq/querykit/DataSourcePlan.java | 5 +++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index 74a9f567077b..3f7e40365238 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -36,6 +36,7 @@ import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.DataSource; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.UnnestDataSource; @@ -97,7 +98,11 @@ private static Pair, BroadcastJoinHelper> makeInputCh final long memoryReservedForBroadcastJoin ) { - if (!(dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource) && !sideChannels.isEmpty()) { + // An UnnestDataSource or FilteredDataSource can have a join as a base + // In such a case a side channel is expected to be there + if (!(dataSource instanceof JoinDataSource + || dataSource instanceof UnnestDataSource + || dataSource instanceof FilteredDataSource) && !sideChannels.isEmpty()) { throw new ISE("Did not expect side channels for dataSource [%s]", dataSource); } @@ -109,7 +114,7 @@ private static Pair, BroadcastJoinHelper> makeInputCh } - if (dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource) { + if (dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource || dataSource instanceof FilteredDataSource) { final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap(); final List channelReaders = new ArrayList<>(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index f60684650475..1d3a7af8b38e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -140,6 +140,7 @@ public static DataSourcePlan forDataSource( checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { + checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forFilteredDataSource( queryKit, queryId, @@ -425,6 +426,7 @@ private static DataSourcePlan forUnnest( ) { final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); + // Find the plan for base data source by recursing final DataSourcePlan basePlan = forDataSource( queryKit, queryId, @@ -441,11 +443,14 @@ private static DataSourcePlan forUnnest( final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); + // Create the new data source using the data source from the base plan newDataSource = UnnestDataSource.create( newDataSource, dataSource.getVirtualColumn(), dataSource.getUnnestFilter() ); + // The base data source can be a join and might already have broadcast inputs + // Need to set the broadcast inputs from the basePlan return new DataSourcePlan( newDataSource, inputSpecs, From e2497361035c80b2f7426302e7e29afef2d5d1ff Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 29 Aug 2023 08:58:10 -0700 Subject: [PATCH 11/22] Removing stale comment --- .../org/apache/druid/sql/calcite/CalciteArraysQueryTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index 7fff348497aa..508514ad55f8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3945,8 +3945,6 @@ public void testUnnestWithJoinOnTheLeft() { skipVectorize(); cannotVectorize(); - //notMsqCompatible(); - // @todo check this back testQuery( "SELECT d3 from (SELECT * from druid.numfoo JOIN (select dim2 as t from druid.numfoo where dim2 IN ('a','b','ab','abc')) ON dim2=t), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", QUERY_CONTEXT_UNNEST, From 53db2ff11d32d93136bc6f85b65220e769ba59ec Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 5 Sep 2023 13:53:12 -0700 Subject: [PATCH 12/22] Adding insert and replace tests and fixing broadcast inputs for filtered data source --- .../druid/msq/querykit/DataSourcePlan.java | 2 +- .../apache/druid/msq/exec/MSQInsertTest.java | 89 +++++++++++++ .../apache/druid/msq/exec/MSQReplaceTest.java | 122 ++++++++++++++++++ .../sql/calcite/CalciteArraysQueryTest.java | 1 + 4 files changed, 213 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 1d3a7af8b38e..9608b5f39cc1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -405,7 +405,7 @@ private static DataSourcePlan forFilteredDataSource( return new DataSourcePlan( newDataSource, inputSpecs, - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), + basePlan.getBroadcastInputs(), subQueryDefBuilder ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 009f595bf310..81ba53f755ef 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -185,6 +185,95 @@ public void testInsertWithExistingTimeColumn() throws IOException } + @Test + public void testInsertWithUnnestInline() + { + List expectedRows = ImmutableList.of( + new Object[]{1692226800000L, 1L}, + new Object[]{1692226800000L, 2L}, + new Object[]{1692226800000L, 3L} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.LONG) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + + @Test + public void testInsertWithUnnest() + { + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select __time, d from foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + + @Test + public void testInsertWithUnnestWithVirtualColumns() + { + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978480000000L, 6.0f}, + new Object[]{978480000000L, 6.0f} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.FLOAT) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select __time, d from foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + @Test public void testInsertOnExternalDataSource() throws IOException { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 80d6719f1211..9d2937f7a52d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -739,6 +739,128 @@ public void testReplaceWithClusteredByDescendingThrowsException() .verifyPlanningErrors(); } + @Test + public void testReplaceUnnestSegmentEntireTable() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE ALL " + + "SELECT __time, d " + + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) " + + "PARTITIONED BY ALL TIME ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-01T/P1M"), + "test", + 0 + ))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(8).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(8).frames(1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(8), + 1, 0 + ) + .verifyResults(); + } + + @Test + public void testReplaceUnnestWithVirtualColumnSegmentEntireTable() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.FLOAT) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE ALL " + + "SELECT __time, d " + + "FROM foo, UNNEST(ARRAY[m1, m2]) as unnested(d) " + + "PARTITIONED BY ALL TIME ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-01T/P1M"), + "test", + 0 + ))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978480000000L, 6.0f}, + new Object[]{978480000000L, 6.0f} + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(12).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(12).frames(1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(12), + 1, 0 + ) + .verifyResults(); + } + @Test public void testReplaceTombstonesOverPartiallyOverlappingSegments() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index 508514ad55f8..1cd7d47d78c6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3066,6 +3066,7 @@ public void testUnnestThriceWithFiltersOnDimAndUnnestColumnsORCombinations() { cannotVectorize(); skipVectorize(); + // lotsofcolumns dataset is not loaded into MSQ tests now, once we load it we can remove this notMsqCompatible(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" From 948668cc51ea080b74a12df4263fb4da4ab5e2bf Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 5 Sep 2023 15:52:24 -0700 Subject: [PATCH 13/22] Refactor 1 --- .../druid/msq/querykit/BaseLeafFrameProcessor.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index 3f7e40365238..6756c5a985de 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -100,9 +100,16 @@ private static Pair, BroadcastJoinHelper> makeInputCh { // An UnnestDataSource or FilteredDataSource can have a join as a base // In such a case a side channel is expected to be there + final DataSource baseDataSource; + if (dataSource instanceof UnnestDataSource) { + baseDataSource = ((UnnestDataSource) dataSource).getBase(); + } else if (dataSource instanceof FilteredDataSource) { + baseDataSource = ((FilteredDataSource) dataSource).getBase(); + } else { + baseDataSource = dataSource; + } if (!(dataSource instanceof JoinDataSource - || dataSource instanceof UnnestDataSource - || dataSource instanceof FilteredDataSource) && !sideChannels.isEmpty()) { + || baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { throw new ISE("Did not expect side channels for dataSource [%s]", dataSource); } @@ -114,7 +121,7 @@ private static Pair, BroadcastJoinHelper> makeInputCh } - if (dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource || dataSource instanceof FilteredDataSource) { + if (dataSource instanceof JoinDataSource || baseDataSource instanceof JoinDataSource) { final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap(); final List channelReaders = new ArrayList<>(); From a4659d304708f188466e5b003c3d3bb8afb16219 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 6 Sep 2023 18:46:00 -0700 Subject: [PATCH 14/22] Adding the unnest engine feature back --- .../druid/msq/querykit/BaseLeafFrameProcessor.java | 2 +- .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 1 + .../apache/druid/sql/calcite/rule/DruidRules.java | 12 +++++++----- .../druid/sql/calcite/run/NativeSqlEngine.java | 1 + .../apache/druid/sql/calcite/view/ViewSqlEngine.java | 1 + 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index 6756c5a985de..b65fe55ec1af 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -101,7 +101,7 @@ private static Pair, BroadcastJoinHelper> makeInputCh // An UnnestDataSource or FilteredDataSource can have a join as a base // In such a case a side channel is expected to be there final DataSource baseDataSource; - if (dataSource instanceof UnnestDataSource) { + if (dataSource instanceof UnnestDataSource) { baseDataSource = ((UnnestDataSource) dataSource).getBase(); } else if (dataSource instanceof FilteredDataSource) { baseDataSource = ((FilteredDataSource) dataSource).getBase(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 2518b3eb3326..e6578388a40e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -115,6 +115,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case GROUPING_SETS: case WINDOW_FUNCTIONS: return false; + case UNNEST: case CAN_SELECT: case CAN_INSERT: case CAN_REPLACE: diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 9b4df6f7233b..6dcbe369e3d8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -116,11 +116,13 @@ public static List rules(PlannerContext plannerContext) } // Adding unnest specific rules - retVal.add(new DruidUnnestRule(plannerContext)); - retVal.add(new DruidCorrelateUnnestRule(plannerContext)); - retVal.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE); - retVal.add(DruidFilterUnnestRule.instance()); - retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance()); + if (plannerContext.featureAvailable(EngineFeature.UNNEST)) { + retVal.add(new DruidUnnestRule(plannerContext)); + retVal.add(new DruidCorrelateUnnestRule(plannerContext)); + retVal.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE); + retVal.add(DruidFilterUnnestRule.instance()); + retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance()); + } return retVal; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index ce7296d10aec..d7fc7d043b6f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -103,6 +103,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case TOPN_QUERY: case GROUPING_SETS: case WINDOW_FUNCTIONS: + case UNNEST: case ALLOW_BROADCAST_RIGHTY_JOIN: return true; case TIME_BOUNDARY_QUERY: diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 95483c649b57..cd719d7f29fd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -62,6 +62,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_ORDER_BY_NON_TIME: case GROUPING_SETS: case WINDOW_FUNCTIONS: + case UNNEST: return true; // Views can't sit on top of INSERT or REPLACE. case CAN_INSERT: From 7a30bfc5570bc9ae7944946b91b474c15e592429 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 6 Sep 2023 19:24:41 -0700 Subject: [PATCH 15/22] Making the feature always true --- .../org/apache/druid/sql/calcite/planner/PlannerContext.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index d71704433987..f7d0b24de916 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -523,6 +523,10 @@ public boolean featureAvailable(final EngineFeature feature) return false; } + if (feature == EngineFeature.UNNEST) { + return true; + } + return engine.featureAvailable(feature, this); } From d94277264df43c5e2dc198d40eba45cc1833dad7 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 7 Sep 2023 08:40:43 -0700 Subject: [PATCH 16/22] Moving the engine feature from planner context to individual engines. That way Unnest can be turned on/off for individual engine such as MSQ if needed --- .../org/apache/druid/sql/calcite/planner/PlannerContext.java | 4 ---- .../org/apache/druid/sql/calcite/IngestionTestSqlEngine.java | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index f7d0b24de916..d71704433987 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -523,10 +523,6 @@ public boolean featureAvailable(final EngineFeature feature) return false; } - if (feature == EngineFeature.UNNEST) { - return true; - } - return engine.featureAvailable(feature, this); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 272fddbd8a42..4f6e22f4bbed 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -84,6 +84,7 @@ public boolean featureAvailable(final EngineFeature feature, final PlannerContex case TIME_BOUNDARY_QUERY: case SCAN_NEEDS_SIGNATURE: return false; + case UNNEST: case CAN_INSERT: case CAN_REPLACE: case READ_EXTERNAL_DATA: From d0a423ba0aa0f1f93bd935d4110278ffb6efe61b Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 7 Sep 2023 10:02:49 -0700 Subject: [PATCH 17/22] Revert "Moving the engine feature from planner context to individual engines. That way Unnest can be turned on/off for individual engine such as MSQ if needed" This reverts commit d94277264df43c5e2dc198d40eba45cc1833dad7. --- .../org/apache/druid/sql/calcite/planner/PlannerContext.java | 4 ++++ .../org/apache/druid/sql/calcite/IngestionTestSqlEngine.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index d71704433987..f7d0b24de916 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -523,6 +523,10 @@ public boolean featureAvailable(final EngineFeature feature) return false; } + if (feature == EngineFeature.UNNEST) { + return true; + } + return engine.featureAvailable(feature, this); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 4f6e22f4bbed..272fddbd8a42 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -84,7 +84,6 @@ public boolean featureAvailable(final EngineFeature feature, final PlannerContex case TIME_BOUNDARY_QUERY: case SCAN_NEEDS_SIGNATURE: return false; - case UNNEST: case CAN_INSERT: case CAN_REPLACE: case READ_EXTERNAL_DATA: From 4588e303a943a7a791577978e093c5412f9c5187 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 15 Sep 2023 09:14:27 -0700 Subject: [PATCH 18/22] Removing an unnecessary check in dataSourcePlan + pushing feature inside engines --- .../java/org/apache/druid/msq/querykit/DataSourcePlan.java | 2 -- .../apache/druid/sql/calcite/planner/PlannerContext.java | 6 ++---- .../apache/druid/sql/calcite/IngestionTestSqlEngine.java | 1 + 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 9608b5f39cc1..0cecf4483406 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -137,10 +137,8 @@ public static DataSourcePlan forDataSource( checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forInline((InlineDataSource) dataSource, broadcast); } else if (dataSource instanceof LookupDataSource) { - checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { - checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forFilteredDataSource( queryKit, queryId, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index f7d0b24de916..2397566932e1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -522,11 +522,9 @@ public boolean featureAvailable(final EngineFeature feature) // Short-circuit: feature requires context flag. return false; } - - if (feature == EngineFeature.UNNEST) { - return true; + if (engine == null) { + return false; } - return engine.featureAvailable(feature, this); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 272fddbd8a42..46fb40fddadb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -83,6 +83,7 @@ public boolean featureAvailable(final EngineFeature feature, final PlannerContex case TOPN_QUERY: case TIME_BOUNDARY_QUERY: case SCAN_NEEDS_SIGNATURE: + case UNNEST: return false; case CAN_INSERT: case CAN_REPLACE: From 3b2bdfb6f9e972cdc9c079aea1be56da814b95c1 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 15 Sep 2023 11:27:46 -0700 Subject: [PATCH 19/22] Updating engine in tests --- .../apache/druid/sql/calcite/planner/PlannerContext.java | 3 --- .../sql/calcite/planner/CalcitePlannerModuleTest.java | 8 +++++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 2397566932e1..9141c4db090f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -522,9 +522,6 @@ public boolean featureAvailable(final EngineFeature feature) // Short-circuit: feature requires context flag. return false; } - if (engine == null) { - return false; - } return engine.featureAvailable(feature, this); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java index 52e52ec7f89e..48e7ee2423b3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java @@ -42,6 +42,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider; +import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; import org.apache.druid.sql.calcite.schema.DruidSchemaName; import org.apache.druid.sql.calcite.schema.NamedSchema; @@ -89,6 +90,8 @@ public class CalcitePlannerModuleTest extends CalciteTestBase @Mock private DruidSchemaCatalog rootSchema; + @Mock + private SqlEngine engine; private Set aggregators; private Set operatorConversions; @@ -185,13 +188,16 @@ public void testExtensionCalciteRule() CalciteTests.TEST_AUTHORIZER_MAPPER, AuthConfig.newBuilder().build() ); + + PlannerContext context = PlannerContext.create( toolbox, "SELECT 1", - null, + engine, Collections.emptyMap(), null ); + boolean containsCustomRule = injector.getInstance(CalciteRulesManager.class) .druidConventionRuleSet(context) .contains(customRule); From 32dbebaaaa115482773303aa43e82cdb7d7efa0c Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Fri, 15 Sep 2023 11:54:46 -0700 Subject: [PATCH 20/22] Adding new datasource to msq tests --- .../apache/druid/msq/querykit/DataSourcePlan.java | 1 - .../apache/druid/msq/test/CalciteMSQTestsHelper.java | 12 ++++++++++++ .../druid/sql/calcite/CalciteArraysQueryTest.java | 2 -- .../druid/sql/calcite/util/TestDataBuilder.java | 2 +- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 0cecf4483406..b44b4e0d1661 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -150,7 +150,6 @@ public static DataSourcePlan forDataSource( broadcast ); } else if (dataSource instanceof UnnestDataSource) { - checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forUnnest( queryKit, queryId, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 0c840b14a88e..d7c0ea1f2d5f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -89,10 +89,13 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; +import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; +import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2; +import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS; /** * Helper class aiding in wiring up the Guice bindings required for MSQ engine to work with the Calcite's tests @@ -246,6 +249,15 @@ private static Supplier> getSupplierForSegment(SegmentId .rows(ROWS1_WITH_NUMERIC_DIMS) .buildMMappedIndex(); break; + case DATASOURCE5: + index = IndexBuilder + .create() + .tmpDir(new File(temporaryFolder.newFolder(), "5")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(INDEX_SCHEMA_LOTS_O_COLUMNS) + .rows(ROWS_LOTS_OF_COLUMNS) + .buildMMappedIndex(); + break; default: throw new ISE("Cannot query segment %s in test runner", segmentId); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index eff284bad7ff..ae4437faf699 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3066,8 +3066,6 @@ public void testUnnestThriceWithFiltersOnDimAndUnnestColumnsORCombinations() { cannotVectorize(); skipVectorize(); - // lotsofcolumns dataset is not loaded into MSQ tests now, once we load it we can remove this - notMsqCompatible(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" + " ( SELECT * FROM lotsocolumns, UNNEST(MV_TO_ARRAY(dimMultivalEnumerated)) as ut(dim3_unnest1) )" diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java index b2f93340dbf7..c6f05697026f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java @@ -222,7 +222,7 @@ public Optional build( .withRollup(false) .build(); - private static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder() + public static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder() .withMetrics( new CountAggregatorFactory("count") ) From 77905d4ae6d94df4330c9b9ed5c8c9b30ae116cf Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 20 Sep 2023 21:18:36 -0700 Subject: [PATCH 21/22] Updating code and test cases --- .../msq/querykit/BaseLeafFrameProcessor.java | 8 +-- .../druid/msq/querykit/DataSourcePlan.java | 5 +- .../apache/druid/msq/exec/MSQReplaceTest.java | 58 ++++++++++++++++++- .../druid/msq/test/MSQTestWorkerContext.java | 3 +- .../druid/sql/calcite/rule/DruidRules.java | 1 - 5 files changed, 64 insertions(+), 11 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index b65fe55ec1af..d0a12b3ae753 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -108,8 +108,7 @@ private static Pair, BroadcastJoinHelper> makeInputCh } else { baseDataSource = dataSource; } - if (!(dataSource instanceof JoinDataSource - || baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { + if (!(baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { throw new ISE("Did not expect side channels for dataSource [%s]", dataSource); } @@ -119,9 +118,8 @@ private static Pair, BroadcastJoinHelper> makeInputCh if (baseInput.hasChannel()) { inputChannels.add(baseInput.getChannel()); } - - - if (dataSource instanceof JoinDataSource || baseDataSource instanceof JoinDataSource) { + + if (baseDataSource instanceof JoinDataSource) { final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap(); final List channelReaders = new ArrayList<>(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index b44b4e0d1661..65788d602773 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -390,7 +390,7 @@ private static DataSourcePlan forFilteredDataSource( querySegmentSpec, null, maxWorkerCount, - Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), + minStageNumber, broadcast ); @@ -432,7 +432,7 @@ private static DataSourcePlan forUnnest( querySegmentSpec, null, maxWorkerCount, - Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), + minStageNumber, broadcast ); DataSource newDataSource = basePlan.getNewDataSource(); @@ -482,7 +482,6 @@ private static DataSourcePlan forBroadcastHashJoin( null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - broadcast ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 9d2937f7a52d..0a43fdaea721 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -861,6 +861,60 @@ public void testReplaceUnnestWithVirtualColumnSegmentEntireTable() .verifyResults(); } + @Test + public void testReplaceUnnestSegmentWithTimeFilter() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE WHERE __time >= TIMESTAMP '1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + + "SELECT __time, d " + + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) " + + "PARTITIONED BY DAY CLUSTERED BY d ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of( + "1999-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z"))) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ) + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(8), + 1, 0 + ) + .verifyResults(); + } + @Test public void testReplaceTombstonesOverPartiallyOverlappingSegments() { @@ -877,7 +931,9 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() .dataSource("foo1") .build(); - Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); List expectedResults; if (NullHandling.sqlCompatible()) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index a6f98b3ba85f..a478d1c3c171 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -119,7 +119,8 @@ public FrameContext frameContext(QueryDefinition queryDef, int stageNumber) IndexMergerV9 indexMerger = new IndexMergerV9( mapper, indexIO, - OffHeapMemorySegmentWriteOutMediumFactory.instance() + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true ); final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 6dcbe369e3d8..8ca4ab076d9c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -124,7 +124,6 @@ public static List rules(PlannerContext plannerContext) retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance()); } - return retVal; } From d4a6c7c0e5850a6b756041ad1f52722a282d1b4d Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 21 Sep 2023 15:50:29 -0700 Subject: [PATCH 22/22] Removed the subqueryDefBuilder to use the one from basePlan for Unnest/Filter DS --- .../org/apache/druid/msq/querykit/DataSourcePlan.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 65788d602773..9c39d8f5495a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -381,7 +381,6 @@ private static DataSourcePlan forFilteredDataSource( final boolean broadcast ) { - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); final DataSourcePlan basePlan = forDataSource( queryKit, queryId, @@ -395,7 +394,6 @@ private static DataSourcePlan forFilteredDataSource( ); DataSource newDataSource = basePlan.getNewDataSource(); - basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter()); @@ -403,7 +401,7 @@ private static DataSourcePlan forFilteredDataSource( newDataSource, inputSpecs, basePlan.getBroadcastInputs(), - subQueryDefBuilder + basePlan.getSubQueryDefBuilder().orElse(null) ); } @@ -422,7 +420,6 @@ private static DataSourcePlan forUnnest( final boolean broadcast ) { - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); // Find the plan for base data source by recursing final DataSourcePlan basePlan = forDataSource( queryKit, @@ -436,7 +433,6 @@ private static DataSourcePlan forUnnest( broadcast ); DataSource newDataSource = basePlan.getNewDataSource(); - basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); @@ -452,7 +448,7 @@ private static DataSourcePlan forUnnest( newDataSource, inputSpecs, basePlan.getBroadcastInputs(), - subQueryDefBuilder + basePlan.getSubQueryDefBuilder().orElse(null) ); }