Skip to content

Commit

Permalink
[bugfix][multistage] explicit warning flags set on each stage stats (a…
Browse files Browse the repository at this point in the history
…pache#11936)

* set explicit warning flags on each stage
* add a top-level flag for maxRowsInJoinReached because the UI depends only on the top-level flag
* simplify the logic in early termination for HashJoinOperator per comments
* set partial result flag in multi-stage handler

---------

Co-authored-by: Rong Rong <[email protected]>
  • Loading branch information
walterddr and Rong Rong authored Nov 6, 2023
1 parent 6638d4e commit d177866
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -775,8 +775,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
1);
}
brokerResponse.setPartialResult(
brokerResponse.isNumGroupsLimitReached() || brokerResponse.getExceptionsSize() > 0);
brokerResponse.setPartialResult(isPartialResult(brokerResponse));

// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
Expand Down Expand Up @@ -1821,6 +1820,11 @@ protected abstract BrokerResponseNative processBrokerRequest(long requestId, Bro
RequestContext requestContext)
throws Exception;

/**
 * Determines whether the given response should be flagged as a partial result.
 *
 * <p>A response is considered partial when the num-groups limit was reached, the max-rows-in-join limit was
 * reached, or at least one exception was recorded on it. The checks are evaluated in that order and stop at the
 * first one that holds.
 *
 * @param brokerResponse the response to inspect
 * @return {@code true} if any of the conditions above holds, {@code false} otherwise
 */
protected static boolean isPartialResult(BrokerResponse brokerResponse) {
  if (brokerResponse.isNumGroupsLimitReached()) {
    return true;
  }
  if (brokerResponse.isMaxRowsInJoinReached()) {
    return true;
  }
  return brokerResponse.getExceptionsSize() > 0;
}

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setTotalDocs(response.getTotalDocs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
}

// Set partial result flag
brokerResponse.setPartialResult(isPartialResult(brokerResponse));

// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ enum MetadataKey {
OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", MetadataValueType.LONG),
OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", MetadataValueType.LONG),
OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG);
OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG),
MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING);

// We keep this constant to track the max id added so far for backward compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
private static final int MAX_ID = 33;
private static final int MAX_ID = 34;

private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ String toJsonString()
*/
boolean isNumGroupsLimitReached();

/**
* Returns whether the limit for max rows in join has been reached.
*/
boolean isMaxRowsInJoinReached();

/**
* Get number of exceptions recorded in the response.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@
"resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo", "partialResult"})
"numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
"segmentStatistics", "traceInfo", "partialResult"
})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
public static final BrokerResponseNative NO_TABLE_RESULT =
Expand Down Expand Up @@ -76,6 +78,7 @@ public class BrokerResponseNative implements BrokerResponse {

private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
private boolean _maxRowsInJoinReached = false;
private boolean _partialResult = false;
private long _timeUsedMs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
Expand Down Expand Up @@ -494,6 +497,16 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
_numGroupsLimitReached = numGroupsLimitReached;
}

/**
 * Returns whether the limit for max rows in join has been reached, as recorded on this response.
 *
 * <p>Serialized to JSON under the {@code maxRowsInJoinReached} property.
 */
@JsonProperty("maxRowsInJoinReached")
@Override
public boolean isMaxRowsInJoinReached() {
  return _maxRowsInJoinReached;
}

/**
 * Sets the flag recording whether the limit for max rows in join has been reached.
 *
 * <p>Bound to the {@code maxRowsInJoinReached} JSON property.
 *
 * @param maxRowsInJoinReached the new value of the flag
 */
@JsonProperty("maxRowsInJoinReached")
public void setMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
_maxRowsInJoinReached = maxRowsInJoinReached;
}

@JsonProperty("partialResult")
public boolean isPartialResult() {
return _partialResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
"resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded",
"resultTable", "requestId", "stageStats", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
"numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
"segmentStatistics", "traceInfo", "partialResult"
})
public class BrokerResponseNativeV2 extends BrokerResponseNative {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@
// same metadataKey
// TODO: Replace member fields with a simple map of <MetadataKey, Object>
// TODO: Add a subStat field, stage level subStats will contain each operator stats
@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs",
"stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
@JsonPropertyOrder({
"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
"stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"})
"realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"
})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class BrokerResponseStats extends BrokerResponseNative {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ExecutionStatsAggregator {
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private boolean _numGroupsLimitReached = false;
private boolean _maxRowsInJoinReached = false;
private int _numBlocks = 0;
private int _numRows = 0;
private long _stageExecutionTimeMs = 0;
Expand Down Expand Up @@ -250,6 +251,8 @@ public synchronized void aggregate(@Nullable ServerRoutingInstance routingInstan
_numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));

_maxRowsInJoinReached |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName()));

String numBlocksString = metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
if (numBlocksString != null) {
Expand Down Expand Up @@ -306,6 +309,7 @@ public void setStats(@Nullable String rawTableName, BrokerResponseNative brokerR
brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
brokerResponseNative.setTotalDocs(_numTotalDocs);
brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
brokerResponseNative.setMaxRowsInJoinReached(_maxRowsInJoinReached);
brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
Expand Down Expand Up @@ -369,6 +373,8 @@ public void setStageLevelStats(@Nullable String rawTableName, BrokerResponseStat

brokerResponseStats.setNumBlocks(_numBlocks);
brokerResponseStats.setNumRows(_numRows);
brokerResponseStats.setMaxRowsInJoinReached(_maxRowsInJoinReached);
brokerResponseStats.setNumGroupsLimitReached(_numGroupsLimitReached);
brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
Expand Down Expand Up @@ -160,9 +160,9 @@ private TransferableBlock produceAggregatedBlock() {
} else {
TransferableBlock dataBlock = new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
if (_groupByExecutor.isNumGroupsLimitReached()) {
dataBlock.addException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE,
String.format("Reached numGroupsLimit of: %d for group-by, ignoring the extra groups",
_groupByExecutor.getNumGroupsLimit()));
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
operatorStats.recordSingleStat(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
_inputOperator.earlyTerminate();
}
return dataBlock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
Expand Down Expand Up @@ -110,7 +111,6 @@ public class HashJoinOperator extends MultiStageOperator {
private final JoinOverFlowMode _joinOverflowMode;

private int _currentRowsInHashTable = 0;
private ProcessingException _resourceLimitExceededException = null;

public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator,
MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) {
Expand Down Expand Up @@ -191,7 +191,7 @@ public String toExplainString() {
protected TransferableBlock getNextBlock() {
try {
if (_isTerminated) {
return setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
if (!_isHashTableBuilt) {
// Build JOIN hash table
Expand All @@ -202,7 +202,7 @@ protected TransferableBlock getNextBlock() {
}
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
// JOIN each left block with the right block.
return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock));
return buildJoinedDataBlock(leftBlock);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
Expand All @@ -215,16 +215,21 @@ private void buildBroadcastHashTable()
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
_resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
_resourceLimitExceededException.setMessage(
"Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throw _resourceLimitExceededException;
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
"Cannot build in memory hash table for join operator, reach number of rows limit: "
+ _maxRowsInHashTable);
throw resourceLimitExceededException;
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
container = container.subList(0, remainingRows);
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
operatorStats.recordSingleStat(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName(), "true");
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
_rightTableOperator.earlyTerminate();
}
}
// put all the rows into corresponding hash collections keyed by the key selector function.
Expand All @@ -238,10 +243,6 @@ private void buildBroadcastHashTable()
hashCollection.add(row);
}
_currentRowsInHashTable += container.size();
if (_currentRowsInHashTable == _maxRowsInHashTable) {
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
_rightTableOperator.earlyTerminate();
}
rightBlock = _rightTableOperator.nextBlock();
}
if (rightBlock.isErrorBlock()) {
Expand Down Expand Up @@ -300,13 +301,6 @@ private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) {
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}

/**
 * Attaches the recorded resource-limit exception, if there is one, to the given block.
 *
 * @param block the block to decorate
 * @return the same {@code block} instance that was passed in
 */
private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock block) {
  // Nothing was recorded: hand the block back untouched.
  if (_resourceLimitExceededException == null) {
    return block;
  }
  block.addException(_resourceLimitExceededException);
  return block;
}

private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@
*/
package org.apache.pinot.query.runtime.operator;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.AggregateNode.AggType;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -65,6 +73,12 @@ public void tearDown()
_mocks.close();
}

/**
 * Builds a {@link AbstractPlanNode.NodeHint} wrapping a single aggregate hint whose options are the entries of the
 * given map.
 *
 * @param hintsMap hint option names mapped to their values
 * @return a node hint containing exactly one {@link RelHint} named after the aggregate hint options
 */
private static AbstractPlanNode.NodeHint getAggHints(Map<String, String> hintsMap) {
  RelHint.Builder aggHintBuilder = RelHint.builder(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
  for (Map.Entry<String, String> option : hintsMap.entrySet()) {
    aggHintBuilder.hintOption(option.getKey(), option.getValue());
  }
  RelHint aggHint = aggHintBuilder.build();
  return new AbstractPlanNode.NodeHint(ImmutableList.of(aggHint));
}

@Test
public void shouldHandleUpstreamErrorBlocks() {
// Given:
Expand Down Expand Up @@ -256,6 +270,42 @@ public void shouldReturnErrorBlockOnUnexpectedInputType() {
"expected it to fail with class cast exception");
}

/**
 * Verifies that when the number of groups exceeds the {@code NUM_GROUPS_LIMIT} hint, the operator returns only as
 * many groups as the limit allows, early-terminates its input, and records the
 * {@code NUM_GROUPS_LIMIT_REACHED} stat as {@code "true"}.
 */
@Test
public void shouldHandleGroupLimitExceed() {
  // Given:
  List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
  List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));

  // The input carries two distinct group keys (2 and 3) while the hint below limits the operator to one group.
  DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, DOUBLE});
  Mockito.when(_input.nextBlock())
      .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1.0}, new Object[]{3, 2.0}))
      .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, 3.0}))
      .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());

  DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
  OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
  Map<String, String> hintsMap = ImmutableMap.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "1");
  AggregateOperator operator =
      new AggregateOperator(context, _input, outSchema, calls, group, AggType.DIRECT, Collections.singletonList(-1),
          getAggHints(hintsMap));

  // When:
  TransferableBlock block1 = operator.nextBlock();
  TransferableBlock block2 = operator.nextBlock();

  // Then:
  Mockito.verify(_input).earlyTerminate();

  // assertEquals (actual, expected, message) reports the actual row count on failure, unlike assertTrue(a == b).
  Assert.assertEquals(block1.getNumRows(), 1, "when group limit reach it should only return that many groups");
  Assert.assertTrue(block2.isEndOfStreamBlock(), "Second block is EOS (done processing)");
  String operatorId =
      Joiner.on("_").join(AggregateOperator.class.getSimpleName(), context.getStageId(), context.getServer());
  OperatorStats operatorStats = context.getStats().getOperatorStats(context, operatorId);
  Assert.assertEquals(operatorStats.getExecutionStats().get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()),
      "true");
}

/**
 * Creates a {@code SUM} function call over a single operand.
 *
 * @param arg the expression to be summed
 * @return a function call of kind {@link SqlKind#SUM} named {@code "SUM"} with {@code arg} as its only operand
 */
private static RexExpression.FunctionCall getSum(RexExpression arg) {
  ImmutableList<RexExpression> operands = ImmutableList.of(arg);
  return new RexExpression.FunctionCall(SqlKind.SUM, ColumnDataType.INT, "SUM", operands);
}
Expand Down
Loading

0 comments on commit d177866

Please sign in to comment.