diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 21267de91364..ec97f44fe391 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -299,6 +299,8 @@ If the JVM does not support CPU time measurement for the current thread, `ingest |`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| |`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| |`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/failed/count`|Number of tasks that failed on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/success/count`|Number of tasks that succeeded on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| |`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index ad065c63d39a..91d7c4c4abdf 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -74,8 +74,8 @@ "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, - "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, - "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, + "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" }, + "worker/task/success/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 729ac1d1617e..a1c131ad0f49 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -640,6 +640,22 @@ public Map getWorkerCompletedTasks() return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource); } + @Override + public Map getWorkerFailedTasks() + { + return getNumTasksPerDatasource(completedTasks.entrySet().stream() + .filter(entry -> entry.getValue().getTaskStatus().isFailure()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource); + } + + @Override + public Map getWorkerSuccessfulTasks() + { + return getNumTasksPerDatasource(completedTasks.entrySet().stream() + .filter(entry -> entry.getValue().getTaskStatus().isSuccess()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource); + } + private static class TaskDetails { private final Task task; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 6b08be3a3c65..37839f8e0777 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -455,6 +455,19 @@ private NoopTask createNoopTask(String id, String dataSource) return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); } + private NoopTask createNoopFailingTask(String id, String dataSource) + { + return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)) + { + @Override + public TaskStatus runTask(TaskToolbox toolbox) throws Exception + { + Thread.sleep(getRunTime()); + return TaskStatus.failure(getId(), "Failed to complete the task"); + } + }; + } + /** * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}. @@ -494,7 +507,7 @@ public void getWorkerTaskStatsTest() throws Exception Task task1 = createNoopTask("task1", "wikipedia"); Task task2 = createNoopTask("task2", "wikipedia"); - Task task3 = createNoopTask("task3", "animals"); + Task task3 = createNoopFailingTask("task3", "animals"); workerTaskManager.start(); // befor assigning tasks we should get no running tasks @@ -517,11 +530,19 @@ public void getWorkerTaskStatsTest() throws Exception Thread.sleep(10); } while (!runningTasks.isEmpty()); - // When running tasks are empty all task should be reported as completed + // When running tasks are empty all task should be reported as completed and + // one of the task for animals datasource should fail and other 2 tasks in + // the wikipedia datasource should succeed Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of( "wikipedia", 2L, "animals", 1L )); + Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), ImmutableMap.of( + "animals", 1L + )); + Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), ImmutableMap.of( + "wikipedia", 2L + )); Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java index 735bc27abb3a..b38b461eb362 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java @@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider * Map from datasource name to the number of completed tasks by the Indexer. */ Map getWorkerCompletedTasks(); + + Map getWorkerFailedTasks(); + + Map getWorkerSuccessfulTasks(); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java index d07311c1a462..bc09e95b5ce9 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java @@ -72,6 +72,8 @@ public boolean doMonitor(ServiceEmitter emitter) emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks()); emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks()); emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks()); + emit(emitter, "worker/task/failed/count", indexerStatsProvider.getWorkerFailedTasks()); + emit(emitter, "worker/task/success/count", indexerStatsProvider.getWorkerSuccessfulTasks()); } return true; } diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java index ff9fcffb8d99..ad00e5e6dbde 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -120,6 +120,24 @@ public Map getWorkerCompletedTasks() "metrics", 9L ); } + + @Override + public Map getWorkerFailedTasks() + { + return ImmutableMap.of( + "movies", 4L, + "games", 6L + ); + } + + @Override + public Map getWorkerSuccessfulTasks() + { + return ImmutableMap.of( + "games", 23L, + "inventory", 89L + ); + } }; nullStatsProvider = new WorkerTaskCountStatsProvider() @@ -239,7 +257,7 @@ public void testMonitorIndexer() new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(6, emitter.getEvents().size()); + Assert.assertEquals(10, emitter.getEvents().size()); emitter.verifyValue( "worker/task/running/count", ImmutableMap.of("dataSource", "wikipedia"), @@ -270,6 +288,26 @@ public void testMonitorIndexer() ImmutableMap.of("dataSource", "metrics"), 9L ); + emitter.verifyValue( + "worker/task/failed/count", + ImmutableMap.of("dataSource", "movies"), + 4L + ); + emitter.verifyValue( + "worker/task/failed/count", + ImmutableMap.of("dataSource", "games"), + 6L + ); + emitter.verifyValue( + "worker/task/success/count", + ImmutableMap.of("dataSource", "games"), + 23L + ); + emitter.verifyValue( + "worker/task/success/count", + ImmutableMap.of("dataSource", "inventory"), + 89L + ); } @Test public void testMonitorWithNulls()