Skip to content

Commit

Permalink
Add more information in RequestContext class (apache#11708)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Oct 15, 2023
1 parent cf939ac commit 36be75f
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,19 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs());
statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs());
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
statistics.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
statistics.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
statistics.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
statistics.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
statistics.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
statistics.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
statistics.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
statistics.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
Collectors.toList()));
}

private String getGlobalQueryId(long requestId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
requestContext.setTableNames(List.copyOf(tableNames));

// Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers.
long compilationEndTimeNs = System.nanoTime();
Expand Down Expand Up @@ -235,6 +236,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
requestContext.setQueryProcessingTime(totalTimeMs);
requestContext.setTraceInfo(brokerResponse.getTraceInfo());
augmentStatistics(requestContext, brokerResponse);

// Log query and stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap,
reduceTimeOutMs, _brokerMetrics);
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
requestContext.setTraceInfo(brokerResponse.getTraceInfo());
requestContext.setReduceTimeNanos(reduceTimeNanos);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


Expand All @@ -32,7 +36,7 @@ public class DefaultRequestContext implements RequestScope {

private int _errorCode = 0;
private String _query;
private String _tableName = DEFAULT_TABLE_NAME;
private List<String> _tableNames = new ArrayList<>();
private long _processingTimeMillis = -1;
private long _totalDocs;
private long _numDocsScanned;
Expand Down Expand Up @@ -63,6 +67,19 @@ public class DefaultRequestContext implements RequestScope {

private FanoutType _fanoutType;
private int _numUnavailableSegments;
private long _numConsumingSegmentsQueried;
private long _numConsumingSegmentsProcessed;
private long _numConsumingSegmentsMatched;
private long _minConsumingFreshnessTimeMs;
private long _numSegmentsPrunedByBroker;
private long _numSegmentsPrunedByServer;
private long _numSegmentsPrunedInvalid;
private long _numSegmentsPrunedByLimit;
private long _numSegmentsPrunedByValue;
private long _explainPlanNumEmptyFilterSegments;
private long _explainPlanNumMatchAllFilterSegments;
private Map<String, String> _traceInfo = new HashMap<>();
private List<String> _processingExceptions = new ArrayList<>();

public DefaultRequestContext() {
}
Expand Down Expand Up @@ -169,7 +186,12 @@ public void setQuery(String query) {

@Override
public void setTableName(String tableName) {
_tableName = tableName;
_tableNames.add(tableName);
}

@Override
public void setTableNames(List<String> tableNames) {
_tableNames.addAll(tableNames);
}

@Override
Expand Down Expand Up @@ -239,7 +261,15 @@ public String getQuery() {

@Override
public String getTableName() {
return _tableName;
if (_tableNames.size() == 0) {
return DEFAULT_TABLE_NAME;
}
return _tableNames.get(0);
}

@Override
public List<String> getTableNames() {
return _tableNames;
}

@Override
Expand Down Expand Up @@ -314,7 +344,7 @@ public int getNumExceptions() {

@Override
public boolean hasValidTableName() {
return !DEFAULT_TABLE_NAME.equals(_tableName);
return !_tableNames.isEmpty();
}

@Override
Expand Down Expand Up @@ -402,6 +432,136 @@ public void setReduceTimeMillis(long reduceTimeMillis) {
_reduceTimeMillis = reduceTimeMillis;
}

@Override
public long getNumConsumingSegmentsQueried() {
return _numConsumingSegmentsQueried;
}

@Override
public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
_numConsumingSegmentsQueried = numConsumingSegmentsQueried;
}

@Override
public long getNumConsumingSegmentsProcessed() {
return _numConsumingSegmentsProcessed;
}

@Override
public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
_numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
}

@Override
public long getNumConsumingSegmentsMatched() {
return _numConsumingSegmentsMatched;
}

@Override
public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) {
_numConsumingSegmentsMatched = numConsumingSegmentsMatched;
}

@Override
public long getMinConsumingFreshnessTimeMs() {
return _minConsumingFreshnessTimeMs;
}

@Override
public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) {
_minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
}

@Override
public long getNumSegmentsPrunedByBroker() {
return _numSegmentsPrunedByBroker;
}

@Override
public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
_numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
}

@Override
public long getNumSegmentsPrunedByServer() {
return _numSegmentsPrunedByServer;
}

@Override
public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) {
_numSegmentsPrunedByServer = numSegmentsPrunedByServer;
}

@Override
public long getNumSegmentsPrunedInvalid() {
return _numSegmentsPrunedInvalid;
}

@Override
public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
_numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
}

@Override
public long getNumSegmentsPrunedByLimit() {
return _numSegmentsPrunedByLimit;
}

@Override
public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
_numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
}

@Override
public long getNumSegmentsPrunedByValue() {
return _numSegmentsPrunedByValue;
}

@Override
public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
_numSegmentsPrunedByValue = numSegmentsPrunedByValue;
}

@Override
public long getExplainPlanNumEmptyFilterSegments() {
return _explainPlanNumEmptyFilterSegments;
}

@Override
public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) {
_explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments;
}

@Override
public long getExplainPlanNumMatchAllFilterSegments() {
return _explainPlanNumMatchAllFilterSegments;
}

@Override
public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) {
_explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments;
}

@Override
public Map<String, String> getTraceInfo() {
return _traceInfo;
}

@Override
public void setTraceInfo(Map<String, String> traceInfo) {
_traceInfo.putAll(traceInfo);
}

@Override
public List<String> getProcessingExceptions() {
return _processingExceptions;
}

@Override
public void setProcessingExceptions(List<String> processingExceptions) {
_processingExceptions.addAll(processingExceptions);
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;

import java.util.List;
import java.util.Map;


public interface RequestContext {
long getOfflineSystemActivitiesCpuTimeNs();

Expand Down Expand Up @@ -65,6 +69,8 @@ default boolean isSampledRequest() {

void setTableName(String tableName);

void setTableNames(List<String> tableNames);

void setQueryProcessingTime(long processingTimeMillis);

void setBrokerId(String brokerId);
Expand Down Expand Up @@ -93,6 +99,8 @@ default boolean isSampledRequest() {

String getTableName();

List<String> getTableNames();

long getProcessingTimeMillis();

long getTotalDocs();
Expand Down Expand Up @@ -157,6 +165,58 @@ default boolean isSampledRequest() {

void setReduceTimeMillis(long reduceTimeMillis);

long getNumConsumingSegmentsQueried();

void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried);

long getNumConsumingSegmentsProcessed();

void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed);

long getNumConsumingSegmentsMatched();

void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched);

long getMinConsumingFreshnessTimeMs();

void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs);

long getNumSegmentsPrunedByBroker();

void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker);

long getNumSegmentsPrunedByServer();

void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);

long getNumSegmentsPrunedInvalid();

void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);

long getNumSegmentsPrunedByLimit();

void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);

long getNumSegmentsPrunedByValue();

void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);

long getExplainPlanNumEmptyFilterSegments();

void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments);

long getExplainPlanNumMatchAllFilterSegments();

void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);

Map<String, String> getTraceInfo();

void setTraceInfo(Map<String, String> traceInfo);

List<String> getProcessingExceptions();

void setProcessingExceptions(List<String> processingExceptions);

enum FanoutType {
OFFLINE, REALTIME, HYBRID
}
Expand Down

0 comments on commit 36be75f

Please sign in to comment.