Skip to content

Commit

Permalink
Refactor PlanFragmenter to make the logic more clear (apache#11912)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Nov 1, 2023
1 parent 168d630 commit 03a9ec7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.query.planner.logical;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -83,10 +87,13 @@ public SubPlan makePlan(QueryPlan queryPlan) {
for (Map.Entry<Integer, PlanNode> subPlanEntry : subPlanContext._subPlanIdToRootNodeMap.entrySet()) {
int subPlanId = subPlanEntry.getKey();
PlanNode subPlanRoot = subPlanEntry.getValue();
PlanFragmenter.Context planFragmentContext = new PlanFragmenter.Context();
planFragmentContext._planFragmentIdToRootNodeMap.put(1,
new PlanFragment(1, subPlanRoot, new PlanFragmentMetadata(), new ArrayList<>()));
subPlanRoot = subPlanRoot.visit(PlanFragmenter.INSTANCE, planFragmentContext);

// Fragment the SubPlan into multiple PlanFragments.
PlanFragmenter fragmenter = new PlanFragmenter();
PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
subPlanRoot = subPlanRoot.visit(fragmenter, fragmenterContext);
Int2ObjectOpenHashMap<PlanFragment> planFragmentMap = fragmenter.getPlanFragmentMap();
Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap();

// Sub plan root needs to send final results back to the Broker
// TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
Expand All @@ -95,27 +102,24 @@ public SubPlan makePlan(QueryPlan queryPlan) {
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false);
subPlanRootSenderNode.addInput(subPlanRoot);
subPlanRoot = new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
false, subPlanRootSenderNode);
PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
planFragmentContext._planFragmentIdToRootNodeMap.put(1,
new PlanFragment(1, subPlanRootSenderNode, planFragment1.getFragmentMetadata(), planFragment1.getChildren()));
PlanFragment rootPlanFragment =
new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(),
Collections.singletonList(planFragmentContext._planFragmentIdToRootNodeMap.get(1)));
planFragmentContext._planFragmentIdToRootNodeMap.put(0, rootPlanFragment);
for (Map.Entry<Integer, List<Integer>> planFragmentToChildrenEntry
: planFragmentContext._planFragmentIdToChildrenMap.entrySet()) {
int planFragmentId = planFragmentToChildrenEntry.getKey();
List<Integer> planFragmentChildren = planFragmentToChildrenEntry.getValue();
for (int planFragmentChild : planFragmentChildren) {
planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentId).getChildren()
.add(planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentChild));
PlanFragment planFragment1 =
new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
planFragmentMap.put(1, planFragment1);
for (Int2ObjectMap.Entry<IntList> entry : childPlanFragmentIdsMap.int2ObjectEntrySet()) {
PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
List<PlanFragment> childPlanFragments = planFragment.getChildren();
IntListIterator childPlanFragmentIdIterator = entry.getValue().iterator();
while (childPlanFragmentIdIterator.hasNext()) {
childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt()));
}
}
SubPlan subPlan = new SubPlan(planFragmentContext._planFragmentIdToRootNodeMap.get(0),
subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
MailboxReceiveNode rootReceiveNode =
new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false, false, subPlanRootSenderNode);
PlanFragment rootPlanFragment =
new PlanFragment(0, rootReceiveNode, new PlanFragmentMetadata(), Collections.singletonList(planFragment1));
SubPlan subPlan = new SubPlan(rootPlanFragment, subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
subPlanMap.put(subPlanId, subPlan);
}
for (Map.Entry<Integer, List<Integer>> subPlanToChildrenEntry : subPlanContext._subPlanIdToChildrenMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.apache.pinot.query.planner.logical;

import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
Expand All @@ -43,26 +45,39 @@


/**
* PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a
* {@link org.apache.pinot.query.planner.SubPlan} into multiple {@link PlanFragment}.
* PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a {@link SubPlan} into multiple
* {@link PlanFragment}s.
*
* The fragmenting process is as follows:
* 1. Traverse the plan tree in a depth-first manner;
* 2. For each node, if it is a PlanFragment splittable ExchangeNode, split it into {@link MailboxReceiveNode} and
* {@link MailboxSendNode} pair;
* 3. Assign current PlanFragment Id to {@link MailboxReceiveNode};
* 4. Increment current PlanFragment Id by one and assign it to the {@link MailboxSendNode}.
* 3. Assign current PlanFragment ID to {@link MailboxReceiveNode};
* 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}.
*/
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context> {
public static final PlanFragmenter INSTANCE = new PlanFragmenter();
private final Int2ObjectOpenHashMap<PlanFragment> _planFragmentMap = new Int2ObjectOpenHashMap<>();
private final Int2ObjectOpenHashMap<IntList> _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>();

// ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1, next PlanFragment ID starts with 2.
private int _nextPlanFragmentId = 2;

/**
 * Creates the {@link Context} used to start visiting a plan.
 *
 * @return a new context whose current PlanFragment ID is 1 (ID 0 is the ROOT PlanFragment)
 */
public Context createContext() {
  int initialPlanFragmentId = 1;
  return new Context(initialPlanFragmentId);
}

/**
 * Returns the PlanFragments recorded by this fragmenter, keyed by PlanFragment ID.
 *
 * The returned map is the internal instance, not a copy.
 *
 * @return the map from PlanFragment ID to {@link PlanFragment}
 */
public Int2ObjectOpenHashMap<PlanFragment> getPlanFragmentMap() {
  return this._planFragmentMap;
}

/**
 * Returns the child PlanFragment IDs recorded by this fragmenter, keyed by the parent (receiver) PlanFragment ID.
 *
 * The returned map is the internal instance, not a copy.
 *
 * @return the map from PlanFragment ID to the IDs of its child PlanFragments
 */
public Int2ObjectOpenHashMap<IntList> getChildPlanFragmentIdsMap() {
  return this._childPlanFragmentIdsMap;
}

private PlanNode process(PlanNode node, Context context) {
node.setPlanFragmentId(context._currentPlanFragmentId);
List<PlanNode> inputs = node.getInputs();
for (int i = 0; i < inputs.size(); i++) {
context._previousPlanFragmentId = node.getPlanFragmentId();
inputs.set(i, inputs.get(i).visit(this, context));
}
node.getInputs().replaceAll(planNode -> planNode.visit(this, context));
return node;
}

Expand Down Expand Up @@ -126,49 +141,44 @@ public PlanNode visitExchange(ExchangeNode node, Context context) {
if (!isPlanFragmentSplitter(node)) {
return process(node, context);
}
int currentPlanFragmentId = context._previousPlanFragmentId;
int nextPlanFragmentId = ++context._currentPlanFragmentId;
// Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node.
context._previousPlanFragmentId = nextPlanFragmentId;
PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context);

// Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leaf node
// of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment.
int receiverPlanFragmentId = context._currentPlanFragmentId;
int senderPlanFragmentId = _nextPlanFragmentId++;
_childPlanFragmentIdsMap.computeIfAbsent(receiverPlanFragmentId, k -> new IntArrayList())
.add(senderPlanFragmentId);

// Create a new context for the next PlanFragment with MailboxSendNode as the root node.
PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, new Context(senderPlanFragmentId));
PinotRelExchangeType exchangeType = node.getExchangeType();
RelDistribution.Type distributionType = node.getDistributionType();
// NOTE: Only HASH_DISTRIBUTED requires distribution keys
// TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys
List<Integer> distributionKeys =
distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null;

PlanNode mailboxSender =
new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId,
MailboxSendNode mailboxSendNode =
new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender());
PlanNode mailboxReceiver =
new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId,
distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
node.isSortOnReceiver(), mailboxSender);
mailboxSender.addInput(nextPlanFragmentRoot);

context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId,
new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>()));
if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) {
context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>());
}
context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId);

return mailboxReceiver;
mailboxSendNode.addInput(nextPlanFragmentRoot);
_planFragmentMap.put(senderPlanFragmentId,
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));

// Return the MailboxReceiveNode as the leaf node of the current PlanFragment.
return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId,
distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
node.isSortOnReceiver(), mailboxSendNode);
}

/**
 * Tells whether the given exchange splits the plan into separate PlanFragments.
 *
 * @param node the node to check; it is cast to {@link ExchangeNode} without a type check
 * @return {@code true} unless the exchange type is {@link PinotRelExchangeType#SUB_PLAN}
 */
private boolean isPlanFragmentSplitter(PlanNode node) {
  ExchangeNode exchangeNode = (ExchangeNode) node;
  PinotRelExchangeType exchangeType = exchangeNode.getExchangeType();
  return exchangeType != PinotRelExchangeType.SUB_PLAN;
}

public static class Context {
private final int _currentPlanFragmentId;

// PlanFragment ID starts with 1, 0 will be reserved for ROOT PlanFragment.
Integer _currentPlanFragmentId = 1;
Integer _previousPlanFragmentId = 1;
Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap = new HashMap<>();

Map<Integer, List<Integer>> _planFragmentIdToChildrenMap = new HashMap<>();
private Context(int currentPlanFragmentId) {
_currentPlanFragmentId = currentPlanFragmentId;
}
}
}

0 comments on commit 03a9ec7

Please sign in to comment.