Skip to content

Commit

Permalink
Allow users to pass task payload via deep storage instead of environm…
Browse files Browse the repository at this point in the history
…ent variable (#14887)

This change is meant to fix a issue where passing too large of a task payload to the mm-less task runner will cause the peon to fail to startup because the payload is passed (compressed) as a environment variable (TASK_JSON). In linux systems the limit for a environment variable is commonly 128KB, for windows systems less than this. Setting a env variable longer than this results in a bunch of "Argument list too long" errors.
  • Loading branch information
georgew5656 authored Oct 3, 2023
1 parent f3d1c8b commit 64754b6
Show file tree
Hide file tree
Showing 32 changed files with 1,043 additions and 95 deletions.
5 changes: 3 additions & 2 deletions distribution/docker/peon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ then
mkdir -p ${DRUID_DIRS_TO_CREATE}
fi

# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;
# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json.
# If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage.
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -71,9 +72,9 @@ public interface TaskStateListener

protected enum State
{
/** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
/** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long)} is called. */
NOT_STARTED,
/** Lifecycle's state since {@link #run(Job, long, long)} is called. */
/** Lifecycle's state since {@link #run(Job, long, long, boolean)} is called. */
PENDING,
/** Lifecycle's state since {@link #join(long)} is called. */
RUNNING,
Expand All @@ -88,7 +89,6 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;

@MonotonicNonNull
private LogWatch logWatch;

Expand Down Expand Up @@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle(
* @return
* @throws IllegalStateException
*/
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, boolean useDeepStorageForTaskPayload) throws IllegalStateException, IOException
{
try {
updateState(new State[]{State.NOT_STARTED}, State.PENDING);

if (useDeepStorageForTaskPayload) {
writeTaskPayload(task);
}

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
Expand All @@ -144,6 +148,25 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}

private void writeTaskPayload(Task task) throws IOException
{
Path file = null;
try {
file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
taskLogs.pushTaskPayload(task.getId(), file.toFile());
}
catch (Exception e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
finally {
if (file != null) {
Files.deleteIfExists(file);
}
}
}

/**
* Join existing Kubernetes Job
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ protected TaskStatus doTask(Task task, boolean run)
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
config.getTaskTimeout().toStandardDuration().getMillis()
config.getTaskTimeout().toStandardDuration().getMillis(),
adapter.shouldUseDeepStorageForTaskPayload(task)
);
} else {
taskStatus = peonLifecycle.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;


@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
Expand Down Expand Up @@ -137,15 +136,17 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties
properties,
taskLogs
);
} else {
return new SingleContainerTaskAdapter(
Expand All @@ -154,7 +155,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ public class DruidK8sConstants
public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";
public static final String LABEL_KEY = "druid.k8s.peons";
public static final String DRUID_LABEL_PREFIX = "druid.";
public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB
static final Predicate<Throwable> IS_TRANSIENT = e -> e instanceof KubernetesResourceNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
Expand All @@ -57,8 +60,11 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -89,14 +95,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
protected final TaskLogs taskLogs;

public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
)
{
this.client = client;
Expand All @@ -105,6 +113,7 @@ public K8sTaskAdapter(
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
this.taskLogs = taskLogs;
}

@Override
Expand Down Expand Up @@ -132,11 +141,39 @@ public Task toTask(Job from) throws IOException
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}

private Task toTaskUsingDeepStorage(Job from) throws IOException
{
com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
if (!taskBody.isPresent()) {
throw InternalServerError.exception(
"Could not load task payload from deep storage for job [%s]. Check the overlord logs for any errors in uploading task payload to deep storage.",
from.getMetadata().getName()
);
}
String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
return mapper.readValue(task, Task.class);
}

@Override
public K8sTaskId getTaskId(Job from)
{
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}
String taskId = annotations.get(DruidK8sConstants.TASK_ID);
if (taskId == null) {
throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return new K8sTaskId(taskId);
}

@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;

Expand Down Expand Up @@ -219,15 +256,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
Expand All @@ -244,7 +277,17 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
null,
"metadata.name"
)).build()).build()
));
);

if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
envVars.add(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build()
);
}
mainContainer.getEnv().addAll(envVars);
}

protected Container setupMainContainer(
Expand Down Expand Up @@ -403,6 +446,9 @@ private List<String> generateCommand(Task task)
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--taskId");
command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
Expand Down Expand Up @@ -433,5 +479,12 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
}
return requirements;
}

@Override
public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException
{
String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -59,10 +60,11 @@ public MultiContainerTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
)
{
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}

@Override
Expand Down
Loading

0 comments on commit 64754b6

Please sign in to comment.