MSQ window functions: Pass only WindowOperatorFactory for window stage definition #16781

Closed
@@ -47,9 +47,8 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
@@ -230,6 +229,13 @@ public QueryDefinition makeQueryDefinition(
partitionColumnNames
);

+ // We need to pass only the window factories for the window stage definition.
+ // Sorting and partitioning are expected to be handled by the shuffle spec of the previous stage.
+ final List<OperatorFactory> windowOperatorList = operatorList.get(i)
+     .stream()
+     .filter(operator -> operator instanceof WindowOperatorFactory)
Member


this is not necessarily true - I don't believe in building an infrastructure which could only do partitioning/shuffle in-between stages

there are benefits of further partitioning an existing sort; and that's rather cheap....

Contributor Author

@Akshat-Jain Akshat-Jain Jul 23, 2024


Currently, we are passing an incorrect shuffle spec and then explicitly passing sort+partition operators, so we're doing unnecessary additional work.
This PR fixes the shuffle spec and removes the sort+partition operators.

> there are benefits of further partitioning an existing sort; and that's rather cheap....

Right now, MSQ creates the stages based on the list of operator factories we get from the Windowing.java layer. So a logic change for an optimisation like this should happen there, IIUC? So it feels unrelated to this PR to me.

Contributor Author


Please correct me if I'm wrong though, thanks!
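For readers following the thread, here is a minimal sketch of the filtering step this hunk introduces. The types below (`OperatorFactory`, `SortFactory`, `PartitionFactory`, `WindowFactory`) are hypothetical stand-ins, not Druid's real classes, and the sketch assumes, as the PR does, that the previous stage's shuffle spec already delivers rows partitioned and sorted. Whether that assumption holds is exactly what is being discussed above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the operator factory types; the real Druid interfaces are richer.
interface OperatorFactory {}

final class SortFactory implements OperatorFactory {}

final class PartitionFactory implements OperatorFactory {}

final class WindowFactory implements OperatorFactory
{
  final String name;

  WindowFactory(String name)
  {
    this.name = name;
  }
}

final class WindowStageFilterSketch
{
  // Keep only the window factories. Sort and partition factories are dropped
  // on the ASSUMPTION that the previous stage's shuffle already handled them.
  static List<OperatorFactory> windowOnly(List<OperatorFactory> operators)
  {
    return operators.stream()
                    .filter(op -> op instanceof WindowFactory)
                    .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<OperatorFactory> stageOperators = Arrays.asList(
        new SortFactory(),
        new PartitionFactory(),
        new WindowFactory("w0")
    );
    // Only the window factory is left for the stage's processor.
    System.out.println(windowOnly(stageOperators).size());
  }
}
```

If a later change wanted to keep a cheap in-stage re-partition, as suggested in the review, the filter predicate is the one place that would need to change.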

+     .collect(Collectors.toList());

queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
@@ -238,7 +244,7 @@ public QueryDefinition makeQueryDefinition(
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
- operatorList.get(i),
+ windowOperatorList,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
@@ -302,26 +308,19 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF
}
}

- Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
- if (sort != null) {
- for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
- sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
- }
- }
-
- if (partition == null || partition.getPartitionColumns().isEmpty()) {
- // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
- // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
+ if (partition == null || partition.getPartitionColumns().isEmpty() || sort == null || sort.getSortColumns().isEmpty()) {
+ // If operatorFactories doesn't have any partitioning or sorting factory, then we should keep the shuffle spec from previous stage.
+ // This indicates that we already have the data partitioned and sorted correctly, and hence we don't need to do any shuffling.
return null;
}

List<KeyColumn> keyColsOfWindow = new ArrayList<>();
- for (String partitionColumn : partition.getPartitionColumns()) {
+ for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
KeyColumn kc;
- if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
- kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+ if (sortColumn.getDirection() == ColumnWithDirection.Direction.DESC) {
+ kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.DESCENDING);
} else {
- kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+ kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.ASCENDING);
}
keyColsOfWindow.add(kc);
}
Comment on lines 317 to 326
Member


It seems to me that the data will be ClusterBy on keyColsOfWindow, which is based on the sortColumns; wouldn't that mean that different values of the sorted values may land on different workers?

It's so hard to work with this without seeing the plan... I don't even know which class is the API to configure sort/partitioning in MSQ in between stages - it seems to me that would be ShuffleSpec?

From what I see for results for SortOperatorFactory in the WindowOperatorQueryKit class, I think you may not remove any sort operators, as that's not handled correctly in between stages.

I believe the old approach to set this according to the partitionColumns was the right approach

Contributor Author

@Akshat-Jain Akshat-Jain Jul 23, 2024


> I believe the old approach to set this according to the partitionColumns was the right approach

@kgyrtkirk The thing is that the Sort operator (passed by Windowing.java to MSQ) already contains the partition-by column.
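To make the point under discussion concrete, here is a rough sketch of the new key-column derivation in isolation. All types (`SortColumn`, `KeyCol`, `Direction`, `KeyOrder`) are hypothetical simplifications of the Druid classes in the diff, and the example input assumes the sort columns start with the partition column, as the comment above states. The sketch only shows which key columns come out; it does not settle whether clustering on them keeps each partition on a single worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ShuffleKeySketch
{
  enum Direction { ASC, DESC }

  enum KeyOrder { ASCENDING, DESCENDING }

  // Hypothetical simplification of ColumnWithDirection.
  static final class SortColumn
  {
    final String column;
    final Direction direction;

    SortColumn(String column, Direction direction)
    {
      this.column = column;
      this.direction = direction;
    }
  }

  // Hypothetical simplification of KeyColumn.
  static final class KeyCol
  {
    final String column;
    final KeyOrder order;

    KeyCol(String column, KeyOrder order)
    {
      this.column = column;
      this.order = order;
    }
  }

  // Mirrors the new logic in the diff: return null (keep the previous shuffle spec)
  // when either partition or sort columns are missing; otherwise derive one key
  // column per sort column, preserving its direction.
  static List<KeyCol> keyColumnsFor(List<String> partitionColumns, List<SortColumn> sortColumns)
  {
    if (partitionColumns == null || partitionColumns.isEmpty()
        || sortColumns == null || sortColumns.isEmpty()) {
      return null;
    }
    List<KeyCol> keys = new ArrayList<>();
    for (SortColumn sortColumn : sortColumns) {
      KeyOrder order = sortColumn.direction == Direction.DESC
                       ? KeyOrder.DESCENDING
                       : KeyOrder.ASCENDING;
      keys.add(new KeyCol(sortColumn.column, order));
    }
    return keys;
  }

  public static void main(String[] args)
  {
    // ASSUMED input shape: PARTITION BY country ORDER BY revenue DESC,
    // with the sort operator carrying the partition column first.
    List<KeyCol> keys = keyColumnsFor(
        Arrays.asList("country"),
        Arrays.asList(new SortColumn("country", Direction.ASC), new SortColumn("revenue", Direction.DESC))
    );
    for (KeyCol key : keys) {
      System.out.println(key.column + " " + key.order);
    }
  }
}
```

Under the old approach the key columns would have been derived from `partitionColumns` alone, so in this example `revenue` would not appear in the key.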
