
Unnest now works on MSQ #14886

Merged: 31 commits, Sep 25, 2023
Changes from 9 commits

Commits (31)
416a139
Adding unnest to MSQ
somu-imply Aug 15, 2023
5037157
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 21, 2023
5916cfd
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 23, 2023
eb024d0
Adding filtered data source + nested querying on unnest works in MSQ
somu-imply Aug 24, 2023
d3c1e76
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 24, 2023
b4a8c62
Removing Unnest feature
somu-imply Aug 24, 2023
96bbaa4
Cleaning up code
somu-imply Aug 24, 2023
a3599b7
removing a test added by mistake
somu-imply Aug 24, 2023
fab0425
Handling failed CI for useDefault=false
somu-imply Aug 25, 2023
b17186e
tmp
somu-imply Aug 25, 2023
31f283b
Do not need the shift
somu-imply Aug 25, 2023
be06d35
Updating to accept JOIN under unnest
somu-imply Aug 29, 2023
12bd9fa
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 29, 2023
5096be3
FilteredDataSource and unnestdata source can have joins underneath an…
somu-imply Aug 29, 2023
e249736
Removing stale comment
somu-imply Aug 29, 2023
79c1cd4
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 29, 2023
1dd7933
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 5, 2023
53db2ff
Adding insert and replace tests and fixing broadcast inputs for filte…
somu-imply Sep 5, 2023
948668c
Refactor 1
somu-imply Sep 5, 2023
8d33ed9
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 7, 2023
a4659d3
Adding the unnest engine feature back
somu-imply Sep 7, 2023
7a30bfc
Making the feature always true
somu-imply Sep 7, 2023
d942772
Moving the engine feature from planner context to individual engines.…
somu-imply Sep 7, 2023
d0a423b
Revert "Moving the engine feature from planner context to individual …
somu-imply Sep 7, 2023
7119ed0
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 15, 2023
4588e30
Removing an unnecessary check in dataSourcePlan + pushing feature ins…
somu-imply Sep 15, 2023
3b2bdfb
Updating engine in tests
somu-imply Sep 15, 2023
32dbeba
Adding new datasource to msq tests
somu-imply Sep 15, 2023
ef2424f
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 18, 2023
77905d4
Updating code and test cases
somu-imply Sep 21, 2023
d4a6c7c
Removed the subqueryDefBuilder to use the one from basePlan for Unnes…
somu-imply Sep 21, 2023
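
Before the file diffs, a representative query of the kind this PR enables on the MSQ task engine. The SQL is copied verbatim from the tests added in this diff (see testSelectUnnestOnFoo below); wrapping it in a Java string is illustrative only:

  // Unnest the multi-value column dim3 of datasource foo into one output row per value.
  String sql = "SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)";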
@@ -196,7 +196,7 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
-      segmentMapFn = Function.identity();
+      segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
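
Why this hunk matters: with Function.identity(), the leaf processor ignored the planned data source, so wrapper data sources such as UnnestDataSource and FilteredDataSource were never applied to the segments being scanned. A minimal sketch of the new behavior, assuming (from its use above) that createSegmentMapFunction returns a Function<SegmentReference, SegmentReference>; baseSegment is a hypothetical leaf segment:

  // Sketch only: derive the segment-mapping function from the planned data source,
  // so e.g. an UnnestDataSource maps each base segment to an unnested view of it.
  final Function<SegmentReference, SegmentReference> segmentMapFn =
      query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
  final SegmentReference unnestedView = segmentMapFn.apply(baseSegment);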
@@ -44,12 +44,14 @@
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;

@@ -137,6 +139,29 @@ public static DataSourcePlan forDataSource(
     } else if (dataSource instanceof LookupDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forLookup((LookupDataSource) dataSource, broadcast);
+    } else if (dataSource instanceof FilteredDataSource) {
+      return forFilteredDataSource(
+          queryKit,
+          queryId,
+          queryContext,
+          (FilteredDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
+    } else if (dataSource instanceof UnnestDataSource) {
+      checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);

Review thread on the checkQuerySegmentSpecIsEternity call above:

Contributor:
Re-confirm whether this check is required here. I think it should be removed unless UNNEST on a table cannot have a time filter.

Contributor:
Were you able to confirm whether the check is required?

@somu-imply (Contributor, Author), Sep 15, 2023:
I have asked around; I'll update this once I get the comments back. I removed it since the base data source types should take care of it, but I will add it back otherwise.

Contributor:
Any updates on this? I wanted to get a final check on this before the approval.

+      return forUnnest(
+          queryKit,
+          queryId,
+          queryContext,
+          (UnnestDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
     } else if (dataSource instanceof QueryDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forQuery(
@@ -347,6 +372,91 @@ private static DataSourcePlan forQuery(
    );
  }

  private static DataSourcePlan forFilteredDataSource(
      final QueryKit queryKit,
      final String queryId,
      final QueryContext queryContext,
      final FilteredDataSource dataSource,
      final QuerySegmentSpec querySegmentSpec,
      final int maxWorkerCount,
      final int minStageNumber,
      final boolean broadcast
  )
  {
    final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder();
    final DataSourcePlan basePlan = forDataSource(
        queryKit,
        queryId,
        queryContext,
        dataSource.getBase(),
        querySegmentSpec,
        null,
        maxWorkerCount,
        Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
        broadcast
    );

    DataSource newDataSource = basePlan.getNewDataSource();
    basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);

    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());

    int shift = basePlan.getInputSpecs().size();
    newDataSource = FilteredDataSource.create(shiftInputNumbers(newDataSource, shift), dataSource.getFilter());
    return new DataSourcePlan(
        newDataSource,
        inputSpecs,
        broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
        subQueryDefBuilder
    );
  }

  /**
   * Build a plan for an unnest data source.
   */
  private static DataSourcePlan forUnnest(
      final QueryKit queryKit,
      final String queryId,
      final QueryContext queryContext,
      final UnnestDataSource dataSource,
      final QuerySegmentSpec querySegmentSpec,
      final int maxWorkerCount,
      final int minStageNumber,
      final boolean broadcast
  )
  {
    final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder();
    final DataSourcePlan basePlan = forDataSource(
        queryKit,
        queryId,
        queryContext,
        dataSource.getBase(),
        querySegmentSpec,
        null,
        maxWorkerCount,
        Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),

Review thread on the Math.max(...) line above:

Contributor:
Won't this line add a new stage?

@LakshSingla (Contributor), Sep 21, 2023:
I think this should be fine, as it won't add a new stage; however, it is effectively Math.max(minStageNumber, 0), since the builder here doesn't contain any stages. We can simplify it as below:

Suggested change:
-        Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
+        minStageNumber,

@LakshSingla (Contributor), Sep 21, 2023:
Also, thinking about it, we probably didn't need to create a separate subQueryDefBuilder since we aren't adding a new stage anyway; we could pass the same one into the recursive call, which might be causing this confusion. Since there are a few changes pending, let's clean this up as well.

        broadcast
    );
    DataSource newDataSource = basePlan.getNewDataSource();
    basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);

    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());

    int shift = basePlan.getInputSpecs().size();
    newDataSource = UnnestDataSource.create(
        shiftInputNumbers(newDataSource, shift),
        dataSource.getVirtualColumn(),
        dataSource.getUnnestFilter()
    );
    return new DataSourcePlan(
        newDataSource,
        inputSpecs,
        broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
        subQueryDefBuilder
    );
  }

  /**
   * Build a plan for broadcast hash-join.
   */
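
Both new planners follow the same pattern: recursively plan the base data source, inherit its input specs and any sub-query stages, then re-wrap the planned base in the original wrapper. As a concrete illustration of the unnest case, assembled from the expected plan in testSelectUnnestOnFoo below, the query SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) plans to a scan over:

  // Expected data source for the UNNEST query above (from the test's expected plan;
  // expressionVirtualColumn is the test helper used there):
  UnnestDataSource.create(
      new TableDataSource(CalciteTests.DATASOURCE1),                        // base table "foo"
      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),  // values to unnest
      null                                                                  // no unnest filter
  );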
@@ -114,7 +114,6 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
       case TIME_BOUNDARY_QUERY:
       case GROUPING_SETS:
       case WINDOW_FUNCTIONS:
-      case UNNEST:
         return false;
       case CAN_SELECT:
       case CAN_INSERT:
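
The effect of this one-line removal, sketched below: UNNEST no longer falls into the group of features for which the MSQ engine answers false, so the planner treats it as supported. The case grouping outside the hunk is an assumption, inferred from the hunk and the "Making the feature always true" commit above:

  // Sketch only; cases outside the hunk are assumed, not shown in this diff.
  switch (feature) {
    case TIME_BOUNDARY_QUERY:
    case GROUPING_SETS:
    case WINDOW_FUNCTIONS:
      return false;
    case UNNEST:      // assumed: now grouped with the supported features
    case CAN_SELECT:
    case CAN_INSERT:
      return true;
    // remaining cases omitted
  }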
@@ -51,6 +51,7 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;

@@ -136,6 +137,7 @@ public static Collection<Object[]> data()
        {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT},
        {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT}
    };

    return Arrays.asList(data);
  }
@@ -2119,6 +2121,207 @@ public void testJoinUsesDifferentAlgorithm()
        .verifyResults();
  }

  @Test
  public void testSelectUnnestOnInlineFoo()
  {
    RowSignature resultSignature = RowSignature.builder()
                                               .add("EXPR$0", ColumnType.LONG)
                                               .build();
    RowSignature outputSignature = RowSignature.builder()
                                               .add("d", ColumnType.LONG)
                                               .build();

    final ColumnMappings expectedColumnMappings = new ColumnMappings(
        ImmutableList.of(
            new ColumnMapping("EXPR$0", "d")
        )
    );

    testSelectQuery()
        .setSql("select d from UNNEST(ARRAY[1,2,3]) as unnested(d)")
        .setExpectedMSQSpec(
            MSQSpec.builder()
                   .query(newScanQueryBuilder()
                              .dataSource(
                                  InlineDataSource.fromIterable(
                                      ImmutableList.of(
                                          new Object[]{1L},
                                          new Object[]{2L},
                                          new Object[]{3L}
                                      ),
                                      resultSignature
                                  )
                              )
                              .intervals(querySegmentSpec(Filtration.eternity()))
                              .columns("EXPR$0")
                              .context(defaultScanQueryContext(
                                  context,
                                  resultSignature
                              ))
                              .build())
                   .columnMappings(expectedColumnMappings)
                   .tuningConfig(MSQTuningConfig.defaultConfig())
                   .destination(isDurableStorageDestination()
                                ? DurableStorageMSQDestination.INSTANCE
                                : TaskReportMSQDestination.INSTANCE)
                   .build()
        )
        .setExpectedRowSignature(outputSignature)
        .setQueryContext(context)
        .setExpectedResultRows(ImmutableList.of(
            new Object[]{1},
            new Object[]{2},
            new Object[]{3}
        ))
        .verifyResults();
  }
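
Worth noting in the test above: UNNEST over a literal array is constant-folded at planning time, so the expected plan scans a plain InlineDataSource and no UnnestDataSource appears. The folded input, exactly as the test expects it:

  // UNNEST(ARRAY[1,2,3]) folds to three inline rows in one LONG column "EXPR$0".
  InlineDataSource.fromIterable(
      ImmutableList.of(new Object[]{1L}, new Object[]{2L}, new Object[]{3L}),
      resultSignature
  );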


  @Test
  public void testSelectUnnestOnFoo()
  {
    RowSignature resultSignature = RowSignature.builder()
                                               .add("j0.unnest", ColumnType.STRING)
                                               .build();

    RowSignature outputSignature = RowSignature.builder()
                                               .add("d3", ColumnType.STRING)
                                               .build();

    final ColumnMappings expectedColumnMappings = new ColumnMappings(
        ImmutableList.of(
            new ColumnMapping("j0.unnest", "d3")
        )
    );

    testSelectQuery()
        .setSql("SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)")
        .setExpectedMSQSpec(
            MSQSpec.builder()
                   .query(newScanQueryBuilder()
                              .dataSource(UnnestDataSource.create(
                                  new TableDataSource(CalciteTests.DATASOURCE1),
                                  expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
                                  null
                              ))
                              .intervals(querySegmentSpec(Filtration.eternity()))
                              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                              .legacy(false)
                              .context(defaultScanQueryContext(
                                  context,
                                  resultSignature
                              ))
                              .columns(ImmutableList.of("j0.unnest"))
                              .build())
                   .columnMappings(expectedColumnMappings)
                   .tuningConfig(MSQTuningConfig.defaultConfig())
                   .destination(isDurableStorageDestination()
                                ? DurableStorageMSQDestination.INSTANCE
                                : TaskReportMSQDestination.INSTANCE)
                   .build()
        )
        .setExpectedRowSignature(outputSignature)
        .setQueryContext(context)
        .setExpectedResultRows(
            useDefault ? ImmutableList.of(
                new Object[]{"a"},
                new Object[]{"b"},
                new Object[]{"b"},
                new Object[]{"c"},
                new Object[]{"d"},
                new Object[]{""},
                new Object[]{""},
                new Object[]{""}
            ) : ImmutableList.of(
                new Object[]{"a"},
                new Object[]{"b"},
                new Object[]{"b"},
                new Object[]{"c"},
                new Object[]{"d"},
                new Object[]{""},
                new Object[]{null},
                new Object[]{null}
            ))
        .verifyResults();
  }

  @Test
  public void testSelectUnnestOnQueryFoo()
  {
    RowSignature resultSignature = RowSignature.builder()
                                               .add("j0.unnest", ColumnType.STRING)
                                               .build();

    RowSignature resultSignature1 = RowSignature.builder()
                                                .add("dim3", ColumnType.STRING)
                                                .build();

    RowSignature outputSignature = RowSignature.builder()
                                               .add("d3", ColumnType.STRING)
                                               .build();

    final ColumnMappings expectedColumnMappings = new ColumnMappings(
        ImmutableList.of(
            new ColumnMapping("j0.unnest", "d3")
        )
    );

    testSelectQuery()
        .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a' LIMIT 10), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)")
        .setExpectedMSQSpec(
            MSQSpec.builder()
                   .query(newScanQueryBuilder()
                              .dataSource(UnnestDataSource.create(
                                  new QueryDataSource(
                                      newScanQueryBuilder()
                                          .dataSource(
                                              new TableDataSource(CalciteTests.DATASOURCE1)
                                          )
                                          .intervals(querySegmentSpec(Filtration.eternity()))
                                          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                                          .legacy(false)
                                          .filters(equality("dim2", "a", ColumnType.STRING))
                                          .columns("dim3")
                                          .context(defaultScanQueryContext(
                                              context,
                                              resultSignature1
                                          ))
                                          .limit(10)
                                          .build()
                                  ),
                                  expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
                                  null
                              ))
                              .intervals(querySegmentSpec(Filtration.eternity()))
                              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                              .legacy(false)
                              .context(defaultScanQueryContext(
                                  context,
                                  resultSignature
                              ))
                              .columns(ImmutableList.of("j0.unnest"))
                              .build())
                   .columnMappings(expectedColumnMappings)
                   .tuningConfig(MSQTuningConfig.defaultConfig())
                   .destination(isDurableStorageDestination()
                                ? DurableStorageMSQDestination.INSTANCE
                                : TaskReportMSQDestination.INSTANCE)
                   .build()
        )
        .setExpectedRowSignature(outputSignature)
        .setQueryContext(context)
        .setExpectedResultRows(
            useDefault ? ImmutableList.of(
                new Object[]{"a"},
                new Object[]{"b"}
            ) : ImmutableList.of(
                new Object[]{"a"},
                new Object[]{"b"},
                new Object[]{""}
            ))
        .verifyResults();
  }

@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{