Skip to content

Commit

Permalink
Revert "Separate task lifecycle from kubernetes/location lifecycle (a…
Browse files Browse the repository at this point in the history
…pache#15133)" (apache#15346)

This reverts commit dc0b163.
  • Loading branch information
George Shiqi Wu authored Nov 8, 2023
1 parent b7d7f84 commit 130bfbf
Show file tree
Hide file tree
Showing 34 changed files with 323 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,10 @@ public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -50,8 +47,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -94,8 +89,6 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final List<Pair<TaskRunnerListener, Executor>> listeners;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -106,8 +99,7 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners
TaskStateListener stateListener
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -116,7 +108,6 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.listeners = listeners;
}

/**
Expand Down Expand Up @@ -187,11 +178,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
getTaskLocation()
);

JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -203,14 +190,12 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Cleanup failed for task [%s]", taskId);
}
finally {
stopTask();
log.warn(e, "Log processing failed for task [%s]", taskId);
}

stopTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.List;
import java.util.concurrent.Executor;

public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
Expand All @@ -47,15 +42,14 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
stateListener,
listeners
stateListener
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,16 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
return new KubernetesWorkItem(task);
}).getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task));
return new KubernetesWorkItem(task);
}).getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
}
}

Expand All @@ -176,12 +172,10 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics,
listeners
this::emitTaskStateMetrics
);

synchronized (tasks) {
Expand All @@ -194,6 +188,7 @@ protected TaskStatus doTask(Task task, boolean run)
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
Expand All @@ -206,17 +201,15 @@ protected TaskStatus doTask(Task task, boolean run)
config.getTaskTimeout().toStandardDuration().getMillis()
);
}

updateStatus(task, taskStatus);

return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution");
throw new RuntimeException(e);
}
finally {
updateStatus(task, taskStatus);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -249,15 +242,15 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String
@Override
public void updateStatus(Task task, TaskStatus status)
{
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) {
workItem.setResult(status);
}

// Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
}

@Override
public void shutdown(String taskid, String reason)
{
Expand Down Expand Up @@ -424,16 +417,6 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);

for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
if (entry.getValue().isRunning()) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
private Period taskCleanupDelay = new Period("PT1H");
private Period taskCleanupDelay = new Period("P2D");

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package org.apache.druid.k8s.overlord;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -37,18 +36,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

private final SettableFuture<TaskStatus> result;

public KubernetesWorkItem(Task task)
{
this(task, SettableFuture.create());
}

@VisibleForTesting
public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
{
super(task.getId(), result);
this.result = result;
super(task.getId(), statusFuture);
this.task = task;
}

Expand All @@ -61,7 +51,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k
protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
Expand Down Expand Up @@ -129,9 +119,4 @@ public Task getTask()
{
return task;
}

public void setResult(TaskStatus status)
{
result.set(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,8 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;

import java.util.List;
import java.util.concurrent.Executor;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,24 @@ public class JobResponse
private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class);

private final Job job;
private final PeonPhase phase;

public JobResponse(@Nullable Job job)
public JobResponse(@Nullable Job job, PeonPhase phase)
{
this.job = job;
this.phase = phase;
}

public Job getJob()
{
return job;
}

public PeonPhase getPhase()
{
return phase;
}

public long getJobDuration()
{
long duration = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time
);
if (job == null) {
log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
return new JobResponse(null);
return new JobResponse(null, PeonPhase.FAILED);
}
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job);
return new JobResponse(job, PeonPhase.SUCCEEDED);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job);
return new JobResponse(job, PeonPhase.FAILED);
});
}

Expand Down
Loading

0 comments on commit 130bfbf

Please sign in to comment.