Add worker status and duration metrics in live and task reports #15180
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Thanks for the PR @gargvishesh. Left an initial review.
Should be good to go once the comments are addressed.
{
  final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();

  for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
was only accessed from the main worker loop until now.
With this change, taskTrackers can be accessed from the Jetty thread or the main ControllerImpl thread.
So we should either make TaskTracker thread safe
Good catch! Now wrapped in Collections.synchronizedMap(). I believe this won't have any significant impact on performance.
Let's use a ConcurrentHashMap? From the Collections.synchronizedMap() Javadoc:
Returns a synchronized (thread-safe) map backed by the specified map. In order to guarantee serial access, it is critical that all access to the backing map is accomplished through the returned map.
It is imperative that the user manually synchronize on the returned map when traversing any of its collection views via Iterator, Spliterator or Stream:
Map m = Collections.synchronizedMap(new HashMap());
...
Set s = m.keySet(); // Needn't be in synchronized block
...
synchronized (m) { // Synchronizing on m, not s!
Iterator i = s.iterator(); // Must be in synchronized block
while (i.hasNext())
foo(i.next());
}
Since it's very easy to use s.iterator() without the synchronized block, no?
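To make the trade-off concrete, here is a minimal sketch of the two iteration styles. The class and method names (`MapIterationSketch`, `keysOfSynchronizedMap`, `keysOfConcurrentMap`) and the task IDs are hypothetical and are not part of the PR. Note one difference that may matter here: ConcurrentHashMap does not preserve insertion order, whereas the existing field is a LinkedHashMap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not code from the PR.
public class MapIterationSketch
{
  // Iterating a view of a synchronizedMap is only safe while holding the
  // wrapper's own lock, so every caller has to remember this block.
  static List<String> keysOfSynchronizedMap(Map<String, Integer> syncMap)
  {
    final List<String> keys = new ArrayList<>();
    synchronized (syncMap) {
      for (String key : syncMap.keySet()) {
        keys.add(key);
      }
    }
    return keys;
  }

  // ConcurrentHashMap iterators are weakly consistent: no external lock is
  // needed and they never throw ConcurrentModificationException.
  static List<String> keysOfConcurrentMap(ConcurrentHashMap<String, Integer> map)
  {
    final List<String> keys = new ArrayList<>();
    for (String key : map.keySet()) {
      keys.add(key);
    }
    return keys;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> syncMap = Collections.synchronizedMap(new LinkedHashMap<>());
    syncMap.put("task-b", 1);
    syncMap.put("task-a", 2);
    // The LinkedHashMap backing keeps insertion order.
    System.out.println(keysOfSynchronizedMap(syncMap));

    ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
    concurrentMap.put("task-b", 1);
    concurrentMap.put("task-a", 2);
    // Iteration order of a ConcurrentHashMap is unspecified.
    System.out.println(keysOfConcurrentMap(concurrentMap));
  }
}
```

If the order of the key set still matters to the main loop, the synchronized wrapper around a LinkedHashMap keeps that property, at the cost of requiring the explicit lock around every traversal.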
@@ -348,6 +351,68 @@ public boolean isTaskLatest(String taskId)
    }
  }

  public static class WorkerStats
  {
    String workerId;
Let's mark these fields final?
Can't - because of the default constructor.
/**
 * For JSON deserialization only
 */
public WorkerStats()
We actually want serialization only, right? So this constructor can go, no?
Many tests in SQLMSQStatementResourceTypeTest fail as they require deserialization, such as testReplaceAll, testWithDurableStorage, and testResultFormatWithParamInSelect.
  this.duration = duration;
}

@JsonProperty()
nit: is the () required?
Removed. Thanks!
TaskTracker taskTracker = taskEntry.getValue();

long duration = (taskTracker.status.getDuration() == -1
I think it should be okay to remove the -1 duration check and always report taskTracker.status.getDuration().
We always rely on the Overlord to give us the task duration, without changing anything.
WDYT?
The reason was that duration is always -1 for RUNNING workers (since it gets updated only upon completion), so publishing their duration wouldn't ever be useful.
MSQ tracks the start time as the time it requested to launch the job. I currently do not know whether the duration counter in the Overlord starts as soon as the Overlord gets the request or only when the task goes into the running state.
To test it, you could:
- Change MSQWorkerTaskLauncher to log the report every time liveReports is called.
- Start a cluster with, let's say, 4 task slots.
- Use 2 slots for other ingestions.
- Schedule an MSQ job with 4 slots. (The MSQ job will wait, and the worker task launcher will set the start time for each task.)
- Kill the jobs running on the other 2 slots.
Then check whether the taskDuration in the report goes backward.
I checked this both with a run and in the code, and found the concern to be valid: the start time in MSQ is recorded when the task is submitted, whereas in the Overlord it is recorded when the run starts.
Currently TaskStatus doesn't have any field to record a startTime, so for MSQ to get a worker's startTime from the Overlord, this field would need to be added and persisted in the database when the worker task starts.
There is no such issue with periodically reporting the query's duration in live reports, since it's a single hop (from Overlord to indexer) and the query's start time is maintained inside the controller, so the duration can be calculated on the fly.
Since we are more interested in the timings of finished workers, I think for now it is fine to just report the duration as -1 for running worker tasks instead of adding a new field to TaskStatus, which is used in multiple places. But do let me know if you think we should take the route of adding a new field.
Thanks for digging in. Reporting -1 for running tasks SGTM.
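For readers following the thread, the agreed behavior can be sketched roughly as below. This is an illustration under assumptions, not the code in the PR: `WorkerDurationSketch`, its `State` enum, and both method names are hypothetical stand-ins, and the real implementation works with Druid's own TaskState and TaskStatus.

```java
// Hypothetical illustration only; not code from the PR.
public class WorkerDurationSketch
{
  // Stand-in for the task states mentioned in the thread.
  enum State { RUNNING, SUCCESS, FAILED }

  // Sentinel used in the thread for "duration not known yet".
  static final long UNKNOWN_DURATION = -1;

  // Agreed approach: report the Overlord-provided duration for finished
  // workers and -1 for workers that are still running.
  static long reportedDuration(State state, long overlordDurationMillis)
  {
    return state == State.RUNNING ? UNKNOWN_DURATION : overlordDurationMillis;
  }

  // Rejected alternative: compute elapsed time from the MSQ-side submission
  // time. This includes time spent waiting for a task slot, so it can exceed
  // the duration the Overlord reports once the task finishes.
  static long elapsedSinceSubmission(long submitTimeMillis, long nowMillis)
  {
    return nowMillis - submitTimeMillis;
  }

  public static void main(String[] args)
  {
    System.out.println(reportedDuration(State.RUNNING, -1));
    System.out.println(reportedDuration(State.SUCCESS, 4200));
  }
}
```

The second method shows why the reported value could appear to go backward: a worker submitted at time 0 that waits 60 seconds for a slot and then runs for 10 seconds would show about 70 seconds while running, but 10 seconds once the Overlord's figure replaces it.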
}

@JsonProperty()
public TaskState getState()
We should also document these properties in docs/api-reference/sql-ingestion-api.md
Added these properties.
PR is almost there.
@@ -111,7 +115,7 @@ private enum State
// Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added
Let's update the comments as well.
Changes LGTM.
Thanks @gargvishesh.
…he#15180) Add worker status and duration metrics in live and task reports for tracking.
Description
Add worker status and duration metrics in live and task reports for tracking. Format: