Skip to content

Commit

Permalink
[Multi-stage] Reduce the stats transferred (apache#12517)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Feb 29, 2024
1 parent 704f73d commit 35c89c8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
Expand Down Expand Up @@ -179,14 +181,20 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap;
if (!traceEnabled) {
stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false));
} else {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
int numStages = stagePlans.size();
stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages);
for (int stageId = 0; stageId < numStages; stageId++) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true));
}
}

long executionStartTimeNs = System.nanoTime();
ResultTable queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,10 @@ public static MetadataBlock getErrorDataBlock(Map<Integer, String> exceptions) {
}

/**
 * Creates an end-of-stream (EOS) metadata block built from the block type alone; no stats map is passed.
 *
 * @return a new {@link MetadataBlock} constructed with {@code MetadataBlock.MetadataBlockType.EOS}
 */
public static MetadataBlock getEndOfStreamDataBlock() {
// TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
}

/**
 * Creates an end-of-stream (EOS) metadata block, handing the given stats map to the block's constructor.
 *
 * @param stats key/value stats passed through unchanged to the {@link MetadataBlock} constructor
 * @return a new {@link MetadataBlock} constructed with {@code MetadataBlock.MetadataBlockType.EOS} and {@code stats}
 */
public static MetadataBlock getEndOfStreamDataBlock(Map<String, String> stats) {
// NOTE(review): the TODO below looks stale for this overload, since stats is already passed in - confirm.
// TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@

public final class TransferableBlockUtils {
private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());

private TransferableBlockUtils() {
// do not instantiate.
}

public static TransferableBlock getEndOfStreamTransferableBlock() {
return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
return EMPTY_EOS;
}

public static TransferableBlock getEndOfStreamTransferableBlock(Map<String, String> statsMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;


/**
Expand Down Expand Up @@ -75,24 +77,39 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
*/
public boolean send(TransferableBlock block)
throws Exception {
if (block.isEndOfStreamBlock()) {
if (block.isErrorBlock()) {
// Send error block to all mailboxes to propagate the error
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
sendBlock(sendingMailbox, block);
}
return false;
} else {
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
if (!sendingMailbox.isEarlyTerminated()) {
isEarlyTerminated = false;
break;
}
}

if (block.isSuccessfulEndOfStreamBlock()) {
// Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes
int numMailboxes = _sendingMailboxes.size();
int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
for (int i = 0; i < numMailboxes; i++) {
SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
TransferableBlock blockToSend =
i == mailboxIdToSendMetadata ? block : TransferableBlockUtils.getEndOfStreamTransferableBlock();
sendBlock(sendingMailbox, blockToSend);
}
if (!isEarlyTerminated) {
route(_sendingMailboxes, block);
return false;
}

assert block.isDataBlock();
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
if (!sendingMailbox.isEarlyTerminated()) {
isEarlyTerminated = false;
break;
}
return isEarlyTerminated;
}
if (!isEarlyTerminated) {
route(_sendingMailboxes, block);
}
return isEarlyTerminated;
}

protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
Expand Down Expand Up @@ -89,7 +90,7 @@ public QueryDispatcher(MailboxService mailboxService) {
}

public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
throws Exception {
long requestId = context.getRequestId();
try {
Expand Down Expand Up @@ -278,20 +279,16 @@ public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatc
}

private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats,
@Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
if (executionStatsAggregatorMap != null) {
LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime());
for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
OperatorStats operatorStats = entry.getValue();
ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
@Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
if (MapUtils.isNotEmpty(statsAggregatorMap)) {
for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) {
ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0);
rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId());
if (stageStatsAggregator != null) {
if (dispatchableSubPlan != null) {
OperatorUtils.recordTableName(operatorStats,
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
}
stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
OperatorUtils.recordTableName(operatorStats,
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
}
}
}
Expand Down

0 comments on commit 35c89c8

Please sign in to comment.