Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WindowOperatorQueryFrameProcessor: Fix frame writer capacity issues + adhere to FrameProcessor's contract #17209

Merged
merged 10 commits into from
Oct 3, 2024

Conversation

Akshat-Jain
Copy link
Contributor

@Akshat-Jain Akshat-Jain commented Oct 1, 2024

Description

Currently, we had a bug in the WindowOperatorQueryFrameProcessor where the frame writer's capacity could get reached for larger queries, causing it to not output all of the result rows.

For example, the following query gives incomplete output rows when we use maxNumTasks=2, compared to when we use more workers.

select trip_id, row_number() over(partition by trip_id) as c1 from "trips_xaa" where __time < TIMESTAMP '2013-09-20 11:15:25' group by trip_id
-- This gives 20479 rows with maxNumTasks=2
-- This gives 30341 rows with maxNumTasks=5
-- This gives 30341 rows with maxNumTasks=11

This PR fixes the above issue by maintaining the state of last rowId flushed to output channel, and triggering another iteration of runIncrementally() method if frame writer has rows pending flush to the output channel.

The above is done keeping in mind FrameProcessor's contract which enforces that we should write only a single frame to each output channel in any given iteration of runIncrementally().

For manual testing, I've been verifying the behavior with queries like the following:

select trip_id, row_number() over(partition by trip_id) as c1 from "trips_xaa" where __time < TIMESTAMP '2016-10-20 11:15:25' group by trip_id
-- 49795247 rows inputted, 49795247 rows outputted by window stage (with maxNumTasks=2 and maxNumTasks=11)

select c1, count(c1) from (select trip_id, row_number() over(partition by trip_id) as c1 from "trips_xaa" where __time < TIMESTAMP '2016-10-20 11:15:25' group by trip_id) group by c1
-- [1, 49795247] with maxNumTasks=2 and maxNumTasks=11

Additionally, I have added a test WindowOperatorQueryFrameProcessorTest#testFrameWriterReachingCapacity() which was previously writing less number of rows to the output channel, but is writing the complete set of rows.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 1, 2024
@cryptoe cryptoe added this to the 31.0.0 milestone Oct 1, 2024
@kgyrtkirk
Copy link
Member

this seems like a bugfix - please remove the optimization; that should be reviewed separetly

@Akshat-Jain
Copy link
Contributor Author

this seems like a bugfix - please remove the optimization; that should be reviewed separetly

@kgyrtkirk I think we anyway should get the optimization also merged for Druid 31 (since it's significant), hence bundled it together with this patch because it was already updating a lot of the frame writer logic.

If it helps, this part of the diff corresponds to the optimization:
image

@kgyrtkirk
Copy link
Member

remove the performance enhancement ; and place it in a separate PR - also add a benchmark to help keep track of performance improvements

@Akshat-Jain
Copy link
Contributor Author

remove the performance enhancement ; and place it in a separate PR - also add a benchmark to help keep track of performance improvements

@kgyrtkirk I was hoping to keep all changes related to frame writer together, to ensure that everything works fine with both of them together. Instead of a bug fix PR, I'm looking at this PR as more of a "revamp frame writer logic in WindowOperatorQueryFrameProcessor" PR.

Splitting it into multiple PRs is certainly doable, but I'd still have to continue testing them coupled together locally, which is unnecessary and would cause unnecessary delays (we are hoping to get this in for Druid 31 release).

Regarding the benchmark - I agree that it would be great to have it. It's on my list of todos, but it's not a trivial task and would require some time. So until then, benchmark shouldn't block any performance improvements / logic changes.

Thoughts? cc: @cryptoe

@kgyrtkirk
Copy link
Member

what's happening in this PR right now is pretty convoluted - that's why I'm asking to remove any performance enhancements ; because I don't think the fix for the bug is right

@Akshat-Jain
Copy link
Contributor Author

@kgyrtkirk Fair enough, working on splitting up the PR.

@Akshat-Jain
Copy link
Contributor Author

@kgyrtkirk Have removed the optimization diff from this PR. Have opened a separate PR for it: #17211, appreciate your reviews on that PR as well. Thanks!

@Akshat-Jain Akshat-Jain force-pushed the msq-wf-frame-capacity branch from 220450c to 547e51d Compare October 1, 2024 15:07
@Akshat-Jain Akshat-Jain requested a review from kgyrtkirk October 1, 2024 15:09
Copy link
Member

@kgyrtkirk kgyrtkirk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good; just a minor comment :)

Copy link
Member

@kgyrtkirk kgyrtkirk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the updates!
+1

* @throws IOException
*/
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException
private void flushAllRowsAndCols() throws IOException
{
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its just performance: but the creation of this rac is an O(n) operation; regardless where the rowId stands.
that's why it would have been better to just pack all these things into an inner-workhorse class....when that will be done this should be taken into account.

@cryptoe cryptoe merged commit 135ca8f into apache:master Oct 3, 2024
56 checks passed
Akshat-Jain added a commit to Akshat-Jain/druid that referenced this pull request Oct 3, 2024
… adhere to FrameProcessor's contract (apache#17209)

This PR fixes the above issue by maintaining the state of last rowId flushed to output channel, and triggering another iteration of runIncrementally() method if frame writer has rows pending flush to the output channel.

The above is done keeping in mind FrameProcessor's contract which enforces that we should write only a single frame to each output channel in any given iteration of runIncrementally().
kgyrtkirk pushed a commit that referenced this pull request Oct 3, 2024
…17231)

* WindowOperatorQueryFrameProcessor: Fix frame writer capacity issues + adhere to FrameProcessor's contract (#17209)
* WindowOperatorQueryFrameProcessor: Avoid unnecessary re-runs of runIncrementally() (#17211)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants