Skip to content

Commit

Permalink
Broker Query Timeout Metric (apache#11892)
Browse files Browse the repository at this point in the history
  • Loading branch information
suddendust authored Nov 3, 2023
1 parent cf41a0e commit bbeb1da
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ rules:
cache: true
labels:
table: "$1"
# Exports the broker's per-table brokerResponsesWithTimeouts meter.
# $1 is the segment captured between "pinot.broker." and ".brokerResponsesWithTimeouts" and becomes the "table" label;
# $2 is the trailing attribute name and is appended to the exported metric name.
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.([^\\.]*?)\\.brokerResponsesWithTimeouts\"><>(\\w+)"
name: "pinot_broker_brokerResponsesWithTimeouts_$2"
cache: true
labels:
table: "$1"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.([^\\.]*?)\\.noServerFoundExceptions\"><>(\\w+)"
name: "pinot_broker_noServerFoundExceptions_$2"
cache: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
Expand Down Expand Up @@ -81,13 +82,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;


public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics,
BrokerQueryEventListener brokerQueryEventListener) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache,
brokerMetrics, brokerQueryEventListener);
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, BrokerQueryEventListener brokerQueryEventListener) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics,
brokerQueryEventListener);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
Expand Down Expand Up @@ -190,6 +189,13 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
} catch (TimeoutException e) {
for (String table : tableNames) {
_brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
}
LOGGER.warn("Timed out executing request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
Expand Down Expand Up @@ -116,6 +117,9 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
realtimeBrokerRequest, realtimeRoutingTable, timeoutMs);
_failureDetector.notifyQuerySubmitted(asyncQueryResponse);
Map<ServerRoutingInstance, ServerResponse> finalResponses = asyncQueryResponse.getFinalResponses();
if (asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT) {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
}
_failureDetector.notifyQueryFinished(asyncQueryResponse);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER,
System.nanoTime() - scatterGatherStartTimeNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// This metric tracks the number of broker responses where not all servers responded.
// (numServersQueried > numServersResponded)
BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED("badResponses", false),

// This metric tracks the number of broker queries that timed out. It is marked per table: when the scatter-gather
// response status is TIMED_OUT, or (multi-stage) once for each table of a query whose execution throws a
// TimeoutException.
BROKER_RESPONSES_WITH_TIMEOUTS("badResponses", false),

// This metric tracks the number of broker responses where the number-of-groups limit was reached (potential bad responses).
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED("badResponses", false),

Expand Down

0 comments on commit bbeb1da

Please sign in to comment.