Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ window functions: Pass only WindowOperatorFactory for window stage definition #16781

Conversation

Akshat-Jain
Copy link
Contributor

Description

Currently, we are passing all operator factories (NaiveSort, NaivePartition, Window) in the list of operator factories for window stage definition.

Ideally, we shouldn’t have to pass anything except the window factories, since sorting and partitioning are expected to be handled by the shuffle spec of the previous stage.

This PR makes the change to pass only the window operator factories for window stage definition. Making this change unraveled a bug where the logic for finding shuffle spec for next window stage was incorrect.

Description of the above-mentioned bug: For finding shuffle spec, we shouldn't be filtering out based on the partition columns. For queries having a window function like:

row_number() over (partition by array[1,2,length(cityName)] order by countryName) as c

We get the SortOperator with both columns (array[1,2,length(cityName)], countryName), and we get the partition operator with only array[1,2,length(cityName)]. With the current logic, we would've gotten rid of the countryName from the shuffle spec, as it wasn't in the partition factory --- which is incorrect.

The reason this wasn't broken in the current code (and gets unraveled with this PR's code changes) is that we were explicitly passing the original sort and partitioning operators in the window stage definition, hence overriding the behavior of the above incorrectly evaluated shuffle spec.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jul 23, 2024
// Sorting and partitioning are expected to be handled by the shuffle spec of the previous stage.
final List<OperatorFactory> windowOperatorList = operatorList.get(i)
.stream()
.filter(operator -> operator instanceof WindowOperatorFactory)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not necessarily true - I don't believe in building an infrastrucutre which could only do partitioning/shuffle in-between stages

there are benefits of further partitioning an existing sort; and that's rather cheap....

Copy link
Contributor Author

@Akshat-Jain Akshat-Jain Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we are passing incorrect shuffle spec, and then explicitly passing sort+partition operators. So we're doing unnecessary additional work.
This PR fixes the shuffle spec, and removes the sort+partition operators.

there are benefits of further partitioning an existing sort; and that's rather cheap....

Right now, MSQ creates the stages based on the list of operator factories we get from Windowing.java layer. So logic change for optimisation like this should happen there IIUC? So it feels unrelated to this PR to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please correct me if I'm wrong though, thanks!

@kgyrtkirk
Copy link
Member

Description of the above-mentioned bug: For finding shuffle spec, we shouldn't be filtering out based on the partition columns. For queries having a window function like:

I don't fully understand from the above description how it supposed to happen....
but if this is a bug ; then it should contain a testcase ! could you please add it?

@Akshat-Jain
Copy link
Contributor Author

I don't fully understand from the above description how it supposed to happen....
but if this is a bug ; then it should contain a testcase ! could you please add it?

@kgyrtkirk
As discussed offline, this PR has 2 parts:

  1. Change to pass only window operator factories
  2. Change in the logic to find shuffle spec

A lot of the existing tests failed with only (1). They pass with (1) + (2). Hence existing tests suffice.

So it's not a user-facing bug. But it's an incorrect logic which gets uncovered by (1), hence I called it a bug in the PR description. Happy to rephrase it if that helps.

Comment on lines 317 to 326
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
KeyColumn kc;
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
if (sortColumn.getDirection() == ColumnWithDirection.Direction.DESC) {
kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.DESCENDING);
} else {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.ASCENDING);
}
keyColsOfWindow.add(kc);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to me that the data will be ClusterBy on keyColsOfWindow which is based on the sortColumns ; wouldn't that mean that different values of the sorted values may land on different workers?

its so hard to work with this without seeing the plan...I don't even know which class is the api to configure sort/partitioning in msq in-between stages - it seems to me that would be ShuffleSpec?

from what I see for results for SortOperatorFactory in the WindowOperatorQueryKit class; I think you may not remove any sort operators; as that's not handled correctly in-between stages

I believe the old approach to set this according to the partitionColumns was the right approach

Copy link
Contributor Author

@Akshat-Jain Akshat-Jain Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the old approach to set this according to the partitionColumns was the right approach

@kgyrtkirk The thing is that Sort operator (passed by Windowing.java to MSQ) already contains the partition by column.

@Akshat-Jain
Copy link
Contributor Author

@kgyrtkirk I'm closing this PR as the logic is incorrect as per offline discussion. Also, this change seems irrelevant now based on our latest offline discussion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants