Unnest now works on MSQ #14886
sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
...sions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
Overall LGTM. Left some comments.
sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
// An UnnestDataSource or FilteredDataSource can have a join as a base
// In such a case a side channel is expected to be there
if (!(dataSource instanceof JoinDataSource
|| dataSource instanceof UnnestDataSource
Unnest seems to involve two data sources, FilteredDS and UnnestDS. Could you help me understand this a bit more?
It seems the filter could easily have been pushed into the unnest data source. Is there a reason not to?
nit: If we are keeping it as is, then there should be a better pattern than doing an instanceof for all.
Question:
What happens if we have a simple unnest query like this?
SELECT * FROM tab1, UNNEST(col1)
What is expected to be in the side channels then? It seems they should be empty in this case.
Yes, a FilteredDataSource is a data source with a filter. It was introduced to fix an issue where unnesting over a data source with a selector filter was planned as a QueryDataSource instead of a table data source. A FilteredDS is basically a base data source plus a filter. It pushes the filter down to the data source, and because no QueryDataSource is needed in between, the results do not have to be materialized on the broker.
In that case, should we just check that the delegate of the FilteredDataSource is not a JoinDataSource or an UnnestDataSource, instead of doing the check on the FilteredDataSource as a whole?
This will help if the FilteredDataSource evolves to have a delegate that is something other than the unnest data source.
Now that I think about it, we just need to check whether the FilteredDataSource or UnnestDataSource has a join as its base.
This should be recursive, which we already do. It would be good, though not necessary, to merge this cleanly with the existing code path that recurses on the data source's children.
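For illustration, the recursive check discussed in this thread could be sketched roughly as below. This is only a minimal sketch under stated assumptions: `SketchDataSource`, `SketchTable`, `SketchJoin`, `SketchWrapper`, and `SideChannelCheck.containsJoin` are hypothetical stand-ins invented for this example, not the Druid classes or the code in this PR. The idea is that a wrapper (playing the role of an unnest or filtered data source) needs side channels only if a join sits somewhere beneath it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Druid classes.
interface SketchDataSource {
    // Direct children of this data source (empty for leaves such as tables).
    List<SketchDataSource> getChildren();
}

// Leaf data source with no children.
final class SketchTable implements SketchDataSource {
    @Override
    public List<SketchDataSource> getChildren() {
        return Collections.emptyList();
    }
}

// Two-sided data source, playing the role of a join.
final class SketchJoin implements SketchDataSource {
    private final SketchDataSource left;
    private final SketchDataSource right;

    SketchJoin(SketchDataSource left, SketchDataSource right) {
        this.left = left;
        this.right = right;
    }

    @Override
    public List<SketchDataSource> getChildren() {
        return Arrays.asList(left, right);
    }
}

// Single-base wrapper, playing the role of an unnest or filtered data source.
final class SketchWrapper implements SketchDataSource {
    private final SketchDataSource base;

    SketchWrapper(SketchDataSource base) {
        this.base = base;
    }

    @Override
    public List<SketchDataSource> getChildren() {
        return Collections.singletonList(base);
    }
}

final class SideChannelCheck {
    private SideChannelCheck() {
    }

    // Returns true if the data source is a join or contains one at any depth.
    static boolean containsJoin(SketchDataSource dataSource) {
        if (dataSource instanceof SketchJoin) {
            return true;
        }
        for (SketchDataSource child : dataSource.getChildren()) {
            if (containsJoin(child)) {
                return true;
            }
        }
        return false;
    }
}
```

With a helper of this shape, a single call replaces the chain of `instanceof` checks, and a wrapper over a plain table is reported as not needing side channels.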
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -107,7 +113,8 @@ private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputCh
inputChannels.add(baseInput.getChannel());
}

- if (dataSource instanceof JoinDataSource) {
+ if (dataSource instanceof JoinDataSource || dataSource instanceof UnnestDataSource || dataSource instanceof FilteredDataSource) {
Seeing the same pattern as above, it should be a good refactor candidate.
… That way Unnest can be turned on/off for individual engine such as MSQ if needed
…engines. That way Unnest can be turned on/off for individual engine such as MSQ if needed" This reverts commit d942772.
broadcast
);
} else if (dataSource instanceof UnnestDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
Re-confirm if this check is required here. I think it should be removed unless UNNEST on a table cannot have a time filter.
Were you able to confirm if the check is required?
I have asked around and will update this once I hear back. For now I have removed the check, since the base data source types should take care of it, but I will add it back if it turns out to be required.
Any updates on this? I wanted to get a final check on this before the approval
@@ -137,6 +139,30 @@ public static DataSourcePlan forDataSource(
} else if (dataSource instanceof LookupDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forLookup((LookupDataSource) dataSource, broadcast);
} else if (dataSource instanceof FilteredDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
See the other comment about the check. Perhaps this should also be removed since we would plan the base data source, and there would be individual checks there.
See the plan for broadcast joins - The base data source doesn't enforce the eternity check, however, it is required on the clauses. I think something of that sort should be applicable in Filtered and Unnest data sources (please confirm it though)
@LakshSingla @cryptoe I have addressed the comments, added the refactoring, and also removed the context level feature to push it into individual engines. Please re-review
Last minute review comments, overall LGTM.
sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -123,6 +124,7 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance());
}

nit: Unnecessary
sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
Left my final review. Please let me know once those are addressed. Will approve and merge.
...re/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
querySegmentSpec,
null,
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
Won't this line add a new stage?
I think this should be fine, as it won't add a new stage. However, it is effectively Math.max(minStageNumber, 0), since the builder here doesn't contain any stages. We can simplify it as below:
- Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
+ minStageNumber,
Also, thinking about it, we probably didn't need to create a separate subQueryDefBuilder, since we aren't adding a new stage anyway. We can pass the same builder into the recursive call; the separate builder might be what is causing this confusion. Since there are a few changes pending, let's clean this up as well.
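To make the stage-number argument concrete, here is a small sketch. It rests on assumptions: `SketchQueryDefBuilder` and `StageNumberSketch` are hypothetical stand-ins written for this example, and the assumed behavior of `getNextStageNumber()` (one past the highest stage added so far, or 0 for an empty builder) is inferred from the comment above rather than taken from the Druid source. It also assumes `minStageNumber` is non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a query definition builder; NOT the Druid class.
final class SketchQueryDefBuilder {
    private final List<Integer> stageNumbers = new ArrayList<>();

    void addStage(int stageNumber) {
        stageNumbers.add(stageNumber);
    }

    // Assumed behavior: one past the highest stage number added, or 0 when empty.
    int getNextStageNumber() {
        int max = -1;
        for (int stageNumber : stageNumbers) {
            max = Math.max(max, stageNumber);
        }
        return max + 1;
    }
}

final class StageNumberSketch {
    private StageNumberSketch() {
    }

    // Mirrors the expression under review.
    static int nextStageFor(int minStageNumber, SketchQueryDefBuilder builder) {
        return Math.max(minStageNumber, builder.getNextStageNumber());
    }
}
```

Under these assumptions, a fresh builder always yields `minStageNumber` itself, which is why passing `minStageNumber` directly is equivalent. The `Math.max` only matters once the builder already holds stages, which is the case when the same builder is reused across the recursive call.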
From a convo with @somu-imply, it seems like queries with a time filter
Seems like the above is a problem in the native query as well. #15020
LGTM once @cryptoe's comments have been addressed. The major concern behind the correct optimization of the filter is present in the native stack as well (occurs while query planning), and is unrelated to enabling unnest in MSQ.
...sions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
Addressed the comments and added the test case
Changes LGTM!!