Skip to content

Commit

Permalink
Adding `segment/scan/active` metric for processing thread pool. (#15060
Browse files Browse the repository at this point in the history
)
  • Loading branch information
cryptoe authored Sep 29, 2023
1 parent ebb9724 commit 2f1bcd6
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/design/extensions-contrib/dropwizard.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ Latest default metrics mapping can be found [here](https://github.com/apache/dr
"dimensions": [],
"type": "gauge"
},
"segment/scan/active": {
"dimensions": [],
"type": "gauge"
},
"query/segmentAndCache/time": {
"dimensions": [],
"type": "timer",
Expand Down
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|`id`, `status`, `segment`, `vectorized`.|several hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|`id`, `segment`|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`segment/scan/active`|Number of segments currently being scanned. This metric also indicates how many threads from `druid.processing.numThreads` are currently being used.||Close to `druid.processing.numThreads`|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|`id`, `segment`|several hundred milliseconds|
|`query/cpu/time`|Microseconds of CPU time taken to complete a query.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p> Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p> GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>|Varies|
|`query/count`|Total number of queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
Expand All @@ -104,6 +105,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/time`|Milliseconds taken to complete a query.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p> Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p> GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>|< 1s|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|`id`, `segment`|several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`segment/scan/active`|Number of segments currently being scanned. This metric also indicates how many threads from `druid.processing.numThreads` are currently being used.||Close to `druid.processing.numThreads`|
|`query/cpu/time`|Microseconds of CPU time taken to complete a query.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p> Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p> GroupBy: `numDimensions`. </p><p>TopN: `threshold`, `dimension`.</p>|Varies|
|`query/count`|Number of total queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"segment/loadQueue/failed": [],
"segment/loadQueue/size": [],
"segment/scan/pending": [],
"segment/scan/active": [],
"segment/size": [
"dataSource"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
"dimensions": [],
"type": "gauge"
},
"segment/scan/active": {
"dimensions": [],
"type": "gauge"
},
"query/segmentAndCache/time": {
"dimensions": [],
"type": "timer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"segment/loadQueue/failed": [],
"segment/loadQueue/size": [],
"segment/scan/pending": [],
"segment/scan/active": [],
"segment/size": [],
"segment/usedPercent": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"query/segment/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment. Includes time to page in the segment from disk."},
"query/wait/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent waiting for a segment to be scanned."},
"segment/scan/pending" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments in queue waiting to be scanned."},
"segment/scan/active" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments currently being scanned."},
"query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."},
"query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"query/segment/time" : { "dimensions" : [], "type" : "timer"},
"query/wait/time" : { "dimensions" : [], "type" : "timer"},
"segment/scan/pending" : { "dimensions" : [], "type" : "gauge"},
"segment/scan/active" : { "dimensions" : [], "type" : "gauge"},
"query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" },
"query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metri
{
if (delegate() instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.setMetric("segment/scan/pending", ((PrioritizedExecutorService) delegate()).getQueueSize()));
emitter.emit(metricBuilder.setMetric("segment/scan/active", ((PrioritizedExecutorService) delegate()).getActiveTasks()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ public int getQueueSize()
{
return delegateQueue.size();
}

/**
 * Returns the approximate number of tasks currently being executed by this
 * pool, as reported by {@code threadPoolExecutor.getActiveCount()}.
 *
 * <p>The value is an estimate: thread states may change while it is being
 * computed, so it should be used for monitoring (e.g. the
 * {@code segment/scan/active} metric) rather than for synchronization
 * decisions.
 *
 * @return approximate count of actively running tasks; never negative
 */
public int getActiveTasks()
{
return threadPoolExecutor.getActiveCount();
}
}

class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testPrioritizedExecutorDelegate()
{
PrioritizedExecutorService service = Mockito.mock(PrioritizedExecutorService.class);
Mockito.when(service.getQueueSize()).thenReturn(10);
Mockito.when(service.getActiveTasks()).thenReturn(2);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
List<Event> events = new ArrayList<>();
MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
Expand All @@ -53,9 +54,11 @@ public void emit(Event event)
}
};
monitor.doMonitor(serviceEmitter);
Assert.assertEquals(1, events.size());
Assert.assertEquals(2, events.size());
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending");
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10);
Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getMetric(), "segment/scan/active");
Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getValue(), 2);
}

@Test
Expand Down

0 comments on commit 2f1bcd6

Please sign in to comment.