MSQ window functions: Fix query correctness issues when using multiple workers #16804
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String contextName, Map<String, Object> context)
Check notice (Code scanning / CodeQL): Useless parameter (test)
.setExpectedCountersForStageWorkerChannel(
    CounterSnapshotMatcher.with().rows(4).bytes(251).frames(1),
    1, 0, "input0"
)
What does it mean if a check like this fails?
Does it mean we will no longer process window functions correctly with multiple workers?
...or might it break simply because we process the data a bit differently?
@kgyrtkirk I added the assertion for counters as per #16804 (comment).
    stageDef.getSignature(),
    shuffleSpec.clusterBy().getColumns()
);
queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature));
To be 100% sure that we are not causing correctness issues, can we validate that it is OK to override the old shuffleSpec? If it's not null (or not something we could allow), we should preferably throw an exception. Or, if that's not really possible, the safest option would be to add a new dummy stage that just re-shuffles.
I guess the cases where it's not safe to do so may need further investigation, as those shuffles could possibly be moved "after" the window query.
I see clusterBy as something that should probably wrap around the full built query regardless of whether it's Scan / GroupBy / Window / "anything"; but that could be a refactor of its own, which may close that gap for window queries as well.
Note: I think if we don't handle clusterBy in this class, then writing a windowed query to files might lead to unexpected results; but I guess that's not really a problem right now :)
If we throw an error when the shuffleSpec is non-null for the stage that's getting overridden, a lot of tests fail in MSQDrillWindowQueryTest: 754 tests failed, 114 tests passed.
So we can't throw an error when it's non-null 😅
Currently, it is entirely possible that the final old shuffle spec contains non-null shuffling. When creating the previous stages, it is the windowQueryKit that hints at what it wants the final shuffling to be, via the resultShuffleSpecFactory parameter. Currently, this is globalSortWithMaxPartitionCount (if this were a normal subquery, we would want it to run on as many workers as that stage allows).
I think it might be better to change the resultShuffleSpec we pass from windowQueryKit, and then do an assert on the shuffle spec.
@kgyrtkirk Do you know of any example query that might cause issues if we change the shuffle spec? I have thought about it a bit, but I can't think of one. Since it is the window function that reads the results of this shuffle, and it does not care about the ordering (I'm assuming this is the case, since we want to change it to mixShuffleSpec), there should not be an issue with this change from what I can tell, but I might have to think more about this.
The reason the last shuffle is non-null is a bit outside the scope here; it does make things a little bit better.
+1
@Akshat-Jain, could you make a follow-up PR to handle this: https://github.com/apache/druid/pull/16804/files#r1700096766?
@adarshsanjeev But how would the GroupBy/Scan query kits know the ClusterBy columns (needed for the 1st window stage) to use with the resultShuffleSpecFactory?
Do you mean something that is available in the windowQueryKit only? That information could be passed while creating the shuffleSpecFactory.
@adarshsanjeev How can we pass ClusterBy from WindowOperatorQueryKit to the other query kits via ShuffleSpecFactory? 🤔
@adarshsanjeev Also, the ShuffleSpecFactory we pass to other query kits (from WindowOperatorQueryKit) will keep getting propagated to further query kits. But that doesn't seem right? We only want it propagated and taken into consideration through 1 layer of query kits, not all of them? Edit: Nvm, I guess that can be avoided by having 2 implementations of
Description
This PR fixes query correctness issues for MSQ window functions when using more than 1 worker (that is, maxNumTasks > 2).
Currently, we keep the shuffle spec of the previous stage when the window stage has no partition columns. This PR instead overrides the shuffle spec of the previous stage with MixShuffleSpec (if we have a window function with an empty OVER clause), so that the window stage gets a single partition to work on.
A test has been added for a query that returned incorrect results prior to this change when using more than 1 worker.
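To illustrate why an empty OVER clause needs a single partition, here is a minimal toy sketch. It is not Druid code: the class `EmptyOverPartitioningSketch` and its methods are hypothetical, and it only assumes that a window stage evaluates the window function independently on each partition it receives. A row-number function then restarts at 1 in every partition unless all rows are mixed into one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not part of Druid or MSQ.
public class EmptyOverPartitioningSketch
{
  // Stand-in for ROW_NUMBER() OVER (): numbers the rows of ONE partition, starting at 1.
  static List<Integer> rowNumbers(List<String> partition)
  {
    List<Integer> result = new ArrayList<>();
    for (int i = 0; i < partition.size(); i++) {
      result.add(i + 1);
    }
    return result;
  }

  // Assumption: the window stage runs independently on each input partition,
  // and the per-partition outputs are concatenated.
  static List<Integer> runWindowStage(List<List<String>> partitions)
  {
    List<Integer> result = new ArrayList<>();
    for (List<String> partition : partitions) {
      result.addAll(rowNumbers(partition));
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Rows left spread over two partitions: numbering restarts in each one.
    List<List<String>> twoPartitions = List.of(List.of("a", "b"), List.of("c", "d"));
    // Rows mixed into a single partition: numbering covers the whole input.
    List<List<String>> onePartition = List.of(List.of("a", "b", "c", "d"));

    System.out.println(runWindowStage(twoPartitions)); // prints [1, 2, 1, 2]
    System.out.println(runWindowStage(onePartition));  // prints [1, 2, 3, 4]
  }
}
```

In this toy model, the two-partition run corresponds to keeping the previous stage's shuffle spec, and the single-partition run corresponds to overriding it so that all rows reach one partition.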
Sample Query
Currently, the above query gives incorrect results with maxNumTasks=5 as can be seen in the following screenshot.
With this PR's changes, the above query gives correct results, as can be seen in the added test.
This PR has: