Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to pass task payload via deep storage instead of environment variable #14887

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7a86726
Separate out task logs
georgew5656 Aug 17, 2023
151b5c0
working with cleaner configs
georgew5656 Aug 17, 2023
455dfcf
Remove unneeded changes
georgew5656 Aug 17, 2023
6dd5713
Working with new configs
georgew5656 Aug 18, 2023
4823adc
Merge branch 'master' of github.com:georgew5656/druid into saveTaskLogs
georgew5656 Aug 18, 2023
14650c9
Pulling remote changes in
georgew5656 Aug 18, 2023
378a472
Fixing checkstyle
georgew5656 Aug 18, 2023
12374be
Cleanup unit tests
georgew5656 Aug 20, 2023
ea8a37b
fix checkstyle
georgew5656 Aug 21, 2023
39c62ac
Add more unit tests
georgew5656 Aug 21, 2023
ca17d62
Clean up check failures
georgew5656 Aug 21, 2023
bf21854
PR changes
georgew5656 Aug 23, 2023
be50e45
Fix spellign errors
georgew5656 Aug 23, 2023
3a59f3f
Fix spacing in docs
georgew5656 Aug 28, 2023
3bc19de
Don't overwrite table format
georgew5656 Aug 28, 2023
4b05f39
don't fix table format
georgew5656 Aug 28, 2023
ffcf0a7
Rename config
georgew5656 Sep 13, 2023
01989d7
Fix merge conflicts
georgew5656 Sep 13, 2023
baf9c75
fix merge conflicts
georgew5656 Sep 13, 2023
a74aad7
Remove config options
georgew5656 Sep 15, 2023
bbe7444
Small fixes
georgew5656 Sep 15, 2023
f3f8aac
Remove uneeded param
georgew5656 Sep 18, 2023
7760688
fix build
georgew5656 Sep 18, 2023
519b046
Merge branch 'master' of github.com:georgew5656/druid into useTaskMan…
georgew5656 Sep 19, 2023
4afa8f3
remove unneeded arg
georgew5656 Sep 19, 2023
0af80bf
Remove unused import
georgew5656 Sep 19, 2023
ba82cb1
PR changes
georgew5656 Sep 22, 2023
f200439
more pr changes
georgew5656 Sep 25, 2023
9f171fe
Merge branch 'master' into useTaskManagerForTaskPayload
georgew5656 Sep 27, 2023
59dc958
More pr changes
georgew5656 Sep 28, 2023
6123a3e
Fix static checks
georgew5656 Sep 29, 2023
99b68f0
Update extensions-contrib/kubernetes-overlord-extensions/src/main/jav…
georgew5656 Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distribution/docker/peon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,6 @@ then
fi

# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand All @@ -42,6 +43,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -88,7 +90,7 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;

private final TaskConfig taskConfig;
@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -99,7 +101,8 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
TaskStateListener stateListener,
TaskConfig taskConfig
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -108,6 +111,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.taskConfig = taskConfig;
}

/**
Expand All @@ -124,6 +128,10 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
try {
updateState(new State[]{State.NOT_STARTED}, State.PENDING);

if (taskConfig.isEnableTaskPayloadManagerPerTask()) {
writeTaskPayload(task);
}

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
Expand All @@ -145,6 +153,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}

private void writeTaskPayload(Task task)
{
try {
abhishekagarwal87 marked this conversation as resolved.
Show resolved Hide resolved
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
try {
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
taskLogs.pushTaskPayload(task.getId(), file.toFile());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that a log cleanup job removes the task payload from the deep storage while task is still in progress? How are these payloads cleaned up from deep storage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the task reads the file to disk on startup so i wouldn't be worried about the log cleanup job to clean it up that soon. we are relying on the log cleanup job to cleanup deep storage

}
catch (Exception e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
finally {
Files.deleteIfExists(file);
}
}
catch (IOException e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
}

/**
* Join existing Kubernetes Job
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;
Expand All @@ -29,16 +30,19 @@ public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
private final KubernetesPeonClient client;
private final TaskLogs taskLogs;
private final ObjectMapper mapper;
private final TaskConfig taskConfig;

public KubernetesPeonLifecycleFactory(
KubernetesPeonClient client,
TaskLogs taskLogs,
ObjectMapper mapper
ObjectMapper mapper,
TaskConfig taskConfig
)
{
this.client = client;
this.taskLogs = taskLogs;
this.mapper = mapper;
this.taskConfig = taskConfig;
}

@Override
Expand All @@ -49,7 +53,8 @@ public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStat
client,
taskLogs,
mapper,
stateListener
stateListener,
taskConfig
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch
private Period k8sjobLaunchTimeout = new Period("PT1H");

// Whether to pass the task.json payload to the peon K8s Jobs as a environment variable.
@JsonProperty
@NotNull
private Boolean taskPayloadAsEnvVariable = Boolean.TRUE;

@JsonProperty
// ForkingTaskRunner inherits the monitors from the MM, in k8s mode
// the peon inherits the monitors from the overlord, so if someone specifies
Expand Down Expand Up @@ -135,7 +140,8 @@ private KubernetesTaskRunnerConfig(
List<String> javaOptsArray,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Integer capacity,
Boolean taskPayloadAsEnvVariable
)
{
this.namespace = namespace;
Expand Down Expand Up @@ -196,6 +202,10 @@ private KubernetesTaskRunnerConfig(
capacity,
this.capacity
);
this.taskPayloadAsEnvVariable = ObjectUtils.defaultIfNull(
taskPayloadAsEnvVariable,
Boolean.TRUE
);
}

public String getNamespace()
Expand Down Expand Up @@ -279,6 +289,11 @@ public Integer getCapacity()
return capacity;
}

public Boolean isTaskPayloadAsEnvVariable()
{
return taskPayloadAsEnvVariable;
}

public static Builder builder()
{
return new Builder();
Expand All @@ -302,6 +317,7 @@ public static class Builder
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
private Boolean taskPayloadAsEnvVariable;

public Builder()
{
Expand Down Expand Up @@ -403,6 +419,12 @@ public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
return this;
}

public Builder withTaskPayloadAsEnvVariable(Boolean taskPayloadAsEnvVariable)
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
{
this.taskPayloadAsEnvVariable = taskPayloadAsEnvVariable;
return this;
}

public KubernetesTaskRunnerConfig build()
{
return new KubernetesTaskRunnerConfig(
Expand All @@ -421,7 +443,8 @@ public KubernetesTaskRunnerConfig build()
this.javaOptsArray,
this.labels,
this.annotations,
this.capacity
this.capacity,
this.taskPayloadAsEnvVariable
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;


@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
Expand Down Expand Up @@ -101,7 +100,7 @@ public KubernetesTaskRunner build()
kubernetesTaskRunnerConfig,
peonClient,
httpClient,
new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper),
new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper, taskConfig),
emitter
);
return runner;
Expand Down Expand Up @@ -137,15 +136,17 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties
properties,
taskLogs
);
} else {
return new SingleContainerTaskAdapter(
Expand All @@ -154,7 +155,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
Expand All @@ -57,8 +59,11 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -89,14 +94,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
protected final TaskLogs taskLogs;

public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the name of this class has been confusing since it does so much more than dealing with task logs. could be fixed in some other PR someday.

)
{
this.client = client;
Expand All @@ -105,6 +112,7 @@ public K8sTaskAdapter(
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
this.taskLogs = taskLogs;
}

@Override
Expand All @@ -126,6 +134,14 @@ public Job fromTask(Task task) throws IOException
@Override
public Task toTask(Job from) throws IOException
{
if (taskConfig.isEnableTaskPayloadManagerPerTask()) {
com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
if (!taskBody.isPresent()) {
throw new IOE("Could not find task payload in task logs for job [%s]", from.getMetadata().getName());
}
String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
return mapper.readValue(task, Task.class);
}
PodSpec podSpec = from.getSpec().getTemplate().getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Expand All @@ -137,6 +153,20 @@ public Task toTask(Job from) throws IOException
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}

@Override
public K8sTaskId getTaskId(Job from) throws IOException
{
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be replaced with DruidException.defensive()?

}
String taskId = annotations.get(DruidK8sConstants.TASK_ID);
if (taskId == null) {
throw new IOE("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return new K8sTaskId(taskId);
}

@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;

Expand Down Expand Up @@ -219,15 +249,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
Expand All @@ -244,7 +270,16 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
null,
"metadata.name"
)).build()).build()
));
);
if (taskRunnerConfig.isTaskPayloadAsEnvVariable()) {
envVars.add(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskRunnerConfig.isTaskPayloadAsEnvVariable() ? taskContents : "")
.build()
);
}
mainContainer.getEnv().addAll(envVars);
}

protected Container setupMainContainer(
Expand Down Expand Up @@ -403,6 +438,9 @@ private List<String> generateCommand(Task task)
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--taskId");
command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
Expand Down
Loading