-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Separate task lifecycle from kubernetes/location lifecycle #15133
Changes from 11 commits
0a7db67
107bf8f
d7b82cf
7962f27
f4f3309
b0af37c
7f68cb3
5bc3b11
8ddda62
9ca3f91
3ae0327
cf05797
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,16 +146,20 @@ | |
public ListenableFuture<TaskStatus> run(Task task) | ||
{ | ||
synchronized (tasks) { | ||
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) | ||
.getResult(); | ||
return tasks.computeIfAbsent(task.getId(), k -> { | ||
ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task)); | ||
Check notice Code scanning / CodeQL Unread local variable Note
Variable 'ListenableFuture unused' is never read.
|
||
return new KubernetesWorkItem(task); | ||
}).getResult(); | ||
} | ||
} | ||
|
||
protected ListenableFuture<TaskStatus> joinAsync(Task task) | ||
{ | ||
synchronized (tasks) { | ||
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) | ||
.getResult(); | ||
return tasks.computeIfAbsent(task.getId(), k -> { | ||
ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task)); | ||
Check notice Code scanning / CodeQL Unread local variable Note
Variable 'ListenableFuture unused' is never read.
|
||
return new KubernetesWorkItem(task); | ||
}).getResult(); | ||
} | ||
} | ||
|
||
|
@@ -172,10 +176,12 @@ | |
@VisibleForTesting | ||
protected TaskStatus doTask(Task task, boolean run) | ||
{ | ||
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit weird to initialize status with a failure one, don't think we need it. |
||
try { | ||
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( | ||
task, | ||
this::emitTaskStateMetrics | ||
this::emitTaskStateMetrics, | ||
listeners | ||
); | ||
|
||
synchronized (tasks) { | ||
|
@@ -188,7 +194,6 @@ | |
workItem.setKubernetesPeonLifecycle(peonLifecycle); | ||
} | ||
|
||
TaskStatus taskStatus; | ||
if (run) { | ||
taskStatus = peonLifecycle.run( | ||
adapter.fromTask(task), | ||
|
@@ -201,15 +206,17 @@ | |
config.getTaskTimeout().toStandardDuration().getMillis() | ||
); | ||
} | ||
|
||
updateStatus(task, taskStatus); | ||
|
||
return taskStatus; | ||
} | ||
catch (Exception e) { | ||
log.error(e, "Task [%s] execution caught an exception", task.getId()); | ||
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution"); | ||
throw new RuntimeException(e); | ||
} | ||
finally { | ||
updateStatus(task, taskStatus); | ||
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown()); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -242,13 +249,13 @@ | |
@Override | ||
public void updateStatus(Task task, TaskStatus status) | ||
{ | ||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); | ||
} | ||
KubernetesWorkItem workItem = tasks.get(task.getId()); | ||
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) { | ||
workItem.setResult(status); | ||
} | ||
|
||
@Override | ||
public void updateLocation(Task task, TaskLocation location) | ||
{ | ||
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location); | ||
georgew5656 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Notify listeners even if the result is set to handle the shutdown case. | ||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); | ||
} | ||
|
||
@Override | ||
|
@@ -417,6 +424,16 @@ | |
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor); | ||
log.debug("Registered listener [%s]", listener.getListenerId()); | ||
listeners.add(listenerPair); | ||
|
||
for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) { | ||
if (entry.getValue().isRunning()) { | ||
TaskRunnerUtils.notifyLocationChanged( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi I don't see the listener from supervisor is doing much work. |
||
ImmutableList.of(listenerPair), | ||
entry.getKey(), | ||
entry.getValue().getLocation() | ||
); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can
shutdown()
throw an exception? since it is making a call to the kubernetes client. If so, the stopTask should probably be in it's own finally blockThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i decided to group saveLogs and shutdown together since they are both k8s lifecycle cleanup actions (it is okay if one fails), and then moved stopTask to a finally block b/c it has to happen