Skip to content

Commit

Permalink
Separate task lifecycle from kubernetes/location lifecycle (#15133)
Browse files (browse the repository at this point in the history)
* Separate k8s and druid task lifecycles

* Remove extra log lines

* Fix unit tests

* Fix unit tests

* Fix unit tests

* notify listeners on task completion

* Fix unit test

* unused var

* PR changes

* Fix unit tests

* Fix checkstyle

* PR changes
  • Loading branch information
georgew5656 authored Oct 17, 2023
1 parent 0a27a7a commit dc0b163
Show file tree
Hide file tree
Showing 34 changed files with 324 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -47,6 +50,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,6 +94,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final List<Pair<TaskRunnerListener, Executor>> listeners;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -108,6 +116,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.listeners = listeners;
}

/**
Expand Down Expand Up @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
getTaskLocation()
);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -190,12 +203,14 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
log.warn(e, "Cleanup failed for task [%s]", taskId);
}
finally {
stopTask();
}

stopTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.List;
import java.util.concurrent.Executor;

public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
Expand All @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
stateListener
stateListener,
listeners
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,20 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

Expand All @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
this::emitTaskStateMetrics,
listeners
);

synchronized (tasks) {
Expand All @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run)
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
Expand All @@ -201,15 +206,17 @@ protected TaskStatus doTask(Task task, boolean run)
config.getTaskTimeout().toStandardDuration().getMillis()
);
}

updateStatus(task, taskStatus);

return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution");
throw new RuntimeException(e);
}
finally {
updateStatus(task, taskStatus);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -242,13 +249,13 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String
@Override
public void updateStatus(Task task, TaskStatus status)
{
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) {
workItem.setResult(status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
// Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@Override
Expand Down Expand Up @@ -417,6 +424,16 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);

for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
if (entry.getValue().isRunning()) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
private Period taskCleanupDelay = new Period("P2D");
private Period taskCleanupDelay = new Period("PT1H");

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.apache.druid.k8s.overlord;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -36,9 +37,18 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
private final SettableFuture<TaskStatus> result;

public KubernetesWorkItem(Task task)
{
this(task, SettableFuture.create());
}

@VisibleForTesting
public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
{
super(task.getId(), statusFuture);
super(task.getId(), result);
this.result = result;
this.task = task;
}

Expand All @@ -51,7 +61,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k
protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null) {
if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
Expand Down Expand Up @@ -119,4 +129,9 @@ public Task getTask()
{
return task;
}

public void setResult(TaskStatus status)
{
result.set(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;

import java.util.List;
import java.util.concurrent.Executor;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,17 @@ public class JobResponse
private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class);

private final Job job;
private final PeonPhase phase;

public JobResponse(@Nullable Job job, PeonPhase phase)
public JobResponse(@Nullable Job job)
{
this.job = job;
this.phase = phase;
}

public Job getJob()
{
return job;
}

public PeonPhase getPhase()
{
return phase;
}

public long getJobDuration()
{
long duration = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time
);
if (job == null) {
log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
return new JobResponse(null, PeonPhase.FAILED);
return new JobResponse(null);
}
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
return new JobResponse(job);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
return new JobResponse(job);
});
}

Expand Down
Loading

0 comments on commit dc0b163

Please sign in to comment.