From d85d1b11e5040f42c39f2e341fea0270c6c7b916 Mon Sep 17 00:00:00 2001 From: Hardik Bajaj <58038410+hardikbajaj@users.noreply.github.com> Date: Wed, 4 Sep 2024 17:38:22 +0530 Subject: [PATCH] Adding `"segment/scan/active" metric for processing thread pool. (#15060) (#42) (#224) Co-authored-by: Karan Kumar --- docs/design/extensions-contrib/dropwizard.md | 4 ++++ docs/operations/metrics.md | 2 ++ .../src/main/resources/defaultWhiteListMap.json | 1 + .../src/main/resources/defaultMetricDimensions.json | 4 ++++ .../src/main/resources/defaultWhiteListMap.json | 1 + .../src/main/resources/defaultMetrics.json | 1 + .../src/main/resources/defaultMetricDimensions.json | 1 + .../druid/query/MetricsEmittingQueryProcessingPool.java | 1 + .../apache/druid/query/PrioritizedExecutorService.java | 8 ++++++++ .../query/MetricsEmittingQueryProcessingPoolTest.java | 5 ++++- 10 files changed, 27 insertions(+), 1 deletion(-) diff --git a/docs/design/extensions-contrib/dropwizard.md b/docs/design/extensions-contrib/dropwizard.md index a2a8c34d6eaa..bb139a945c06 100644 --- a/docs/design/extensions-contrib/dropwizard.md +++ b/docs/design/extensions-contrib/dropwizard.md @@ -137,6 +137,10 @@ Latest default metrics mapping can be found [here] (https://github.com/apache/dr "dimensions": [], "type": "gauge" }, + "segment/scan/active": { + "dimensions": [], + "type": "gauge" + }, "query/segmentAndCache/time": { "dimensions": [], "type": "timer", diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 92fdad86d7f4..aaa137d2cfdd 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -74,6 +74,7 @@ Metrics may have additional dimensions beyond those listed above. |`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|`id`, `status`, `segment`, `vectorized`.|several hundred milliseconds| |`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|`id`, `segment`|< several hundred milliseconds| |`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0| +|`segment/scan/active`|Number of segments currently scanned. This metric also indicates how many threads from `druid.processing.numThreads` are currently being used.||Close to `druid.processing.numThreads`| |`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|`id`, `segment`|several hundred milliseconds| |`query/cpu/time`|Microseconds of CPU time taken to complete a query.|

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

|Varies| |`query/count`|Total number of queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| @@ -90,6 +91,7 @@ Metrics may have additional dimensions beyond those listed above. |`query/time`|Milliseconds taken to complete a query.|

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

|< 1s| |`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|`id`, `segment`|several hundred milliseconds| |`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0| +|`segment/scan/active`|Number of segments currently scanned. This metric also indicates how many threads from `druid.processing.numThreads` are currently being used.||Close to `druid.processing.numThreads`| |`query/cpu/time`|Microseconds of CPU time taken to complete a query.|

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

|Varies| |`query/count`|Number of total queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json b/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json index ea31beed2a0f..8ac86ed46fc7 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json +++ b/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json @@ -58,6 +58,7 @@ "segment/loadQueue/failed": [], "segment/loadQueue/size": [], "segment/scan/pending": [], + "segment/scan/active": [], "segment/size": [ "dataSource" ], diff --git a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json index 950d2638b000..849162c5ff33 100644 --- a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json @@ -42,6 +42,10 @@ "dimensions": [], "type": "gauge" }, + "segment/scan/active": { + "dimensions": [], + "type": "gauge" + }, "query/segmentAndCache/time": { "dimensions": [], "type": "timer", diff --git a/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json b/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json index 87cbd8951653..44bd5ef8db9e 100644 --- a/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json +++ b/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json @@ -43,6 +43,7 @@ "segment/loadQueue/failed": [], "segment/loadQueue/size": [], "segment/scan/pending": [], + "segment/scan/active": [], "segment/size": [], "segment/usedPercent": [] } diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json index 88fa4347728c..b08201c56307 100644 --- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -10,6 +10,7 @@ "query/segment/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment. Includes time to page in the segment from disk."}, "query/wait/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent waiting for a segment to be scanned."}, "segment/scan/pending" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments in queue waiting to be scanned."}, + "segment/scan/active" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments currently scanned."}, "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."}, "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"}, diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 410c4c7620b8..af90b2638142 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -9,6 +9,7 @@ "query/segment/time" : { "dimensions" : [], "type" : "timer"}, "query/wait/time" : { "dimensions" : [], "type" : "timer"}, "segment/scan/pending" : { "dimensions" : [], "type" : "gauge"}, + "segment/scan/active" : { "dimensions" : [], "type" : "gauge"}, "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" }, "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" }, diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java index 4047d7977f91..a72dfc7fdc6c 100644 --- a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java +++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java @@ -41,6 +41,7 @@ public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metri { if (delegate() instanceof PrioritizedExecutorService) { emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate()).getQueueSize())); + emitter.emit(metricBuilder.build("segment/scan/active", ((PrioritizedExecutorService) delegate()).getActiveTasks())); } } diff --git a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java index 6781df609654..02636df107ff 100644 --- a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java +++ b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java @@ -203,6 +203,14 @@ public int getQueueSize() { return delegateQueue.size(); } + + /** + * Returns the approximate number of tasks being run by the thread pool currently. + */ + public int getActiveTasks() + { + return threadPoolExecutor.getActiveCount(); + } } class PrioritizedListenableFutureTask implements RunnableFuture, diff --git a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java index 71b3a1c9e0f3..cb9914293dc6 100644 --- a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java @@ -38,6 +38,7 @@ public void testPrioritizedExecutorDelegate() { PrioritizedExecutorService service = Mockito.mock(PrioritizedExecutorService.class); Mockito.when(service.getQueueSize()).thenReturn(10); + Mockito.when(service.getActiveTasks()).thenReturn(2); ExecutorServiceMonitor monitor = new ExecutorServiceMonitor(); List events = new ArrayList<>(); MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor); @@ -52,9 +53,11 @@ public void emit(Event event) } }; monitor.doMonitor(serviceEmitter); - Assert.assertEquals(1, events.size()); + Assert.assertEquals(2, events.size()); Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending"); Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10); + Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getMetric(), "segment/scan/active"); + Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getValue(), 2); } @Test