-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MSQ window functions: Pass only WindowOperatorFactory for window stage definition #16781
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,9 +47,8 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery> | ||
{ | ||
|
@@ -230,6 +229,13 @@ public QueryDefinition makeQueryDefinition( | |
partitionColumnNames | ||
); | ||
|
||
// We need to pass only the window factories for the window stage definition. | ||
// Sorting and partitioning are expected to be handled by the shuffle spec of the previous stage. | ||
final List<OperatorFactory> windowOperatorList = operatorList.get(i) | ||
.stream() | ||
.filter(operator -> operator instanceof WindowOperatorFactory) | ||
.collect(Collectors.toList()); | ||
|
||
queryDefBuilder.add( | ||
StageDefinition.builder(firstStageNumber + i) | ||
.inputs(new StageInputSpec(firstStageNumber + i - 1)) | ||
|
@@ -238,7 +244,7 @@ public QueryDefinition makeQueryDefinition( | |
.shuffleSpec(nextShuffleSpec) | ||
.processorFactory(new WindowOperatorQueryFrameProcessorFactory( | ||
queryToRun, | ||
operatorList.get(i), | ||
windowOperatorList, | ||
stageRowSignature, | ||
maxRowsMaterialized, | ||
partitionColumnNames | ||
|
@@ -302,26 +308,19 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF | |
} | ||
} | ||
|
||
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>(); | ||
if (sort != null) { | ||
for (ColumnWithDirection sortColumn : sort.getSortColumns()) { | ||
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); | ||
} | ||
} | ||
|
||
if (partition == null || partition.getPartitionColumns().isEmpty()) { | ||
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. | ||
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. | ||
if (partition == null || partition.getPartitionColumns().isEmpty() || sort == null || sort.getSortColumns().isEmpty()) { | ||
// If operatorFactories doesn't have any partitioning or sorting factory, then we should keep the shuffle spec from previous stage. | ||
// This indicates that we already have the data partitioned and sorted correctly, and hence we don't need to do any shuffling. | ||
return null; | ||
} | ||
|
||
List<KeyColumn> keyColsOfWindow = new ArrayList<>(); | ||
for (String partitionColumn : partition.getPartitionColumns()) { | ||
for (ColumnWithDirection sortColumn : sort.getSortColumns()) { | ||
KeyColumn kc; | ||
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { | ||
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); | ||
if (sortColumn.getDirection() == ColumnWithDirection.Direction.DESC) { | ||
kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.DESCENDING); | ||
} else { | ||
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); | ||
kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.ASCENDING); | ||
} | ||
keyColsOfWindow.add(kc); | ||
} | ||
Comment on lines
317
to
326
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems to me that the data will be its so hard to work with this without seeing the plan...I don't even know which class is the api to configure sort/partitioning in msq in-between stages - it seems to me that would be from what I see for results for I believe the old approach to set this according to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@kgyrtkirk The thing is that Sort operator (passed by Windowing.java to MSQ) already contains the |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not necessarily true - I don't believe in building an infrastrucutre which could only do partitioning/shuffle in-between stages
there are benefits of further partitioning an existing sort; and that's rather cheap....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, we are passing incorrect shuffle spec, and then explicitly passing sort+partition operators. So we're doing unnecessary additional work.
This PR fixes the shuffle spec, and removes the sort+partition operators.
Right now, MSQ creates the stages based on the list of operator factories we get from
Windowing.java
layer. So logic change for optimisation like this should happen there IIUC? So it feels unrelated to this PR to me.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please correct me if I'm wrong though, thanks!