MSQ window functions: Fix query correctness issues when using multiple workers #16804

Merged

Contributor

@Akshat-Jain Akshat-Jain commented Jul 25, 2024

Description

This PR fixes query correctness issues for MSQ window functions when using more than 1 worker (that is, maxNumTasks > 2).

Previously, we kept the shuffle spec of the previous stage when the window stage had no partition columns. This PR overrides the shuffle spec of the previous stage with MixShuffleSpec (if we have a window function with an empty over clause) so that the window stage gets a single partition to work on.

A test has been added for a query that returned incorrect results prior to this change when using more than 1 worker.

Sample Query

select countryName, cityName, channel, 
row_number() over (order by countryName, cityName, channel) as c1,
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
from "wikipedia"
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel 
order by countryName, cityName, channel

Currently, the above query gives incorrect results with maxNumTasks=5 as can be seen in the following screenshot.

[Screenshot: incorrect results for the above query with maxNumTasks=5]

With this PR's changes, the above query gives correct results, as can be seen in the added test.
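
To illustrate why a window without partition columns needs a single partition, here is a minimal standalone sketch. It is not Druid code: the class, the method names, and the row labels are all made up for illustration, and it only simulates what happens when `row_number()` is evaluated independently on each partition versus on one merged partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these names exist in Druid.
class RowNumberPartitionSketch
{
  /** Assigns row_number() within each partition separately, as a worker that sees only its own partition would. */
  public static List<Integer> rowNumbersPerPartition(List<List<String>> partitions)
  {
    List<Integer> result = new ArrayList<>();
    for (List<String> partition : partitions) {
      int rowNumber = 0;
      for (String ignoredRow : partition) {
        result.add(++rowNumber);
      }
    }
    return result;
  }

  /** Merges all partitions into one, mimicking the single partition the window stage should receive. */
  public static List<List<String>> mixIntoSinglePartition(List<List<String>> partitions)
  {
    List<String> merged = new ArrayList<>();
    partitions.forEach(merged::addAll);
    return List.of(merged);
  }

  public static void main(String[] args)
  {
    // Made-up row labels, already in the desired order; sorting is out of scope for this sketch.
    List<List<String>> partitions = List.of(
        List.of("row-a", "row-b"),
        List.of("row-c", "row-d")
    );

    // Two partitions: numbering restarts in the second one.
    System.out.println(rowNumbersPerPartition(partitions)); // [1, 2, 1, 2]

    // One partition: numbering is global, as row_number() over (order by ...) requires.
    System.out.println(rowNumbersPerPartition(mixIntoSinglePartition(partitions))); // [1, 2, 3, 4]
  }
}
```

The restarted numbering in the first output is the kind of incorrect result the screenshot above refers to; the second output corresponds to the behavior expected once the window stage gets a single partition.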


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jul 25, 2024

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String contextName, Map<String, Object> context)

Check notice: Code scanning / CodeQL

Useless parameter (Note, test): The parameter 'contextName' is never used.
Comment on lines +2207 to +2210
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(251).frames(1),
1, 0, "input0"
)
Member

I wonder what it means if a check like this fails.
Does that mean we will no longer process window functions correctly with multiple workers?
...or might it just break if we process things a bit differently?

Contributor Author

@kgyrtkirk I added the assertion for counters as per #16804 (comment).

@Akshat-Jain Akshat-Jain requested a review from kgyrtkirk July 31, 2024 09:52
stageDef.getSignature(),
shuffleSpec.clusterBy().getColumns()
);
queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature));
Member

To be 100% sure that we are not causing correctness issues, can we validate that it is OK to override the old shuffleSpec? If it's not null (or not something we could allow), we should preferably throw an exception. If that's not really possible, the safest option would be to add a new dummy stage which just re-shuffles.

I guess the cases when its not safe to do so may need further investigation(s) - as those shuffles could possibly be moved "after" the window query....
I see the cases of clusterBy as something which should probably wrap-around the full built query regardless if its Scan / GroupBy / Window / "anything" ; but that could be a refactor of its own - which may close that gap for window queries as well.

note: I think if we don't handle clusterBy in this class then writing a windowed query to files might lead to unexpected results; but I guess that's not really a problem right now :)

Contributor Author

If we throw an error when the shuffleSpec is non-null for the stage that's getting overridden, a lot of tests fail in MSQDrillWindowQueryTest: 754 tests failed, 114 tests passed.

We can't throw an error when it's non-null 😅

Contributor

Currently, it is entirely possible that the final old shuffle spec contains non-null shuffling. When creating the previous stages, it is the windowQueryKit which gives hints on what it wants the final shuffling to be, based on the resultShuffleSpecFactory parameter. Currently, this is globalSortWithMaxPartitionCount (if this were a normal subquery, we would want it to run on as many workers as that stage allows).

I think it might be better to change the resultShuffleSpec we pass from windowQueryKit, and then do an assert on the shuffle spec.

@kgyrtkirk Do you know of any example query which might cause issues if we change the shuffle spec? I have thought about it a bit, but I can't think of one. Since it is the window function that reads the results of this shuffle, and it does not care about the ordering (I'm assuming this is the case, since we want to change it to mixShuffleSpec), there should not be an issue with this change as far as I can tell, but I might have to think more about this.

Member

@kgyrtkirk kgyrtkirk left a comment

The reason the last shuffle is non-null is a bit outside the scope here; this change does make things a little bit better.
+1

@adarshsanjeev adarshsanjeev merged commit c3aa033 into apache:master Aug 6, 2024
88 checks passed
@adarshsanjeev
Contributor

@Akshat-Jain, could you make a follow up PR to handle this: https://github.com/apache/druid/pull/16804/files#r1700096766?
Essentially, perhaps we should have a separate org.apache.druid.msq.querykit.DataSourcePlan#forQuery, which passes a result shuffle spec factory which encapsulates the logic of choosing the shuffle spec. We could then change the overriding of shuffle spec to a validation exception instead.
cc: @cryptoe

@Akshat-Jain
Contributor Author

@adarshsanjeev But how would the GroupBy/Scan query kits know the ClusterBy columns (needed for the 1st window stage) with which to use the resultShuffleSpecFactory?

@adarshsanjeev
Contributor

Do you mean something that is available in the windowQueryKit only? That information could be passed while creating the shuffleSpecFactory.

@Akshat-Jain
Contributor Author

@adarshsanjeev How can we pass ClusterBy from WindowOperatorQueryKit to the other query kits via ShuffleSpecFactory? 🤔

@Akshat-Jain
Contributor Author

Akshat-Jain commented Aug 7, 2024

@adarshsanjeev Also, the ShuffleSpecFactory we pass to other query kits (from WindowOperatorQueryKit) will keep getting propagated to further query kits. But that doesn't seem right? We only want it propagated and taken into consideration through 1 layer of query kits, not all of them?

Edit: Nvm, I guess that can be avoided by having 2 implementations of forDataSource() method. One with a ShuffleSpecFactory, one without it.
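
A rough sketch of that two-overload idea, under heavy assumptions: every name below is hypothetical and does not mirror Druid's actual classes or signatures, and a shuffle spec is reduced to a plain string. It only shows how a factory passed to one overload could apply to a single layer, while nested layers fall back to the overload without a factory.

```java
import java.util.function.Supplier;

// Every name here is hypothetical; none of this mirrors Druid's real classes.
class ShuffleOverloadSketch
{
  // Stand-in for the default result shuffle spec factory.
  static final Supplier<String> DEFAULT_FACTORY = () -> "globalSort";

  /** Overload without a factory: this layer uses the default shuffle. */
  public static String planDataSource(int nestedLayers)
  {
    return planDataSource(nestedLayers, DEFAULT_FACTORY);
  }

  /** Overload with a factory: the hint applies to this layer only and is not passed further down. */
  public static String planDataSource(int nestedLayers, Supplier<String> factory)
  {
    String ownShuffle = factory.get();
    if (nestedLayers == 0) {
      return ownShuffle;
    }
    // Nested layers go through the factory-less overload, so the hint stops here.
    return ownShuffle + " <- " + planDataSource(nestedLayers - 1);
  }

  public static void main(String[] args)
  {
    // A window-style caller asks for "mix" on the layer directly beneath it.
    System.out.println(planDataSource(2, () -> "mix")); // mix <- globalSort <- globalSort
  }
}
```

In this sketch the caller's hint shapes only the first layer, which is the "1 layer of query kits" behavior described above.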

@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
@Akshat-Jain Akshat-Jain mentioned this pull request Oct 8, 2024
1 task
@317brian 317brian added the Bug label Oct 9, 2024