diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 4659eb593c23..07df7845b842 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.query.planner.logical; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.ints.IntListIterator; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -83,10 +87,13 @@ public SubPlan makePlan(QueryPlan queryPlan) { for (Map.Entry subPlanEntry : subPlanContext._subPlanIdToRootNodeMap.entrySet()) { int subPlanId = subPlanEntry.getKey(); PlanNode subPlanRoot = subPlanEntry.getValue(); - PlanFragmenter.Context planFragmentContext = new PlanFragmenter.Context(); - planFragmentContext._planFragmentIdToRootNodeMap.put(1, - new PlanFragment(1, subPlanRoot, new PlanFragmentMetadata(), new ArrayList<>())); - subPlanRoot = subPlanRoot.visit(PlanFragmenter.INSTANCE, planFragmentContext); + + // Fragment the SubPlan into multiple PlanFragments. 
+ PlanFragmenter fragmenter = new PlanFragmenter(); + PlanFragmenter.Context fragmenterContext = fragmenter.createContext(); + subPlanRoot = subPlanRoot.visit(fragmenter, fragmenterContext); + Int2ObjectOpenHashMap planFragmentMap = fragmenter.getPlanFragmentMap(); + Int2ObjectOpenHashMap childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap(); // Sub plan root needs to send final results back to the Broker // TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead) @@ -95,27 +102,24 @@ public SubPlan makePlan(QueryPlan queryPlan) { RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false); subPlanRootSenderNode.addInput(subPlanRoot); - subPlanRoot = new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(), - RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false, - false, subPlanRootSenderNode); - PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1); - planFragmentContext._planFragmentIdToRootNodeMap.put(1, - new PlanFragment(1, subPlanRootSenderNode, planFragment1.getFragmentMetadata(), planFragment1.getChildren())); - PlanFragment rootPlanFragment = - new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(), - Collections.singletonList(planFragmentContext._planFragmentIdToRootNodeMap.get(1))); - planFragmentContext._planFragmentIdToRootNodeMap.put(0, rootPlanFragment); - for (Map.Entry> planFragmentToChildrenEntry - : planFragmentContext._planFragmentIdToChildrenMap.entrySet()) { - int planFragmentId = planFragmentToChildrenEntry.getKey(); - List planFragmentChildren = planFragmentToChildrenEntry.getValue(); - for (int planFragmentChild : planFragmentChildren) { - planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentId).getChildren() - 
.add(planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentChild)); + PlanFragment planFragment1 = + new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>()); + planFragmentMap.put(1, planFragment1); + for (Int2ObjectMap.Entry entry : childPlanFragmentIdsMap.int2ObjectEntrySet()) { + PlanFragment planFragment = planFragmentMap.get(entry.getIntKey()); + List childPlanFragments = planFragment.getChildren(); + IntListIterator childPlanFragmentIdIterator = entry.getValue().iterator(); + while (childPlanFragmentIdIterator.hasNext()) { + childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt())); } } - SubPlan subPlan = new SubPlan(planFragmentContext._planFragmentIdToRootNodeMap.get(0), - subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>()); + MailboxReceiveNode rootReceiveNode = + new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(), + RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, + false, false, subPlanRootSenderNode); + PlanFragment rootPlanFragment = + new PlanFragment(0, rootReceiveNode, new PlanFragmentMetadata(), Collections.singletonList(planFragment1)); + SubPlan subPlan = new SubPlan(rootPlanFragment, subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>()); subPlanMap.put(subPlanId, subPlan); } for (Map.Entry> subPlanToChildrenEntry : subPlanContext._subPlanIdToChildrenMap.entrySet()) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java index fdb5858fdd89..9d7e5a2fdd8d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java @@ -18,14 +18,16 @@ */ package 
org.apache.pinot.query.planner.logical; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.logical.PinotRelExchangeType; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.PlanFragmentMetadata; +import org.apache.pinot.query.planner.SubPlan; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; import org.apache.pinot.query.planner.plannode.FilterNode; @@ -43,26 +45,39 @@ /** - * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a - * {@link org.apache.pinot.query.planner.SubPlan} into multiple {@link PlanFragment}. + * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a {@link SubPlan} into multiple + * {@link PlanFragment}s. * * The fragmenting process is as follows: * 1. Traverse the plan tree in a depth-first manner; * 2. For each node, if it is a PlanFragment splittable ExchangeNode, split it into {@link MailboxReceiveNode} and * {@link MailboxSendNode} pair; - * 3. Assign current PlanFragment Id to {@link MailboxReceiveNode}; - * 4. Increment current PlanFragment Id by one and assign it to the {@link MailboxSendNode}. + * 3. Assign current PlanFragment ID to {@link MailboxReceiveNode}; + * 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}. 
 */ public class PlanFragmenter implements PlanNodeVisitor { - public static final PlanFragmenter INSTANCE = new PlanFragmenter(); + private final Int2ObjectOpenHashMap _planFragmentMap = new Int2ObjectOpenHashMap<>(); + private final Int2ObjectOpenHashMap _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>(); + + // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1, next PlanFragment ID starts with 2. + private int _nextPlanFragmentId = 2; + + public Context createContext() { + // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1. + return new Context(1); + } + + public Int2ObjectOpenHashMap getPlanFragmentMap() { + return _planFragmentMap; + } + + public Int2ObjectOpenHashMap getChildPlanFragmentIdsMap() { + return _childPlanFragmentIdsMap; + } private PlanNode process(PlanNode node, Context context) { node.setPlanFragmentId(context._currentPlanFragmentId); - List inputs = node.getInputs(); - for (int i = 0; i < inputs.size(); i++) { - context._previousPlanFragmentId = node.getPlanFragmentId(); - inputs.set(i, inputs.get(i).visit(this, context)); - } + node.getInputs().replaceAll(planNode -> planNode.visit(this, context)); return node; } @@ -126,36 +141,33 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { if (!isPlanFragmentSplitter(node)) { return process(node, context); } - int currentPlanFragmentId = context._previousPlanFragmentId; - int nextPlanFragmentId = ++context._currentPlanFragmentId; - // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node. - context._previousPlanFragmentId = nextPlanFragmentId; - PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context); + // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leaf node + // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment. 
+ int receiverPlanFragmentId = context._currentPlanFragmentId; + int senderPlanFragmentId = _nextPlanFragmentId++; + _childPlanFragmentIdsMap.computeIfAbsent(receiverPlanFragmentId, k -> new IntArrayList()) + .add(senderPlanFragmentId); + + // Create a new context for the next PlanFragment with MailboxSendNode as the root node. + PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, new Context(senderPlanFragmentId)); PinotRelExchangeType exchangeType = node.getExchangeType(); RelDistribution.Type distributionType = node.getDistributionType(); // NOTE: Only HASH_DISTRIBUTED requires distribution keys // TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys List distributionKeys = distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null; - - PlanNode mailboxSender = - new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId, + MailboxSendNode mailboxSendNode = + new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId, distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender()); - PlanNode mailboxReceiver = - new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId, - distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), - node.isSortOnReceiver(), mailboxSender); - mailboxSender.addInput(nextPlanFragmentRoot); - - context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId, - new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>())); - if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) { - context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>()); - } - context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId); - - return mailboxReceiver; 
+ mailboxSendNode.addInput(nextPlanFragmentRoot); + _planFragmentMap.put(senderPlanFragmentId, + new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>())); + + // Return the MailboxReceiveNode as the leaf node of the current PlanFragment. + return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId, + distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), + node.isSortOnReceiver(), mailboxSendNode); } private boolean isPlanFragmentSplitter(PlanNode node) { @@ -163,12 +175,10 @@ private boolean isPlanFragmentSplitter(PlanNode node) { } public static class Context { + private final int _currentPlanFragmentId; - // PlanFragment ID starts with 1, 0 will be reserved for ROOT PlanFragment. - Integer _currentPlanFragmentId = 1; - Integer _previousPlanFragmentId = 1; - Map _planFragmentIdToRootNodeMap = new HashMap<>(); - - Map> _planFragmentIdToChildrenMap = new HashMap<>(); + private Context(int currentPlanFragmentId) { + _currentPlanFragmentId = currentPlanFragmentId; + } } }