From 7217bb9d16b60074c1e9a9df72984556a7c8dbd5 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 14 Nov 2023 10:46:27 -0800 Subject: [PATCH] [multistage][bugfix] leaf limit refactor issue (#12001) * [hotfix] leaf limit refactor - fix default value setting issue --------- Co-authored-by: Rong Rong Co-authored-by: Xiaotian (Jackie) Jiang <17555551+Jackie-Jiang@users.noreply.github.com> --- .../plan/server/ServerPlanRequestUtils.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index b2ead5a04132..832b2a466677 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -95,7 +95,7 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext, ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor, executorService, executionContext.getPipelineBreakerResult()); // 1. compile the PinotQuery - constructPinotQueryPlan(serverContext); + constructPinotQueryPlan(serverContext, executionContext.getOpChainMetadata()); // 2. convert PinotQuery into InstanceRequest list (one for each physical table) List instanceRequestList = ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan, @@ -115,10 +115,18 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext, * It constructs the content for {@link ServerPlanRequestContext#getPinotQuery()} and set the boundary via: * {@link ServerPlanRequestContext#setLeafStageBoundaryNode(PlanNode)}. */ - private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext) { + private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext, + Map requestMetadata) { DistributedStagePlan stagePlan = serverContext.getStagePlan(); PinotQuery pinotQuery = serverContext.getPinotQuery(); pinotQuery.setExplain(false); + // attach leaf node limit it not set + Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadata); + if (leafNodeLimit != null) { + pinotQuery.setLimit(leafNodeLimit); + } else { + pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); + } // visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode. ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext); } @@ -177,13 +185,6 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex (executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0); // 1. make a deep copy of the pinotQuery and modify the PinotQuery accordingly PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery()); - // - attach leaf node limit - Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata()); - if (leafNodeLimit != null) { - pinotQuery.setLimit(leafNodeLimit); - } else { - pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); - } // - attach table type DataSource dataSource = pinotQuery.getDataSource(); String rawTableName = dataSource.getTableName();