Limit pages size to a configurable limit #14994
Conversation
I was partway through a review when this question popped up: it seems like we are adding a new stage to partition by page size. I don't think that's optimal, since adding a new stage means we'd be shuffling again.
When fault tolerance is enabled, this also means an extra round of upload and download. That carries a performance penalty, which matters because this feature is intended for querying from deep storage, where the result set can be huge.
If the above is true, I think we should look for an alternative to creating a new stage.
```java
.setExpectedCountersForStageWorkerChannel(
    CounterSnapshotMatcher
        .with().rows(6).frames(1),
    0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
    CounterSnapshotMatcher
        .with().rows(6).frames(1),
    0, 0, "shuffle"
)
```
Why are we removing the assertion?
It was more of an in-flux thing. Fixed it.
```java
  return channelCounters;
}

public static class TestFrame extends Frame
```
Suggested change:
```diff
-public static class TestFrame extends Frame
+private static class TestFrame extends Frame
```
Let's not create a separate class for a single use
I just removed this class and went with mocks.
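For reference, a minimal sketch of the mock-based approach (assuming Mockito is on the test classpath and that `Frame` exposes a `numRows()` accessor):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.druid.frame.Frame;

public class FrameMockExample
{
  // Build a stand-in Frame without subclassing it, so the real
  // constructor can remain private.
  public static Frame frameWithRows(int numRows)
  {
    final Frame frame = mock(Frame.class);
    when(frame.numRows()).thenReturn(numRows);
    return frame;
  }
}
```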
```diff
@@ -105,7 +105,7 @@ public class Frame
   private final int numRegions;
   private final boolean permuted;

-  private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
+  protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
```
Suggested change:
```diff
-protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
+private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
```
Please revert this change. We shouldn't widen the access level in the main class just to allow testing. At worst we can mock the class in the tests, but widening the constructor defeats the defensive check that the private access provides.
We are basically annotating the constructor here with @VisibleForTesting, without saying so.
#11848 (comment)
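For illustration, the convention being referenced looks roughly like this (hypothetical `Widget` class, not Druid code; `VisibleForTesting` is Guava's annotation):

```java
import com.google.common.annotations.VisibleForTesting;

public class Widget
{
  private final int size;

  // If tests truly must construct this directly, the widened
  // (package-private) scope should at least be declared explicitly
  // rather than left implicit.
  @VisibleForTesting
  Widget(int size)
  {
    this.size = size;
  }

  public int getSize()
  {
    return size;
  }
}
```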
# Conflicts:
#	docs/multi-stage-query/reference.md
```java
);
if (finalShuffleStageDef.doesSortDuringShuffle()) {
  final QueryDefinitionBuilder builder = QueryDefinition.builder();
  for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
```
nit: could use org.apache.druid.msq.kernel.QueryDefinitionBuilder#addAll(org.apache.druid.msq.kernel.QueryDefinition) here
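A sketch of that suggestion (using the `addAll` method named above; fluent-builder behavior assumed):

```java
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;

public class BuilderCopyExample
{
  // Copies every stage from an existing QueryDefinition in one call,
  // replacing the manual per-stage loop in the quoted diff.
  static QueryDefinitionBuilder copyStages(final QueryDefinition queryDef)
  {
    return QueryDefinition.builder().addAll(queryDef);
  }
}
```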
```diff
 // we add a final stage which generates one partition per worker.
-shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+    MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
```
I might be missing something somewhere else in this PR, but doesn't GlobalSortTargetSizeShuffleSpec enforce the limit on the total partition size summed across all workers? Since we create a new page for each worker-partition combination, would the limit be enforced?
Yes, GlobalSortTargetSizeShuffleSpec enforces a limit on partition size globally. So there are two cases (see the arithmetic sketch after this list):
- If the last stage is a groupBy post-shuffle stage, then we know each partition will be present on only one distinct worker, so the page size controls the number of rows in that partition.
- If the last stage is a scan stage, then we add a new QueryResultFrameProcessor, since the data needs to be sorted on the boost column. The QueryResultFrameProcessor merges the results within the same partition and writes out a single partition. Since the partition cuts by size are done globally, in the controller, the final partitions end up equal to the configured page size.
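To make the sizing concrete, a back-of-the-envelope illustration (plain arithmetic, not Druid code; the 100000-row default comes from this patch):

```java
public class PageSizingExample
{
  public static void main(String[] args)
  {
    // The controller cuts partitions globally by target size, so each page
    // holds at most ~rowsPerPage rows regardless of which worker produced them.
    final long totalRows = 1_000_000L;
    final long rowsPerPage = 100_000L; // default introduced by this patch
    final long numPages = (totalRows + rowsPerPage - 1) / rowsPerPage;
    System.out.println(numPages + " pages of <= " + rowsPerPage + " rows each"); // 10 pages
  }
}
```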
I have added a new test case, testExternSelectWithMultipleWorkers. You can look at the counter checks to see what's happening with a scan query.
This PR is also supposed to help with the case of missing results reported in community Slack.
Let's add a test case that fails with the existing code and is fixed by the patch.
As discussed offline, you mentioned the issue was due to a single processor producing multiple frames, so we should add that scenario to confirm the regression is fixed.
@LakshSingla that test case is already added; see SqlStatementResourceHelperTest#testDistinctPartitionsOnEachWorker().
Okay, I looked at
@LakshSingla
Why do we need indexes on frames for this?
Also, if possible, please add an MSQ select test for the regression. Using counters is risky as is, and we'd want to make sure we are tackling the issue we are seeing end to end.
That will help us in supporting use cases in the result API with API params like
Added another test
LGTM post CI 🚀
Thanks @adarshsanjeev @LakshSingla for the review.
Adding the ability to limit the page sizes of select queries. We piggyback on the same machinery that is used to control numRowsPerSegment. This patch introduces a new context parameter, rowsPerPage, whose default value is 100000 rows. This patch also optimizes adding the last selectResults stage only when the previous stages have sorted outputs. Previously, each select query with selectDestination=durableStorage added this extra selectResults stage. (cherry picked from commit 61ea9e0)
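As a usage sketch, the new parameter rides along in the query context like any other key (illustrative map only; the exact submission API is out of scope here):

```java
import java.util.Map;

import com.google.common.collect.ImmutableMap;

public class RowsPerPageContextExample
{
  public static void main(String[] args)
  {
    // rowsPerPage overrides the 100000-row default; selectDestination routes
    // results to durable storage, where paging applies.
    final Map<String, Object> context = ImmutableMap.of(
        "selectDestination", "durableStorage",
        "rowsPerPage", 50_000
    );
    System.out.println(context);
  }
}
```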
hey @cryptoe
and here are the results from the get query status endpoint:
@marzi312 It is happening because you have a LIMIT in front of your query. It is a bug in the MSQ engine. I'll raise a patch for the same.
Adding the ability to limit the page sizes of select queries. We piggyback on the same machinery that is used to control `numRowsPerSegment`. This patch introduces a new context parameter `rowsPerPage`, for which the default value is set to `100000` rows. This patch also optimizes adding the last `selectResults` stage only when the previous stages have sorted outputs. Currently, for each select query with `selectDestination=durableStorage`, we used to add this extra `selectResults` stage.

This PR has: