-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unnest now works on MSQ #14886
Unnest now works on MSQ #14886
Changes from 9 commits
416a139
5037157
5916cfd
eb024d0
d3c1e76
b4a8c62
96bbaa4
a3599b7
fab0425
b17186e
31f283b
be06d35
12bd9fa
5096be3
e249736
79c1cd4
1dd7933
53db2ff
948668c
8d33ed9
a4659d3
7a30bfc
d942772
d0a423b
7119ed0
4588e30
3b2bdfb
32dbeba
ef2424f
77905d4
d4a6c7c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -44,12 +44,14 @@ | |||||
import org.apache.druid.msq.kernel.StageDefinitionBuilder; | ||||||
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory; | ||||||
import org.apache.druid.query.DataSource; | ||||||
import org.apache.druid.query.FilteredDataSource; | ||||||
import org.apache.druid.query.InlineDataSource; | ||||||
import org.apache.druid.query.JoinDataSource; | ||||||
import org.apache.druid.query.LookupDataSource; | ||||||
import org.apache.druid.query.QueryContext; | ||||||
import org.apache.druid.query.QueryDataSource; | ||||||
import org.apache.druid.query.TableDataSource; | ||||||
import org.apache.druid.query.UnnestDataSource; | ||||||
import org.apache.druid.query.filter.DimFilter; | ||||||
import org.apache.druid.query.planning.DataSourceAnalysis; | ||||||
import org.apache.druid.query.planning.PreJoinableClause; | ||||||
|
@@ -137,6 +139,29 @@ public static DataSourcePlan forDataSource( | |||||
} else if (dataSource instanceof LookupDataSource) { | ||||||
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); | ||||||
return forLookup((LookupDataSource) dataSource, broadcast); | ||||||
} else if (dataSource instanceof FilteredDataSource) { | ||||||
return forFilteredDataSource( | ||||||
queryKit, | ||||||
queryId, | ||||||
queryContext, | ||||||
(FilteredDataSource) dataSource, | ||||||
querySegmentSpec, | ||||||
maxWorkerCount, | ||||||
minStageNumber, | ||||||
broadcast | ||||||
); | ||||||
} else if (dataSource instanceof UnnestDataSource) { | ||||||
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); | ||||||
return forUnnest( | ||||||
queryKit, | ||||||
queryId, | ||||||
queryContext, | ||||||
(UnnestDataSource) dataSource, | ||||||
querySegmentSpec, | ||||||
maxWorkerCount, | ||||||
minStageNumber, | ||||||
broadcast | ||||||
); | ||||||
} else if (dataSource instanceof QueryDataSource) { | ||||||
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); | ||||||
return forQuery( | ||||||
|
@@ -347,6 +372,91 @@ private static DataSourcePlan forQuery( | |||||
); | ||||||
} | ||||||
|
||||||
/**
 * Build a plan for a {@link FilteredDataSource}.
 *
 * The base of the filtered data source is planned first by recursing into
 * forDataSource; the resulting base data source is then re-wrapped with
 * FilteredDataSource.create using the original filter. No stage is added to
 * the builder directly in this method; only the stages contributed by the
 * base plan (if any) are copied into it.
 *
 * @param dataSource     the filtered data source to plan; its base and filter are read
 * @param minStageNumber lower bound for the stage number handed to the recursive call
 * @param broadcast      when true, input number 0 is reported as a broadcast input
 * @return a plan whose input specs are a copy of the base plan input specs
 */
private static DataSourcePlan forFilteredDataSource( | ||||||
final QueryKit queryKit, | ||||||
final String queryId, | ||||||
final QueryContext queryContext, | ||||||
final FilteredDataSource dataSource, | ||||||
final QuerySegmentSpec querySegmentSpec, | ||||||
final int maxWorkerCount, | ||||||
final int minStageNumber, | ||||||
final boolean broadcast | ||||||
) | ||||||
{ | ||||||
// Fresh builder; it only ever receives the stages of the base plan (see addAll below).
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); | ||||||
// Recurse on the base data source. The sixth argument is passed as null; its
// meaning is defined by the forDataSource signature, which is not visible in this hunk.
final DataSourcePlan basePlan = forDataSource( | ||||||
queryKit, | ||||||
queryId, | ||||||
queryContext, | ||||||
dataSource.getBase(), | ||||||
querySegmentSpec, | ||||||
null, | ||||||
maxWorkerCount, | ||||||
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), | ||||||
broadcast | ||||||
); | ||||||
|
||||||
DataSource newDataSource = basePlan.getNewDataSource(); | ||||||
// Carry over whatever stages the base plan produced, if it produced any.
basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); | ||||||
|
||||||
final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); | ||||||
|
||||||
// NOTE(review): shift equals the number of base input specs, while inputSpecs
// holds only those same specs. Presumably shiftInputNumbers offsets every input
// number in the data source by shift; if so the shifted numbers may point past
// the end of inputSpecs. TODO confirm whether a shift is wanted here at all.
int shift = basePlan.getInputSpecs().size(); | ||||||
newDataSource = FilteredDataSource.create(shiftInputNumbers(newDataSource, shift), dataSource.getFilter()); | ||||||
return new DataSourcePlan( | ||||||
newDataSource, | ||||||
inputSpecs, | ||||||
// NOTE(review): this set is derived from the broadcast flag alone; nothing is
// read back from basePlan for it. TODO confirm this is intended when the base
// plan carries broadcast inputs of its own.
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), | ||||||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
subQueryDefBuilder | ||||||
); | ||||||
|
||||||
} | ||||||
|
||||||
/** | ||||||
* Build a plan for Unnest data source | ||||||
*
* The base of the unnest data source is planned first by recursing into
* forDataSource; the resulting base data source is then re-wrapped with
* UnnestDataSource.create using the original virtual column and unnest filter.
* No stage is added to the builder directly in this method; only the stages
* contributed by the base plan (if any) are copied into it.
*
* @param dataSource     the unnest data source to plan; its base, virtual column and unnest filter are read
* @param minStageNumber lower bound for the stage number handed to the recursive call
* @param broadcast      when true, input number 0 is reported as a broadcast input
* @return a plan whose input specs are a copy of the base plan input specs
*/ | ||||||
private static DataSourcePlan forUnnest( | ||||||
final QueryKit queryKit, | ||||||
final String queryId, | ||||||
final QueryContext queryContext, | ||||||
final UnnestDataSource dataSource, | ||||||
final QuerySegmentSpec querySegmentSpec, | ||||||
final int maxWorkerCount, | ||||||
final int minStageNumber, | ||||||
final boolean broadcast | ||||||
) | ||||||
{ | ||||||
// Fresh builder; it only ever receives the stages of the base plan (see addAll below).
// NOTE(review): the review thread embedded further down suggests this separate
// builder may be unnecessary, since no new stage is added in this method.
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(); | ||||||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
// Recurse on the base data source. The sixth argument is passed as null; its
// meaning is defined by the forDataSource signature, which is not visible in this hunk.
final DataSourcePlan basePlan = forDataSource( | ||||||
queryKit, | ||||||
queryId, | ||||||
queryContext, | ||||||
dataSource.getBase(), | ||||||
querySegmentSpec, | ||||||
null, | ||||||
maxWorkerCount, | ||||||
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wont this line add a new stage ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be fine as it won't be adding a new stage, however, this is effective
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, thinking about it, we probably didn't need to create a separate subqueryDefBuilder since we aren't adding a new stage anyway, we can pass in the same one to the recursive call which might be causing this confusion. Since there are a few changes pending, let's clean this up as well. |
||||||
broadcast | ||||||
); | ||||||
DataSource newDataSource = basePlan.getNewDataSource(); | ||||||
// Carry over whatever stages the base plan produced, if it produced any.
basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); | ||||||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); | ||||||
|
||||||
// NOTE(review): shift equals the number of base input specs, while inputSpecs
// holds only those same specs. Presumably shiftInputNumbers offsets every input
// number in the data source by shift; if so the shifted numbers may point past
// the end of inputSpecs. TODO confirm whether a shift is wanted here at all.
int shift = basePlan.getInputSpecs().size(); | ||||||
newDataSource = UnnestDataSource.create( | ||||||
shiftInputNumbers(newDataSource, shift), | ||||||
dataSource.getVirtualColumn(), | ||||||
dataSource.getUnnestFilter() | ||||||
); | ||||||
return new DataSourcePlan( | ||||||
newDataSource, | ||||||
inputSpecs, | ||||||
// NOTE(review): this set is derived from the broadcast flag alone; nothing is
// read back from basePlan for it. TODO confirm this is intended when the base
// plan carries broadcast inputs of its own.
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), | ||||||
subQueryDefBuilder | ||||||
); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Build a plan for broadcast hash-join. | ||||||
*/ | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-confirm if this check is required here. I think it should be removed unless UNNEST on a table cannot have a time filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were you able to confirm if the check is required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have asked around and will update this once I hear back. For now I have removed the check, since the base data source types should take care of it; I will add it back if that turns out not to be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any updates on this? I wanted to get a final check on this before the approval