k8s-based-ingestion: Wait for task lifecycles to enter RUNNING state before returning from KubernetesTaskRunner.start (apache#17446)

* Add a wait on start() for task lifecycles to go into RUNNING (sketched below)

* Handle exceptions

* Fix logging messages

* Don't pass in the settable future as an arg

* Add some unit tests
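
In outline, the change makes the peon lifecycle publish a start signal through a `SettableFuture<Boolean>`, which the task runner then waits on during startup. Below is a minimal, self-contained sketch of the lifecycle-side signaling; the class and method names are illustrative, not Druid's actual API:

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

// Illustrative stand-in for the lifecycle-side signaling added by this commit:
// the future completes with true once the task reaches RUNNING, with false if it
// fails first, and is never completed twice.
class LifecycleStartSignal
{
  private final SettableFuture<Boolean> started = SettableFuture.create();

  ListenableFuture<Boolean> startedFuture()
  {
    return started;
  }

  void onRunning()
  {
    // called after the state machine transitions PENDING -> RUNNING
    started.set(true);
  }

  void onFailure()
  {
    // guard so a failure after a successful start does not overwrite the signal
    if (!started.isDone()) {
      started.set(false);
    }
  }
}
```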
georgew5656 authored Nov 8, 2024
1 parent d816216 commit 5764183
Showing 8 changed files with 311 additions and 132 deletions.
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
@@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`.
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No|
|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. |`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No|
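
For example, the new timeout can be raised on Overlords that restore many Kubernetes jobs at startup. A possible `runtime.properties` snippet (property names are from the table above; the values are illustrative, not recommendations):

```properties
# Give restored tasks up to two minutes to report RUNNING during Overlord startup.
druid.indexer.runner.taskJoinTimeout=PT2M
# Related cleanup settings, shown here at their documented defaults.
druid.indexer.runner.taskCleanupDelay=P2D
druid.indexer.runner.taskCleanupInterval=PT10M
```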
KubernetesPeonLifecycle.java
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -90,6 +92,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final SettableFuture<Boolean> taskStartedSuccessfullyFuture;

@MonotonicNonNull
private LogWatch logWatch;

@@ -109,6 +113,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.taskStartedSuccessfullyFuture = SettableFuture.create();
}

/**
@@ -137,11 +142,13 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout,
launchTimeout,
TimeUnit.MILLISECONDS
);

return join(timeout);
}
catch (Exception e) {
log.info("Failed to run task: %s", taskId.getOriginalTaskId());
if (!taskStartedSuccessfullyFuture.isDone()) {
taskStartedSuccessfullyFuture.set(false);
}
throw e;
}
finally {
@@ -179,7 +186,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

taskStartedSuccessfullyFuture.set(true);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
@@ -188,14 +195,19 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio

return getTaskStatus(jobResponse.getJobDuration());
}
catch (Exception e) {
if (!taskStartedSuccessfullyFuture.isDone()) {
taskStartedSuccessfullyFuture.set(false);
}
throw e;
}
finally {
try {
saveLogs();
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
}

stopTask();
}
}
@@ -246,7 +258,10 @@ protected State getState()
protected TaskLocation getTaskLocation()
{
if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) {
log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId());
/* This should not happen in practice because KubernetesTaskRunner.start() does not return until all running tasks
have already gone into State.RUNNING, so getTaskLocation should not be called before then.
*/
log.warn("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId());
return TaskLocation.unknown();
}

@@ -257,13 +272,18 @@ protected TaskLocation getTaskLocation()
if (taskLocation == null) {
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
/* Arguably we should throw an exception here, but we leave it as a warn log to prevent unexpected errors.
If there is strange behavior during Overlord restarts, the operator should look for this warn log.
*/
log.warn("Could not get task location from k8s for task [%s].", taskId);
return TaskLocation.unknown();
}

Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();

if (podStatus == null || podStatus.getPodIP() == null) {
log.warn("Could not get task location from k8s for task [%s].", taskId);
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
@@ -376,4 +396,17 @@ private void updateState(State[] acceptedStates, State targetState)
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}

/**
* Returns the {@link ListenableFuture} that tracks whether this task started successfully.
*
* <p>The future completes with true once the task enters the RUNNING state, and with false if the
* task fails before reaching it.
*
* @return a {@link ListenableFuture} representing whether the task started successfully.
*/
protected ListenableFuture<Boolean> getTaskStartedSuccessfullyFuture()
{
return taskStartedSuccessfullyFuture;
}
}
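
One way a caller might consume this future is with a Guava callback. The sketch below is hypothetical; the helper class, method name, and use of a direct executor are not part of Druid's code:

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

// Hypothetical consumer of getTaskStartedSuccessfullyFuture(): logs once the task
// either reaches RUNNING (true) or fails beforehand (false).
class TaskStartWatcher
{
  static void logWhenStarted(String taskId, ListenableFuture<Boolean> startedFuture)
  {
    Futures.addCallback(
        startedFuture,
        new FutureCallback<Boolean>()
        {
          @Override
          public void onSuccess(Boolean startedOk)
          {
            System.out.println("Task " + taskId + " started successfully: " + startedOk);
          }

          @Override
          public void onFailure(Throwable t)
          {
            System.out.println("Start signal for task " + taskId + " failed: " + t);
          }
        },
        MoreExecutors.directExecutor()
    );
  }
}
```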
KubernetesTaskRunner.java
@@ -29,6 +29,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -55,12 +56,14 @@
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -146,16 +149,28 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
task,
exec.submit(() -> runTask(task)),
peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
)
)).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected KubernetesWorkItem joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
task,
exec.submit(() -> joinTask(task)),
peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
)
));
}
}

@@ -173,10 +188,7 @@ private TaskStatus joinTask(Task task)
protected TaskStatus doTask(Task task, boolean run)
{
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
);
KubernetesPeonLifecycle peonLifecycle;

synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());
@@ -185,7 +197,7 @@ protected TaskStatus doTask(Task task, boolean run)
throw new ISE("Task [%s] has been shut down", task.getId());
}

workItem.setKubernetesPeonLifecycle(peonLifecycle);
peonLifecycle = workItem.getPeonLifeycle();
}

TaskStatus taskStatus;
@@ -321,16 +333,53 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
public void start()
{
log.info("Starting K8sTaskRunner...");
// Load tasks from previously running jobs and wait for their statuses to be updated asynchronously.
for (Job job : client.getPeonJobs()) {
// Load tasks from previously running jobs and wait for their statuses to start running.
final List<ListenableFuture<Boolean>> taskStatusActiveList = new ArrayList<>();
final List<Job> peonJobs = client.getPeonJobs();

log.info("Locating [%,d] active tasks.", peonJobs.size());
for (Job job : peonJobs) {
try {
joinAsync(adapter.toTask(job));
KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job));
taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture());
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
log.info("Loaded %,d tasks from previous run", tasks.size());

try {
final DateTime nowUtc = DateTimes.nowUtc();
final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis();
if (timeoutMs > 0) {
FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, TimeUnit.MILLISECONDS);
}
log.info("Located [%,d] active tasks.", taskStatusActiveList.size());
}
catch (Exception e) {
final long numInitialized =
tasks.values()
.stream()
.filter(item -> {
if (item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) {
try {
return item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get();
}
catch (InterruptedException | ExecutionException ex) {
return false;
}
} else {
return false;
}
}).count();
log.warn(
e,
"Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.",
numInitialized,
taskStatusActiveList.size(),
config.getTaskJoinTimeout()
);
}

cleanupExecutor.scheduleAtFixedRate(
() ->
@@ -342,7 +391,7 @@ public void start()
config.getTaskCleanupInterval().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);
log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
log.info("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
}

@Override
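The startup wait above uses Druid's `FutureUtils.coalesce` to gather the start signals. As a rough, plain-Guava analogue of the same bounded wait (not the actual implementation: it converts the Period with `toStandardDuration()` and counts any completed future rather than only successful starts):

```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.joda.time.Period;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Rough sketch of the bounded wait in start(): block up to taskJoinTimeout for the
// restored tasks' start signals, then log how many reported in and move on.
class BoundedJoinSketch
{
  static void waitForStarts(List<ListenableFuture<Boolean>> startedFutures, Period taskJoinTimeout)
      throws Exception
  {
    long timeoutMs = taskJoinTimeout.toStandardDuration().getMillis();
    try {
      Futures.allAsList(startedFutures).get(timeoutMs, TimeUnit.MILLISECONDS);
      System.out.printf("Located [%,d] active tasks.%n", startedFutures.size());
    }
    catch (TimeoutException e) {
      long located = startedFutures.stream().filter(ListenableFuture::isDone).count();
      System.out.printf(
          "Located [%,d] of [%,d] active tasks before the timeout; locating the rest asynchronously.%n",
          located,
          startedFutures.size()
      );
    }
  }

  public static void main(String[] args) throws Exception
  {
    SettableFuture<Boolean> ready = SettableFuture.create();
    ready.set(true);
    waitForStarts(ImmutableList.of(ready), new Period("PT1M"));
  }
}
```
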
KubernetesTaskRunnerConfig.java
@@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig
// interval for k8s job cleanup to run
private Period taskCleanupInterval = new Period("PT10m");

@JsonProperty
@NotNull
// how long to wait to join peon k8s jobs on startup
private Period taskJoinTimeout = new Period("PT1M");


@JsonProperty
@NotNull
// how long to wait for the peon k8s job to launch
@@ -140,7 +146,8 @@ private KubernetesTaskRunnerConfig(
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Integer capacity,
Period taskJoinTimeout
)
{
this.namespace = namespace;
@@ -181,6 +188,10 @@ private KubernetesTaskRunnerConfig(
k8sjobLaunchTimeout,
this.k8sjobLaunchTimeout
);
this.taskJoinTimeout = ObjectUtils.defaultIfNull(
taskJoinTimeout,
this.taskJoinTimeout
);
this.peonMonitors = ObjectUtils.defaultIfNull(
peonMonitors,
this.peonMonitors
@@ -247,6 +258,11 @@ public Period getTaskTimeout()
{
return maxTaskDuration;
}
public Period getTaskJoinTimeout()
{
return taskJoinTimeout;
}


public Period getTaskCleanupDelay()
{
@@ -317,6 +333,7 @@ public static class Builder
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
private Period taskJoinTimeout;

public Builder()
{
@@ -425,6 +442,12 @@ public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
return this;
}

public Builder withTaskJoinTimeout(Period taskJoinTimeout)
{
this.taskJoinTimeout = taskJoinTimeout;
return this;
}

public KubernetesTaskRunnerConfig build()
{
return new KubernetesTaskRunnerConfig(
@@ -444,7 +467,8 @@ public KubernetesTaskRunnerConfig build()
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity
this.capacity,
this.taskJoinTimeout
);
}
}
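
A hypothetical builder usage, for example from a unit test sitting alongside `KubernetesTaskRunnerConfig` in the same package (the two-minute value is illustrative):

```java
import org.joda.time.Period;

class TaskJoinTimeoutConfigExample
{
  static KubernetesTaskRunnerConfig exampleConfig()
  {
    // All other fields fall back to their defaults via the private constructor.
    return new KubernetesTaskRunnerConfig.Builder()
        .withTaskJoinTimeout(new Period("PT2M"))
        .build();
  }
}
```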