Skip to content

Commit

Permalink
Add indexer task success and failure metrics (#16829)
Browse files Browse the repository at this point in the history
This PR adds indexer-level task metrics-

"indexer/task/failed/count"
"indexer/task/success/count"

the current "worker/task/completed/count" metric shows all the tasks completed irrespective of success or failure status so these metrics would help us get more visibility into the status of the completed tasks
  • Loading branch information
rbankar7 authored Aug 5, 2024
1 parent c84e689 commit c8323d1
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/failed/count`|Number of tasks that failed on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/success/count`|Number of tasks that succeeded on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|

## Shuffle metrics (Native parallel task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@
"worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
"worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
"worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,22 @@ public Map<String, Long> getWorkerCompletedTasks()
return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource);
}

@Override
public Map<String, Long> getWorkerFailedTasks()
{
return getNumTasksPerDatasource(completedTasks.entrySet().stream()
.filter(entry -> entry.getValue().getTaskStatus().isFailure())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}

@Override
public Map<String, Long> getWorkerSuccessfulTasks()
{
return getNumTasksPerDatasource(completedTasks.entrySet().stream()
.filter(entry -> entry.getValue().getTaskStatus().isSuccess())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}

private static class TaskDetails
{
private final Task task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ private NoopTask createNoopTask(String id, String dataSource)
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}

private NoopTask createNoopFailingTask(String id, String dataSource)
{
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
{
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Thread.sleep(getRunTime());
return TaskStatus.failure(getId(), "Failed to complete the task");
}
};
}

/**
* Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble
* for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
Expand Down Expand Up @@ -494,7 +507,7 @@ public void getWorkerTaskStatsTest() throws Exception

Task task1 = createNoopTask("task1", "wikipedia");
Task task2 = createNoopTask("task2", "wikipedia");
Task task3 = createNoopTask("task3", "animals");
Task task3 = createNoopFailingTask("task3", "animals");

workerTaskManager.start();
// befor assigning tasks we should get no running tasks
Expand All @@ -517,11 +530,19 @@ public void getWorkerTaskStatsTest() throws Exception
Thread.sleep(10);
} while (!runningTasks.isEmpty());

// When running tasks are empty all task should be reported as completed
// When running tasks are empty all task should be reported as completed and
// one of the task for animals datasource should fail and other 2 tasks in
// the wikipedia datasource should succeed
Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of(
"wikipedia", 2L,
"animals", 1L
));
Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), ImmutableMap.of(
"animals", 1L
));
Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), ImmutableMap.of(
"wikipedia", 2L
));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider
* Map from datasource name to the number of completed tasks by the Indexer.
*/
Map<String, Long> getWorkerCompletedTasks();

Map<String, Long> getWorkerFailedTasks();

Map<String, Long> getWorkerSuccessfulTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public boolean doMonitor(ServiceEmitter emitter)
emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks());
emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks());
emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks());
emit(emitter, "worker/task/failed/count", indexerStatsProvider.getWorkerFailedTasks());
emit(emitter, "worker/task/success/count", indexerStatsProvider.getWorkerSuccessfulTasks());
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ public Map<String, Long> getWorkerCompletedTasks()
"metrics", 9L
);
}

@Override
public Map<String, Long> getWorkerFailedTasks()
{
return ImmutableMap.of(
"movies", 4L,
"games", 6L
);
}

@Override
public Map<String, Long> getWorkerSuccessfulTasks()
{
return ImmutableMap.of(
"games", 23L,
"inventory", 89L
);
}
};

nullStatsProvider = new WorkerTaskCountStatsProvider()
Expand Down Expand Up @@ -239,7 +257,7 @@ public void testMonitorIndexer()
new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER));
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
Assert.assertEquals(6, emitter.getEvents().size());
Assert.assertEquals(10, emitter.getEvents().size());
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "wikipedia"),
Expand Down Expand Up @@ -270,6 +288,26 @@ public void testMonitorIndexer()
ImmutableMap.of("dataSource", "metrics"),
9L
);
emitter.verifyValue(
"worker/task/failed/count",
ImmutableMap.of("dataSource", "movies"),
4L
);
emitter.verifyValue(
"worker/task/failed/count",
ImmutableMap.of("dataSource", "games"),
6L
);
emitter.verifyValue(
"worker/task/success/count",
ImmutableMap.of("dataSource", "games"),
23L
);
emitter.verifyValue(
"worker/task/success/count",
ImmutableMap.of("dataSource", "inventory"),
89L
);
}
@Test
public void testMonitorWithNulls()
Expand Down

0 comments on commit c8323d1

Please sign in to comment.