Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 19, 2023
2 parents 156d809 + a8febd4 commit 53d509a
Show file tree
Hide file tree
Showing 114 changed files with 2,546 additions and 636 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -47,6 +50,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,6 +94,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final List<Pair<TaskRunnerListener, Executor>> listeners;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -108,6 +116,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.listeners = listeners;
}

/**
Expand Down Expand Up @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
getTaskLocation()
);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -190,12 +203,14 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
log.warn(e, "Cleanup failed for task [%s]", taskId);
}
finally {
stopTask();
}

stopTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.List;
import java.util.concurrent.Executor;

public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
Expand All @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
stateListener
stateListener,
listeners
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,20 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

Expand All @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
this::emitTaskStateMetrics,
listeners
);

synchronized (tasks) {
Expand All @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run)
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
Expand All @@ -201,15 +206,17 @@ protected TaskStatus doTask(Task task, boolean run)
config.getTaskTimeout().toStandardDuration().getMillis()
);
}

updateStatus(task, taskStatus);

return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution");
throw new RuntimeException(e);
}
finally {
updateStatus(task, taskStatus);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -242,13 +249,13 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String
@Override
public void updateStatus(Task task, TaskStatus status)
{
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) {
workItem.setResult(status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
// Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@Override
Expand Down Expand Up @@ -417,6 +424,16 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);

for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
if (entry.getValue().isRunning()) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
private Period taskCleanupDelay = new Period("P2D");
private Period taskCleanupDelay = new Period("PT1H");

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.apache.druid.k8s.overlord;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -36,9 +37,18 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
private final SettableFuture<TaskStatus> result;

public KubernetesWorkItem(Task task)
{
this(task, SettableFuture.create());
}

@VisibleForTesting
public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
{
super(task.getId(), statusFuture);
super(task.getId(), result);
this.result = result;
this.task = task;
}

Expand All @@ -51,7 +61,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k
protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null) {
if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
Expand Down Expand Up @@ -119,4 +129,9 @@ public Task getTask()
{
return task;
}

public void setResult(TaskStatus status)
{
result.set(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;

import java.util.List;
import java.util.concurrent.Executor;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,17 @@ public class JobResponse
private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class);

private final Job job;
private final PeonPhase phase;

public JobResponse(@Nullable Job job, PeonPhase phase)
public JobResponse(@Nullable Job job)
{
this.job = job;
this.phase = phase;
}

public Job getJob()
{
return job;
}

public PeonPhase getPhase()
{
return phase;
}

public long getJobDuration()
{
long duration = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time
);
if (job == null) {
log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
return new JobResponse(null, PeonPhase.FAILED);
return new JobResponse(null);
}
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
return new JobResponse(job);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
return new JobResponse(job);
});
}

Expand Down
Loading

0 comments on commit 53d509a

Please sign in to comment.