-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MSQ WF: Batch multiple PARTITION BY keys for processing #16823
MSQ WF: Batch multiple PARTITION BY keys for processing #16823
Conversation
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Outdated
Show resolved
Hide resolved
* @throws IOException | ||
*/ | ||
private long flushFrameWriter() throws IOException | ||
private void flushFrameWriter() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runIncrementally()
's contract states that we should emit a single frame and yield at a time. From what I understand, method is called in a loop and we emit all the rows at once. We should update this implementation to be more inline with its contract. This needn't be addressed immediately since the logic has existed for a while, but we should definitely clean this up.
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Outdated
Show resolved
Hide resolved
Update: We had a discussion with @gianm and @cryptoe about this. We have 2 possible approaches to go with:
The conclusion was to go with approach (2). With approach (1), beyond a certain point, we would also hit a Hence, this PR needs to be worked upon, and I'm marking this PR as a draft PR until it's ready for review with the changes for approach (2) again. |
8a45c23
to
4cd6ff2
Compare
8954758
to
cca4904
Compare
cca4904
to
184a7f0
Compare
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are going ahead with this approach, we should add some test cases to verify the partitioning logic in the WindowOperatorFrameProcessor
. You can reference the SortMergeJoinFrameProcessor
on how to create input channels for test cases and output channels for assertions. It should be done with a smaller maxRowsMaterializedInWindow (say 2 or 3) along with tests that verify all the edge cases.
That would make future changes to this code verifiable, and easy to reason about
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Show resolved
Hide resolved
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Show resolved
Hide resolved
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Outdated
Show resolved
Hide resolved
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sublist approach seems the cleanest. Thanks. @Akshat-Jain Lets get the glueing operator work as part of a follow up PR for this.
@kgyrtkirk Should we go ahead and merge this and then raise a follow up PR for the glueing operator stuff. |
sure; but I still feel like this was an unecessary side-step...as these will be just removed later note: a benchmark would be nice to help keep track how performant these queries are |
Currently, if we have a query with window function having PARTITION BY xyz, and we have a million unique values for xyz each having 1 row, we'd end up creating a million individual RACs for processing, each having a single row. This is unnecessary, and we can batch the PARTITION BY keys together for processing, and process them only when we can't batch further rows to adhere to maxRowsMaterialized config. The previous iteration of this PR was simplifying WindowOperatorQueryFrameProcessor to run all operators on all the rows instead of creating smaller RACs per partition by key. That approach was discarded in favor of the batching approach, and the details are summarized here: apache#16823 (comment).
Currently, if we have a query with window function having PARTITION BY xyz, and we have a million unique values for xyz each having 1 row, we'd end up creating a million individual RACs for processing, each having a single row. This is unnecessary, and we can batch the PARTITION BY keys together for processing, and process them only when we can't batch further rows to adhere to maxRowsMaterialized config. The previous iteration of this PR was simplifying WindowOperatorQueryFrameProcessor to run all operators on all the rows instead of creating smaller RACs per partition by key. That approach was discarded in favor of the batching approach, and the details are summarized here: apache#16823 (comment).
Description
Currently, if we have a query with window function having
PARTITION BY xyz
, and we have a million unique values forxyz
each having 1 row, we'd end up creating a million individual RACs for processing, each having a single row. This is unnecessary, and we can batch the PARTITION BY keys together for processing, and process them only when we can't batch further rows to adhere tomaxRowsMaterialized
config.The previous iteration of this PR was simplifying
WindowOperatorQueryFrameProcessor
to run all operators on all the rows instead of creating smaller RACs perpartition by
key. That approach was discarded in favor of the batching approach, and the details are summarized here: #16823 (comment).Key changed/added classes in this PR
WindowOperatorQueryFrameProcessor
This PR has: