Skip to content

Commit

Permalink
[Multi-stage] Optimize query plan serialization (apache#12370)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Feb 7, 2024
1 parent f6ae06f commit 8434158
Show file tree
Hide file tree
Showing 35 changed files with 626 additions and 962 deletions.
18 changes: 0 additions & 18 deletions pinot-common/src/main/proto/mailbox.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@
// under the License.
//

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;
Expand Down
2 changes: 1 addition & 1 deletion pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ message ListField {
// The key of the map is a string and the value of the map is a MemberVariableField.
message MapField {
map<string, MemberVariableField> content = 1;
}
}
18 changes: 0 additions & 18 deletions pinot-common/src/main/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@
// under the License.
//

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;
Expand Down
33 changes: 7 additions & 26 deletions pinot-common/src/main/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,10 @@
// under the License.
//

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;

import "plan.proto";

service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);
Expand All @@ -59,7 +39,7 @@ message CancelResponse {
// QueryRequest is the dispatched content for all query stages to a physical worker.
message QueryRequest {
repeated StagePlan stagePlan = 1;
map<string, string> metadata = 2;
bytes metadata = 2; // Serialized Properties
}

// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
Expand All @@ -70,15 +50,13 @@ message QueryResponse {

message StagePlan {
int32 stageId = 1;
StageNode stageRoot = 2;
bytes rootNode = 2; // Serialized StageNode
StageMetadata stageMetadata = 3;
}

message StageMetadata {
repeated WorkerMetadata workerMetadata = 1;
map<string, string> customProperty = 2;
string serverAddress = 3;
repeated int32 workerIds = 4;
bytes customProperty = 2; // Serialized Properties
}

message WorkerMetadata {
Expand All @@ -90,5 +68,8 @@ message WorkerMetadata {
message MailboxMetadata {
repeated string mailboxId = 1;
repeated string virtualAddress = 2;
map<string, string> customProperty = 3;
}

// Properties wraps a string-to-string map in its own message.
// NOTE(review): the `bytes` fields commented "Serialized Properties" in this file (QueryRequest.metadata,
// StageMetadata.customProperty) presumably carry this message in serialized form - confirm against the
// code that builds and parses those fields.
message Properties {
map<string, string> property = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public PhysicalExplainPlanVisitor(DispatchableSubPlan dispatchableSubPlan) {
/**
* Explains the query plan.
*
* @see DispatchableSubPlan#explain()
* @param dispatchableSubPlan the queryPlan to explain
* @return a String representation of the query plan tree
*/
Expand Down Expand Up @@ -216,9 +215,8 @@ private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {

int receiverStageId = node.getReceiverStageId();
List<VirtualServerAddress> serverAddressList =
_dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
.getWorkerMetadataList().get(context._workerId)
.getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
_dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
.get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
context._builder.append("->");
String receivers = serverInstanceToWorkerIdList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ public List<DispatchablePlanFragment> constructDispatchablePlanFragmentList(Plan
int workerId = serverEntry.getKey();
QueryServerInstance queryServerInstance = serverEntry.getValue();
serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
new VirtualServerAddress(queryServerInstance, workerId));
WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
workerIdToMailboxesMap.get(workerId));
if (workerIdToSegmentsMap != null) {
workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
}
workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
workerMetadataArray[workerId] = workerMetadataBuilder.build();
workerMetadataArray[workerId] = workerMetadata;
}

// set the stageMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.query.planner.physical;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
Expand Down Expand Up @@ -63,7 +65,7 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
workerId, senderServer, receiverServer);
MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
Expand All @@ -78,11 +80,9 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
Expand All @@ -94,22 +94,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int i = 0; i < partitionParallelism; i++) {
VirtualServerAddress receiverAddress =
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
receiverWorkerId);
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
receivingMailboxIds.add(mailboxId);
receivingAddresses.add(
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));

MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
receiverMailboxMetadata.getMailboxIds().add(mailboxId);
receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);

receiverWorkerId++;
}
Expand All @@ -123,22 +124,22 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
VirtualServerAddress receiverAddress =
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId =
MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
receivingMailboxIds.add(mailboxId);
receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));

MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
receiverMailboxMetadata.getMailboxIds().add(mailboxId);
receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
}
}
}
Expand All @@ -154,14 +155,12 @@ private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, Disp
int numReceivers = receiverServerMap.size();
if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
// leaf-to-intermediate condition
return numSenders * sender.getPartitionParallelism() == numReceivers
&& sender.getPartitionFunction() != null
return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
} else {
// dynamic-broadcast condition || intermediate-to-intermediate
return numSenders == numReceivers
&& sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
return numSenders == numReceivers && sender.getPartitionFunction() != null && sender.getPartitionFunction()
.equalsIgnoreCase(receiver.getPartitionFunction());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@
*/
package org.apache.pinot.query.planner.physical;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.pinot.query.routing.MailboxMetadata;


/**
 * Static helpers for building mailbox id strings.
 *
 * <p>A "plan mailbox id" joins the sender stage id, sender worker id, receiver stage id and receiver worker id with
 * {@link #SEPARATOR}. A "mailbox id" is the request id followed by {@link #SEPARATOR} and a plan mailbox id.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers, so the two {@code SEPARATOR}
 * declarations and the two {@code return} statements in {@code toPlanMailboxId} look like the before/after versions
 * of the same line - confirm against the real file before compiling.
 */
public class MailboxIdUtils {
// Not instantiable: the class only exposes static methods.
private MailboxIdUtils() {
}

// Character placed between the components of every id built by this class.
private static final char SEPARATOR = '|';
public static final char SEPARATOR = '|';

/**
 * Builds the plan mailbox id for one sender/receiver pair.
 *
 * @param senderStageId stage id of the sender
 * @param senderWorkerId worker id of the sender
 * @param receiverStageId stage id of the receiver
 * @param receiverWorkerId worker id of the receiver
 * @return the four ids, in the order above, joined by {@link #SEPARATOR}
 */
public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
// Integer.toString on the first operand makes the expression a String concatenation from the start, so the
// char SEPARATOR is appended as a character rather than added numerically to the int.
return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR
+ receiverStageId + SEPARATOR + receiverWorkerId;
return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
+ receiverWorkerId;
}

/**
 * Prefixes a plan mailbox id with the request id.
 *
 * @param requestId id of the request
 * @param planMailboxId a plan mailbox id, e.g. one returned by {@code toPlanMailboxId}
 * @return {@code requestId}, {@link #SEPARATOR} and {@code planMailboxId} concatenated
 */
public static String toMailboxId(long requestId, String planMailboxId) {
return Long.toString(requestId) + SEPARATOR + planMailboxId;
}

/**
 * Converts every plan mailbox id held by the given metadata into a mailbox id for the request.
 *
 * @param requestId id of the request
 * @param mailboxMetadata metadata whose {@code getMailboxIds()} supplies the plan mailbox ids
 * @return a new list with one mailbox id per entry of {@code getMailboxIds()}, in the same order
 */
public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
}

/**
 * Builds a mailbox id directly from its five components by composing {@code toPlanMailboxId} with the
 * two-argument {@code toMailboxId}. Annotated as visible for testing.
 *
 * @param requestId id of the request
 * @param senderStageId stage id of the sender
 * @param senderWorkerId worker id of the sender
 * @param receiverStageId stage id of the receiver
 * @param receiverWorkerId worker id of the receiver
 * @return the request id followed by {@link #SEPARATOR} and the plan mailbox id of the given pair
 */
@VisibleForTesting
public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
}
}
Loading

0 comments on commit 8434158

Please sign in to comment.