Allow users to pass task payload via deep storage instead of environment variable #14887

Merged
Commits (32). The diff below shows changes from 21 of them.
7a86726
Separate out task logs
georgew5656 Aug 17, 2023
151b5c0
working with cleaner configs
georgew5656 Aug 17, 2023
455dfcf
Remove unneeded changes
georgew5656 Aug 17, 2023
6dd5713
Working with new configs
georgew5656 Aug 18, 2023
4823adc
Merge branch 'master' of github.com:georgew5656/druid into saveTaskLogs
georgew5656 Aug 18, 2023
14650c9
Pulling remote changes in
georgew5656 Aug 18, 2023
378a472
Fixing checkstyle
georgew5656 Aug 18, 2023
12374be
Cleanup unit tests
georgew5656 Aug 20, 2023
ea8a37b
fix checkstyle
georgew5656 Aug 21, 2023
39c62ac
Add more unit tests
georgew5656 Aug 21, 2023
ca17d62
Clean up check failures
georgew5656 Aug 21, 2023
bf21854
PR changes
georgew5656 Aug 23, 2023
be50e45
Fix spellign errors
georgew5656 Aug 23, 2023
3a59f3f
Fix spacing in docs
georgew5656 Aug 28, 2023
3bc19de
Don't overwrite table format
georgew5656 Aug 28, 2023
4b05f39
don't fix table format
georgew5656 Aug 28, 2023
ffcf0a7
Rename config
georgew5656 Sep 13, 2023
01989d7
Fix merge conflicts
georgew5656 Sep 13, 2023
baf9c75
fix merge conflicts
georgew5656 Sep 13, 2023
a74aad7
Remove config options
georgew5656 Sep 15, 2023
bbe7444
Small fixes
georgew5656 Sep 15, 2023
f3f8aac
Remove uneeded param
georgew5656 Sep 18, 2023
7760688
fix build
georgew5656 Sep 18, 2023
519b046
Merge branch 'master' of github.com:georgew5656/druid into useTaskMan…
georgew5656 Sep 19, 2023
4afa8f3
remove unneeded arg
georgew5656 Sep 19, 2023
0af80bf
Remove unused import
georgew5656 Sep 19, 2023
ba82cb1
PR changes
georgew5656 Sep 22, 2023
f200439
more pr changes
georgew5656 Sep 25, 2023
9f171fe
Merge branch 'master' into useTaskManagerForTaskPayload
georgew5656 Sep 27, 2023
59dc958
More pr changes
georgew5656 Sep 28, 2023
6123a3e
Fix static checks
georgew5656 Sep 29, 2023
99b68f0
Update extensions-contrib/kubernetes-overlord-extensions/src/main/jav…
georgew5656 Sep 29, 2023
2 changes: 1 addition & 1 deletion distribution/docker/peon.sh
@@ -150,6 +150,6 @@ then
fi

# If the ${TASK_JSON} environment variable is set, base64-decode and gunzip it into ${TASK_DIR}/task.json
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
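
For reference, here is a minimal, self-contained Java sketch of the round trip the peon.sh line above expects: the overlord gzips and base64-encodes the serialized task (in Druid this is handled by the Base64Compression utility referenced later in this diff), and the container undoes it with base64 -d | gzip -d. The class name and sample payload below are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TaskJsonEnvRoundTrip
{
  // Produce a TASK_JSON-style value: gzip the serialized task, then base64-encode it.
  static String encode(String taskJson) throws Exception
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write(taskJson.getBytes(StandardCharsets.UTF_8));
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  // The Java equivalent of what peon.sh does with base64 -d | gzip -d.
  static String decode(String envValue) throws Exception
  {
    byte[] compressed = Base64.getDecoder().decode(envValue);
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws Exception
  {
    String original = "{\"type\":\"noop\",\"id\":\"example-task\"}";  // illustrative payload
    System.out.println(decode(encode(original)).equals(original));   // prints: true
  }
}
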
KubernetesPeonLifecycle.java
@@ -42,6 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -71,9 +72,9 @@ public interface TaskStateListener

protected enum State
{
/** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
/** Lifecycle's state before {@link #run(Job, long, long, Boolean)} or {@link #join(long)} is called. */
NOT_STARTED,
/** Lifecycle's state since {@link #run(Job, long, long)} is called. */
/** Lifecycle's state since {@link #run(Job, long, long, Boolean)} is called. */
PENDING,
/** Lifecycle's state since {@link #join(long)} is called. */
RUNNING,
@@ -88,7 +89,6 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;

@MonotonicNonNull
private LogWatch logWatch;

@@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle(
* @return
* @throws IllegalStateException
*/
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, Boolean useDeepStorageForTaskPayload) throws IllegalStateException
{
try {
updateState(new State[]{State.NOT_STARTED}, State.PENDING);

if (useDeepStorageForTaskPayload) {
writeTaskPayload(task);
}

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
@@ -144,6 +148,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}

private void writeTaskPayload(Task task)
{
try {
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
try {
FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
taskLogs.pushTaskPayload(task.getId(), file.toFile());
Contributor:
Is it possible that a log cleanup job removes the task payload from deep storage while the task is still in progress? How are these payloads cleaned up from deep storage?

Contributor (PR author):
The task reads the file to disk on startup, so I wouldn't be worried about the log cleanup job removing it that soon. We are relying on the log cleanup job to clean up deep storage.

}
catch (Exception e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
finally {
Files.deleteIfExists(file);
}
}
catch (IOException e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
}
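
// For illustration only (not part of this PR): a hypothetical in-memory stand-in for the two
// TaskLogs payload methods relied on here. writeTaskPayload() above calls pushTaskPayload to
// upload the serialized task before the Kubernetes job starts, and the task adapter later calls
// streamTaskPayload to read it back when no TASK_JSON environment variable is present. A real
// deployment backs these calls with deep storage (S3, GCS, HDFS, or local files); the sketch
// below is standalone and only mirrors the method signatures used in this diff.

import com.google.common.base.Optional;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryTaskPayloadStore
{
  private final Map<String, byte[]> payloads = new ConcurrentHashMap<>();

  // Mirrors taskLogs.pushTaskPayload(taskId, file) as called in writeTaskPayload().
  public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException
  {
    payloads.put(taskId, Files.readAllBytes(taskPayloadFile.toPath()));
  }

  // Mirrors taskLogs.streamTaskPayload(taskId) as called by the adapter's deep-storage fallback.
  public Optional<InputStream> streamTaskPayload(String taskId)
  {
    byte[] payload = payloads.get(taskId);
    if (payload == null) {
      return Optional.absent();
    }
    return Optional.of(new ByteArrayInputStream(payload));
  }
}
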

/**
* Join existing Kubernetes Job
*
KubernetesTaskRunner.java
@@ -193,7 +193,8 @@ protected TaskStatus doTask(Task task, boolean run)
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
config.getTaskTimeout().toStandardDuration().getMillis()
config.getTaskTimeout().toStandardDuration().getMillis(),
adapter.shouldUseDeepStorageForTaskPayload(task)
);
} else {
taskStatus = peonLifecycle.join(
KubernetesTaskRunnerFactory.java
@@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;


@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
@@ -137,15 +136,17 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties
properties,
taskLogs
);
} else {
return new SingleContainerTaskAdapter(
@@ -154,7 +155,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
}
}
DruidK8sConstants.java
@@ -40,5 +40,6 @@ public class DruidK8sConstants
public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";
public static final String LABEL_KEY = "druid.k8s.peons";
public static final String DRUID_LABEL_PREFIX = "druid.";
public static final long MAX_ENV_VARIABLE_KBS = 130048; // 130048 bytes = 127 KB
static final Predicate<Throwable> IS_TRANSIENT = e -> e instanceof KubernetesResourceNotFoundException;
}
K8sTaskAdapter.java
@@ -41,13 +41,15 @@
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
@@ -57,8 +59,11 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -89,14 +94,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
protected final TaskLogs taskLogs;

public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
Contributor:
The name of this class has been confusing, since it does so much more than deal with task logs. Could be fixed in some other PR someday.

)
{
this.client = client;
@@ -105,6 +112,7 @@ public K8sTaskAdapter(
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
this.taskLogs = taskLogs;
}

@Override
@@ -132,11 +140,36 @@ public Task toTask(Job from) throws IOException
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}

private Task toTaskUsingDeepStorage(Job from) throws IOException
{
com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
if (!taskBody.isPresent()) {
throw new IOE("Could not load task payload for job [%s]", from.getMetadata().getName());
}
String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
return mapper.readValue(task, Task.class);
}

@Override
public K8sTaskId getTaskId(Job from) throws IOException
{
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
Contributor:
Can this be replaced with DruidException.defensive()?

}
String taskId = annotations.get(DruidK8sConstants.TASK_ID);
if (taskId == null) {
throw new IOE("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return new K8sTaskId(taskId);
}

@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;

@@ -219,15 +252,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
@@ -244,7 +273,17 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
null,
"metadata.name"
)).build()).build()
));
);

if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
envVars.add(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build()
);
}
mainContainer.getEnv().addAll(envVars);
}

protected Container setupMainContainer(
@@ -403,6 +442,9 @@ private List<String> generateCommand(Task task)
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--taskId");
command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
@@ -433,5 +475,18 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
}
return requirements;
}

@Override
public Boolean shouldUseDeepStorageForTaskPayload(Task task)
{
try {
String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
}
catch (Exception e) {
// If there's an issue checking how large the task payload is, default to using deep storage so we don't lose the task payload.
return true;
}
}
}
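
To make the new threshold concrete, here is a hedged, standalone sketch of the same decision shouldUseDeepStorageForTaskPayload makes above. The class and method names below are illustrative: taskJson stands in for mapper.writeValueAsString(task), the gzip-plus-base64 step stands in for Base64Compression.compressBase64, and 130048 mirrors DruidK8sConstants.MAX_ENV_VARIABLE_KBS (about 127 KB of compressed, encoded payload).

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

public class PayloadSizeCheck
{
  // Roughly 127 KB, mirroring DruidK8sConstants.MAX_ENV_VARIABLE_KBS.
  private static final long MAX_ENV_VARIABLE_SIZE = 130048;

  // Returns true when the compressed, base64-encoded payload is too large to ship as an
  // environment variable, i.e. when the task payload should go through deep storage instead.
  static boolean wouldUseDeepStorage(String taskJson)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
        gzip.write(taskJson.getBytes(StandardCharsets.UTF_8));
      }
      String compressedBase64 = Base64.getEncoder().encodeToString(bytes.toByteArray());
      return compressedBase64.length() > MAX_ENV_VARIABLE_SIZE;
    }
    catch (IOException e) {
      // Same fallback as the PR: if the size check itself fails, prefer deep storage rather
      // than risk losing the payload.
      return true;
    }
  }
}
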

MultiContainerTaskAdapter.java
@@ -43,6 +43,7 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.util.Collections;
@@ -59,10 +60,11 @@ public MultiContainerTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
)
{
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}

@Override