
MSQ WF: Batch multiple PARTITION BY keys for processing #16823

Conversation

@Akshat-Jain (Contributor) commented Jul 30, 2024

Description

Currently, if we have a query with a window function having PARTITION BY xyz, and we have a million unique values for xyz each having a single row, we'd end up creating a million individual RACs for processing, each containing one row. This is unnecessary: we can batch the PARTITION BY keys together and process a batch only when we can't add further rows without exceeding the maxRowsMaterialized config.

The previous iteration of this PR simplified WindowOperatorQueryFrameProcessor to run all operators on all the rows instead of creating smaller RACs per PARTITION BY key. That approach was discarded in favor of the batching approach, and the details are summarized here: #16823 (comment).
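The batching idea can be sketched as follows. This is an illustrative, self-contained sketch, not Druid's actual RAC or frame-processor API; names like `Batcher`, `addPartition`, and `maxRowsMaterialized` are hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Accumulate rows across PARTITION BY keys into one batch (standing in
// for a RAC), and flush only when adding the next key's rows would
// exceed maxRowsMaterialized.
class Batcher {
    private final int maxRowsMaterialized;
    private final List<List<String>> flushedBatches = new ArrayList<>();
    private final List<String> currentBatch = new ArrayList<>();

    Batcher(int maxRowsMaterialized) {
        this.maxRowsMaterialized = maxRowsMaterialized;
    }

    // Add all rows belonging to one PARTITION BY key; flush first if
    // they would not fit. A single oversized partition still has to be
    // materialized on its own, so we never split within a key.
    void addPartition(List<String> rowsForKey) {
        if (!currentBatch.isEmpty()
            && currentBatch.size() + rowsForKey.size() > maxRowsMaterialized) {
            flush();
        }
        currentBatch.addAll(rowsForKey);
    }

    void flush() {
        if (!currentBatch.isEmpty()) {
            flushedBatches.add(new ArrayList<>(currentBatch));
            currentBatch.clear();
        }
    }

    List<List<String>> getFlushedBatches() {
        return flushedBatches;
    }
}
```

With a limit of 3 and five single-row partitions, this produces two batches (sizes 3 and 2) instead of five single-row ones.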


Key changed/added classes in this PR
  • WindowOperatorQueryFrameProcessor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions bot added the Area - Batch Ingestion and Area - MSQ (for multi stage queries - https://github.com/apache/druid/issues/12262) labels on Jul 30, 2024
```diff
  * @throws IOException
  */
-private long flushFrameWriter() throws IOException
+private void flushFrameWriter() throws IOException
```
runIncrementally()'s contract states that we should emit a single frame and yield at a time. From what I understand, the method is called in a loop and we emit all the rows at once. We should update this implementation to be more in line with its contract. This needn't be addressed immediately, since the logic has existed for a while, but we should definitely clean this up.
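The contract being described can be illustrated with a toy sketch. This is hypothetical: Druid's real FrameProcessor#runIncrementally works with channels and a ReturnOrAwait result, not strings; the point is only the shape of "emit one frame per call, then yield":

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative emitter: each runIncrementally-style invocation emits at
// most one pending frame and then returns control to the caller,
// instead of draining the whole queue in an internal loop.
class IncrementalEmitter {
    private final Queue<String> pendingFrames = new ArrayDeque<>();

    void enqueue(String frame) {
        pendingFrames.add(frame);
    }

    // Returns the emitted frame, or null when nothing is pending
    // (standing in for yielding back to the processor framework).
    String runIncrementally() {
        return pendingFrames.poll(); // one frame per call
    }
}
```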

@Akshat-Jain (Contributor, Author) commented:

Update: We had a discussion with @gianm and @cryptoe about this. We have two possible approaches:

  1. Keep the changes of this PR (create a single RAC per MSQ partition, instead of one RAC per PARTITION BY key), and use GlobalSortTargetSizeShuffleSpec with targetSize=(Limits#MAX_ROWS_MATERIALIZED_IN_WINDOW) / 2 as an attempt to tune the number of partitions received by the workers in the window stage, so that the MSQ partitions received by those workers contain fewer rows than the limit (100k rows) whenever possible.
  2. The problem with the current implementation is that we're creating one RAC per PARTITION BY key. The suggestion was to batch multiple of these RACs together for processing in the WindowOperatorQueryFrameProcessor layer, and process them only when the "currentRac" exceeds some threshold number of rows (essentially having one RAC per N PARTITION BY keys, such that each RAC is sized closer to the limit).

The conclusion was to go with approach (2). With approach (1), beyond a certain point we would also hit a TooManyPartitions error: with a 50k target size per partition, more than 1.25 billion rows would require more than 25,000 partitions. Creating a large number of partitions at the MSQ framework level also has its own performance implications (each partition has its own set of files, etc.).
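The back-of-envelope check behind that conclusion can be written out; the 50k target size and 25,000-partition ceiling are the numbers from the discussion above, and `PartitionMath`/`partitionsNeeded` are illustrative names:

```java
public class PartitionMath {
    // Ceiling division: how many MSQ partitions are needed to hold
    // totalRows when each partition targets targetSize rows.
    public static long partitionsNeeded(long totalRows, long targetSize) {
        return (totalRows + targetSize - 1) / targetSize;
    }

    public static void main(String[] args) {
        // targetSize = MAX_ROWS_MATERIALIZED_IN_WINDOW / 2 = 100_000 / 2
        long targetSize = 100_000L / 2;
        System.out.println(partitionsNeeded(1_250_000_000L, targetSize)); // prints 25000
    }
}
```

So 1.25 billion rows is exactly the point where approach (1) would exhaust a 25,000-partition budget; anything beyond it fails.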

Hence, this PR needs further work, and I'm marking it as a draft until it's ready for review again with the changes for approach (2).

@Akshat-Jain Akshat-Jain marked this pull request as draft August 16, 2024 04:26
@Akshat-Jain Akshat-Jain force-pushed the simplify-WindowOperatorQueryFrameProcessor branch from 8a45c23 to 4cd6ff2 Compare August 20, 2024 07:56
@Akshat-Jain Akshat-Jain changed the title Simplify WindowOperatorQueryFrameProcessor to run all operators on entire partition MSQ WF: Batch multiple PARTITION BY keys for processing Aug 20, 2024
@Akshat-Jain Akshat-Jain marked this pull request as ready for review August 20, 2024 08:04
@Akshat-Jain Akshat-Jain force-pushed the simplify-WindowOperatorQueryFrameProcessor branch from 8954758 to cca4904 Compare August 21, 2024 06:19
@Akshat-Jain Akshat-Jain force-pushed the simplify-WindowOperatorQueryFrameProcessor branch from cca4904 to 184a7f0 Compare August 21, 2024 06:19
@kgyrtkirk kgyrtkirk self-requested a review August 22, 2024 10:54
@Akshat-Jain Akshat-Jain requested a review from cryptoe August 23, 2024 09:42
@LakshSingla (Contributor) left a comment:

If we are going ahead with this approach, we should add some test cases to verify the partitioning logic in the WindowOperatorFrameProcessor. You can reference the SortMergeJoinFrameProcessor for how to create input channels for test cases and output channels for assertions. It should be done with a smaller maxRowsMaterializedInWindow (say 2 or 3), along with tests that verify all the edge cases.

That would make future changes to this code verifiable and easy to reason about.

@Akshat-Jain Akshat-Jain requested a review from cryptoe August 26, 2024 06:43
@cryptoe (Contributor) left a comment:

The sublist approach seems the cleanest, thanks @Akshat-Jain. Let's get the gluing operator work done as part of a follow-up PR to this one.

@cryptoe (Contributor) commented Aug 27, 2024

@kgyrtkirk Should we go ahead and merge this, and then raise a follow-up PR for the gluing operator work?

@kgyrtkirk (Member) commented:

Sure; but I still feel like this was an unnecessary side-step, as these changes will just be removed later, either by using GlobalSortTargetSizeShuffleSpec#targetSize and/or an operator-based approach.

Note: a benchmark would be nice to help keep track of how performant these queries are.

@adarshsanjeev adarshsanjeev merged commit fbd305a into apache:master Aug 28, 2024
94 checks passed
hevansDev pushed a commit to hevansDev/druid that referenced this pull request Aug 29, 2024
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
6 participants