MSQ window functions: Pass only WindowOperatorFactory for window stage definition #16781
// Sorting and partitioning are expected to be handled by the shuffle spec of the previous stage.
final List<OperatorFactory> windowOperatorList = operatorList.get(i)
    .stream()
    .filter(operator -> operator instanceof WindowOperatorFactory)
This is not necessarily true. I don't believe in building an infrastructure that can only do partitioning/shuffling in between stages.
There are benefits to further partitioning an existing sort, and that's rather cheap.
Currently, we are passing an incorrect shuffle spec and then explicitly passing sort+partition operators, so we're doing unnecessary additional work.
This PR fixes the shuffle spec, and removes the sort+partition operators.
Re "there are benefits of further partitioning an existing sort; and that's rather cheap....":
Right now, MSQ creates the stages based on the list of operator factories we get from the Windowing.java layer. So a logic change for an optimisation like this should happen there, IIUC? So it feels unrelated to this PR to me.
Please correct me if I'm wrong though, thanks!
I don't fully understand from the above description how it is supposed to happen...
@kgyrtkirk
A lot of the existing tests failed with only (1). They pass with (1) + (2), hence the existing tests suffice. So it's not a user-facing bug, but it's incorrect logic that gets uncovered by (1), hence I called it a bug in the PR description. Happy to rephrase it if that helps.
  List<KeyColumn> keyColsOfWindow = new ArrayList<>();
- for (String partitionColumn : partition.getPartitionColumns()) {
+ for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
    KeyColumn kc;
-   if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
-     kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+   if (sortColumn.getDirection() == ColumnWithDirection.Direction.DESC) {
+     kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.DESCENDING);
    } else {
-     kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+     kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.ASCENDING);
    }
    keyColsOfWindow.add(kc);
  }
It seems to me that the data will be ClusterBy on keyColsOfWindow, which is based on the sortColumns; wouldn't that mean that rows with different values in the sort columns may land on different workers?
It's so hard to work with this without seeing the plan... I don't even know which class is the API to configure sort/partitioning in MSQ in between stages; it seems to me that would be ShuffleSpec?
From what I see in the results for SortOperatorFactory in the WindowOperatorQueryKit class, I think you may not remove any sort operators, as that's not handled correctly in between stages.
I believe the old approach to set this according to the partitionColumns was the right approach.
Re "I believe the old approach to set this according to the partitionColumns was the right approach":
@kgyrtkirk The thing is that the Sort operator (passed by Windowing.java to MSQ) already contains the partition by column.
@kgyrtkirk I'm closing this PR as the logic is incorrect as per offline discussion. Also, this change seems irrelevant now based on our latest offline discussion.
Description
Currently, we are passing all operator factories (NaiveSort, NaivePartition, Window) in the list of operator factories for window stage definition.
Ideally, we shouldn’t have to pass anything except the window factories, since sorting and partitioning are expected to be handled by the shuffle spec of the previous stage.
This PR makes the change to pass only the window operator factories for window stage definition. Making this change uncovered a bug where the logic for finding the shuffle spec for the next window stage was incorrect.
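As a rough illustration of the filtering step described above, here is a minimal, self-contained sketch. The interface and classes below are simplified stand-ins defined only for this example (they are not Druid's actual operator factory classes), and `onlyWindowOperators` is a hypothetical helper name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class WindowOperatorFilterSketch {
  // Hypothetical stand-ins for the operator factory types; not the real Druid classes.
  interface OperatorFactory {}
  static final class NaiveSortOperatorFactory implements OperatorFactory {}
  static final class NaivePartitioningOperatorFactory implements OperatorFactory {}
  static final class WindowOperatorFactory implements OperatorFactory {}

  // Keep only the window operator factories, preserving their relative order.
  // The assumption is that sorting and partitioning are left to the previous
  // stage's shuffle spec.
  static List<OperatorFactory> onlyWindowOperators(List<OperatorFactory> operators) {
    return operators.stream()
        .filter(operator -> operator instanceof WindowOperatorFactory)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<OperatorFactory> stageOperators = Arrays.asList(
        new NaiveSortOperatorFactory(),
        new NaivePartitioningOperatorFactory(),
        new WindowOperatorFactory()
    );
    // Only the single window operator factory remains after filtering.
    System.out.println(onlyWindowOperators(stageOperators).size()); // prints 1
  }
}
```

Whether dropping the sort and partition operators is safe depends on the previous stage's shuffle spec actually providing the required ordering, which is the point under discussion in the review.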
Description of the above-mentioned bug: For finding shuffle spec, we shouldn't be filtering out based on the partition columns. For queries having a window function like:
We get the SortOperator with both columns (array[1,2,length(cityName)], countryName), and we get the partition operator with only array[1,2,length(cityName)]. With the current logic, we would've gotten rid of countryName from the shuffle spec, as it wasn't in the partition factory, which is incorrect.
The reason this wasn't broken in the current code (and gets uncovered with this PR's code changes) is that we were explicitly passing the original sort and partitioning operators in the window stage definition, hence overriding the behavior of the above incorrectly evaluated shuffle spec.
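To make the difference between the two key derivations concrete, here is a small sketch that contrasts them on the two columns mentioned above. `SortColumn`, `Direction`, and both helper methods are hypothetical simplifications written for this example, with keys rendered as plain strings rather than real KeyColumn objects. It only shows which columns each derivation yields; it does not settle which key is the right one for the shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleKeySketch {
  enum Direction { ASC, DESC }

  // Simplified stand-in for a sort column: a column name plus a direction.
  static final class SortColumn {
    final String name;
    final Direction direction;

    SortColumn(String name, Direction direction) {
      this.name = name;
      this.direction = direction;
    }
  }

  // Derivation driven by the partition columns: sort columns that are not
  // partition columns never make it into the key.
  static List<String> keysFromPartitionColumns(List<String> partitionColumns, List<SortColumn> sortColumns) {
    Map<String, Direction> directions = new HashMap<>();
    for (SortColumn sortColumn : sortColumns) {
      directions.put(sortColumn.name, sortColumn.direction);
    }
    List<String> keys = new ArrayList<>();
    for (String partitionColumn : partitionColumns) {
      Direction d = directions.get(partitionColumn) == Direction.DESC ? Direction.DESC : Direction.ASC;
      keys.add(partitionColumn + " " + d);
    }
    return keys;
  }

  // Derivation driven by the sort columns: every sort column becomes a key column.
  static List<String> keysFromSortColumns(List<SortColumn> sortColumns) {
    List<String> keys = new ArrayList<>();
    for (SortColumn sortColumn : sortColumns) {
      keys.add(sortColumn.name + " " + sortColumn.direction);
    }
    return keys;
  }

  public static void main(String[] args) {
    List<SortColumn> sortColumns = Arrays.asList(
        new SortColumn("array[1,2,length(cityName)]", Direction.ASC),
        new SortColumn("countryName", Direction.ASC)
    );
    List<String> partitionColumns = Arrays.asList("array[1,2,length(cityName)]");

    // The partition-driven key contains one column; countryName is dropped.
    System.out.println(keysFromPartitionColumns(partitionColumns, sortColumns));
    // The sort-driven key contains both columns.
    System.out.println(keysFromSortColumns(sortColumns));
  }
}
```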
This PR has: