Skip to content

Commit

Permalink
[multistage][bugfix] leaf limit refactor issue (apache#12001)
Browse files Browse the repository at this point in the history
* [hotfix] leaf limit refactor - fix default value setting issue

---------

Co-authored-by: Rong Rong <[email protected]>
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
  • Loading branch information
3 people authored Nov 14, 2023
1 parent 5f22d16 commit 7217bb9
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
executorService, executionContext.getPipelineBreakerResult());
// 1. compile the PinotQuery
constructPinotQueryPlan(serverContext);
constructPinotQueryPlan(serverContext, executionContext.getOpChainMetadata());
// 2. convert PinotQuery into InstanceRequest list (one for each physical table)
List<InstanceRequest> instanceRequestList =
ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
Expand All @@ -115,10 +115,18 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
* It constructs the content for {@link ServerPlanRequestContext#getPinotQuery()} and set the boundary via:
* {@link ServerPlanRequestContext#setLeafStageBoundaryNode(PlanNode)}.
*/
private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext) {
private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext,
Map<String, String> requestMetadata) {
DistributedStagePlan stagePlan = serverContext.getStagePlan();
PinotQuery pinotQuery = serverContext.getPinotQuery();
pinotQuery.setExplain(false);
// attach leaf node limit it not set
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadata);
if (leafNodeLimit != null) {
pinotQuery.setLimit(leafNodeLimit);
} else {
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
}
// visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode.
ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
}
Expand Down Expand Up @@ -177,13 +185,6 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex
(executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0);
// 1. make a deep copy of the pinotQuery and modify the PinotQuery accordingly
PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery());
// - attach leaf node limit
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
if (leafNodeLimit != null) {
pinotQuery.setLimit(leafNodeLimit);
} else {
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
}
// - attach table type
DataSource dataSource = pinotQuery.getDataSource();
String rawTableName = dataSource.getTableName();
Expand Down

0 comments on commit 7217bb9

Please sign in to comment.