diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh index 66e34c997445..b5ec89ea8d46 100755 --- a/distribution/docker/peon.sh +++ b/distribution/docker/peon.sh @@ -149,7 +149,8 @@ then mkdir -p ${DRUID_DIRS_TO_CREATE} fi -# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json -mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json; +# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json. +# If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage. +mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json; exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@ diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 4814d8cbb609..5c6c7c6b3ebe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -71,9 +72,9 @@ public interface TaskStateListener protected enum State { - /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */ + /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long)} is called. */ NOT_STARTED, - /** Lifecycle's state since {@link #run(Job, long, long)} is called. */ + /** Lifecycle's state since {@link #run(Job, long, long, boolean)} is called. */ PENDING, /** Lifecycle's state since {@link #join(long)} is called. */ RUNNING, @@ -88,7 +89,6 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; - @MonotonicNonNull private LogWatch logWatch; @@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle( * @return * @throws IllegalStateException */ - protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException + protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, boolean useDeepStorageForTaskPayload) throws IllegalStateException, IOException { try { updateState(new State[]{State.NOT_STARTED}, State.PENDING); + if (useDeepStorageForTaskPayload) { + writeTaskPayload(task); + } + // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation. taskLocation = null; kubernetesClient.launchPeonJobAndWaitForStart( @@ -144,6 +148,25 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) } } + private void writeTaskPayload(Task task) throws IOException + { + Path file = null; + try { + file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json"); + FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset()); + taskLogs.pushTaskPayload(task.getId(), file.toFile()); + } + catch (Exception e) { + log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId()); + throw new RuntimeException(e); + } + finally { + if (file != null) { + Files.deleteIfExists(file); + } + } + } + /** * Join existing Kubernetes Job * diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 33efd848d0ac..a0a29dcbbb92 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -193,7 +193,8 @@ protected TaskStatus doTask(Task task, boolean run) taskStatus = peonLifecycle.run( adapter.fromTask(task), config.getTaskLaunchTimeout().toStandardDuration().getMillis(), - config.getTaskTimeout().toStandardDuration().getMillis() + config.getTaskTimeout().toStandardDuration().getMillis(), + adapter.shouldUseDeepStorageForTaskPayload(task) ); } else { taskStatus = peonLifecycle.join( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 92fc220e6213..72d7ef0c00d4 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory IS_TRANSIENT = e -> e instanceof KubernetesResourceNotFoundException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 712bc1a47e20..862b176b1159 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -41,7 +41,10 @@ import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ForkingTaskRunner; @@ -57,8 +60,11 @@ import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -89,6 +95,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter protected final StartupLoggingConfig startupLoggingConfig; protected final DruidNode node; protected final ObjectMapper mapper; + protected final TaskLogs taskLogs; public K8sTaskAdapter( KubernetesClientApi client, @@ -96,7 +103,8 @@ public K8sTaskAdapter( TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, DruidNode node, - ObjectMapper mapper + ObjectMapper mapper, + TaskLogs taskLogs ) { this.client = client; @@ -105,6 +113,7 @@ public K8sTaskAdapter( this.startupLoggingConfig = startupLoggingConfig; this.node = node; this.mapper = mapper; + this.taskLogs = taskLogs; } @Override @@ -132,11 +141,39 @@ public Task toTask(Job from) throws IOException Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null); if (contents == null) { - throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName()); + log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName()); + return toTaskUsingDeepStorage(from); } return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class); } + private Task toTaskUsingDeepStorage(Job from) throws IOException + { + com.google.common.base.Optional taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId()); + if (!taskBody.isPresent()) { + throw InternalServerError.exception( + "Could not load task payload from deep storage for job [%s]. Check the overlord logs for any errors in uploading task payload to deep storage.", + from.getMetadata().getName() + ); + } + String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset()); + return mapper.readValue(task, Task.class); + } + + @Override + public K8sTaskId getTaskId(Job from) + { + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); + if (annotations == null) { + throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); + } + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName()); + } + return new K8sTaskId(taskId); + } + @VisibleForTesting abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException; @@ -219,15 +256,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context .build()); } - mainContainer.getEnv().addAll(Lists.newArrayList( + List envVars = Lists.newArrayList( new EnvVarBuilder() .withName(DruidK8sConstants.TASK_DIR_ENV) .withValue(context.getTaskDir().getAbsolutePath()) .build(), - new EnvVarBuilder() - .withName(DruidK8sConstants.TASK_JSON_ENV) - .withValue(taskContents) - .build(), new EnvVarBuilder() .withName(DruidK8sConstants.JAVA_OPTS) .withValue(Joiner.on(" ").join(context.getJavaOpts())) @@ -244,7 +277,17 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context null, "metadata.name" )).build()).build() - )); + ); + + if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) { + envVars.add( + new EnvVarBuilder() + .withName(DruidK8sConstants.TASK_JSON_ENV) + .withValue(taskContents) + .build() + ); + } + mainContainer.getEnv().addAll(envVars); } protected Container setupMainContainer( @@ -403,6 +446,9 @@ private List generateCommand(Task task) command.add("--loadBroadcastSegments"); command.add("true"); } + + command.add("--taskId"); + command.add(task.getId()); log.info( "Peon Command for K8s job: %s", ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command) @@ -433,5 +479,12 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require } return requirements; } + + @Override + public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException + { + String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task)); + return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java index 9cda8f864882..a81154a3bcba 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java @@ -43,6 +43,7 @@ import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; import java.io.IOException; import java.util.Collections; @@ -59,10 +60,11 @@ public MultiContainerTaskAdapter( TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, DruidNode druidNode, - ObjectMapper mapper + ObjectMapper mapper, + TaskLogs taskLogs ) { - super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper); + super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index a3d10f7dcd1c..ef0509a673f8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -24,8 +24,8 @@ import com.fasterxml.jackson.databind.introspect.AnnotatedClass; import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver; import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; @@ -35,11 +35,13 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.client.utils.Serialization; +import org.apache.commons.io.IOUtils; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -49,12 +51,16 @@ import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils; import org.apache.druid.server.DruidNode; +import org.apache.druid.tasklogs.TaskLogs; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; import java.nio.file.Files; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -85,13 +91,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter private final DruidNode node; private final ObjectMapper mapper; private final HashMap templates; + private final TaskLogs taskLogs; public PodTemplateTaskAdapter( KubernetesTaskRunnerConfig taskRunnerConfig, TaskConfig taskConfig, DruidNode node, ObjectMapper mapper, - Properties properties + Properties properties, + TaskLogs taskLogs ) { this.taskRunnerConfig = taskRunnerConfig; @@ -99,6 +107,7 @@ public PodTemplateTaskAdapter( this.node = node; this.mapper = mapper; this.templates = initializePodTemplates(properties); + this.taskLogs = taskLogs; } /** @@ -163,15 +172,44 @@ public Task toTask(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { - throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); + log.info("No annotations found on pod spec for job [%s]. Trying to load task payload from deep storage.", from.getMetadata().getName()); + return toTaskUsingDeepStorage(from); } String task = annotations.get(DruidK8sConstants.TASK); if (task == null) { - throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); + log.info("No task annotation found on pod spec for job [%s]. Trying to load task payload from deep storage.", from.getMetadata().getName()); + return toTaskUsingDeepStorage(from); } return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); } + private Task toTaskUsingDeepStorage(Job from) throws IOException + { + com.google.common.base.Optional taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId()); + if (!taskBody.isPresent()) { + throw InternalServerError.exception( + "Could not load task payload from deep storage for job [%s]. Check the overlord logs for errors uploading task payloads to deep storage.", + from.getMetadata().getName() + ); + } + String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset()); + return mapper.readValue(task, Task.class); + } + + @Override + public K8sTaskId getTaskId(Job from) + { + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); + if (annotations == null) { + throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); + } + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName()); + } + return new K8sTaskId(taskId); + } + private HashMap initializePodTemplates(Properties properties) { HashMap podTemplateMap = new HashMap<>(); @@ -208,9 +246,9 @@ private Optional loadPodTemplate(String key, Properties properties) } } - private Collection getEnv(Task task) + private Collection getEnv(Task task) throws IOException { - return ImmutableList.of( + List envVars = Lists.newArrayList( new EnvVarBuilder() .withName(DruidK8sConstants.TASK_DIR_ENV) .withValue(taskConfig.getBaseDir()) @@ -219,17 +257,21 @@ private Collection getEnv(Task task) .withName(DruidK8sConstants.TASK_ID_ENV) .withValue(task.getId()) .build(), - new EnvVarBuilder() - .withName(DruidK8sConstants.TASK_JSON_ENV) - .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector( - null, - StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK) - )).build()).build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) .withValue(Boolean.toString(task.supportsQueries())) .build() ); + if (!shouldUseDeepStorageForTaskPayload(task)) { + envVars.add(new EnvVarBuilder() + .withName(DruidK8sConstants.TASK_JSON_ENV) + .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector( + null, + StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK) + )).build()).build() + ); + } + return envVars; } private Map getPodLabels(KubernetesTaskRunnerConfig config, Task task) @@ -239,14 +281,18 @@ private Map getPodLabels(KubernetesTaskRunnerConfig config, Task private Map getPodTemplateAnnotations(Task task) throws IOException { - return ImmutableMap.builder() - .put(DruidK8sConstants.TASK, Base64Compression.compressBase64(mapper.writeValueAsString(task))) + ImmutableMap.Builder podTemplateAnnotationBuilder = ImmutableMap.builder() .put(DruidK8sConstants.TLS_ENABLED, String.valueOf(node.isEnableTlsPort())) .put(DruidK8sConstants.TASK_ID, task.getId()) .put(DruidK8sConstants.TASK_TYPE, task.getType()) .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId()) - .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource()) - .build(); + .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource()); + + if (!shouldUseDeepStorageForTaskPayload(task)) { + podTemplateAnnotationBuilder + .put(DruidK8sConstants.TASK, Base64Compression.compressBase64(mapper.writeValueAsString(task))); + } + return podTemplateAnnotationBuilder.build(); } private Map getJobLabels(KubernetesTaskRunnerConfig config, Task task) @@ -276,4 +322,11 @@ private String getDruidLabel(String baseLabel) { return DruidK8sConstants.DRUID_LABEL_PREFIX + baseLabel; } + + @Override + public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException + { + String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task)); + return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java index c64acd153106..be0588c35b15 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java @@ -32,6 +32,7 @@ import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; import java.io.IOException; import java.util.Collections; @@ -47,10 +48,11 @@ public SingleContainerTaskAdapter( TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, DruidNode druidNode, - ObjectMapper mapper + ObjectMapper mapper, + TaskLogs taskLogs ) { - super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper); + super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java index 05933604f2ba..9dacb213cf3f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import java.io.IOException; @@ -31,4 +32,10 @@ public interface TaskAdapter Task toTask(Job from) throws IOException; + K8sTaskId getTaskId(Job from); + + /** + * Method for exposing to external classes whether the task has its task payload bundled by the adapter or relies on a external system + */ + boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 4c46c278e260..1c6e429a3dc3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -79,7 +79,7 @@ public void setup() } @Test - public void test_run() + public void test_run() throws IOException { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, @@ -114,7 +114,7 @@ protected synchronized TaskStatus join(long timeout) replayAll(); - TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L); + TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, false); verifyAll(); @@ -124,7 +124,51 @@ protected synchronized TaskStatus join(long timeout) } @Test - public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() + public void test_run_useTaskManager() throws IOException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ) + { + @Override + protected synchronized TaskStatus join(long timeout) + { + return TaskStatus.success(ID); + } + }; + + Job job = new JobBuilder().withNewMetadata().withName(ID).endMetadata().build(); + + EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart( + EasyMock.eq(job), + EasyMock.eq(task), + EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS) + )).andReturn(null); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + + stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID); + EasyMock.expectLastCall().once(); + stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); + EasyMock.expectLastCall().once(); + + taskLogs.pushTaskPayload(EasyMock.anyString(), EasyMock.anyObject()); + replayAll(); + + TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, true); + + verifyAll(); + Assert.assertTrue(taskStatus.isSuccess()); + Assert.assertEquals(ID, taskStatus.getId()); + Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); + } + + @Test + public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throws IOException { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, @@ -159,12 +203,12 @@ protected synchronized TaskStatus join(long timeout) replayAll(); - peonLifecycle.run(job, 0L, 0L); + peonLifecycle.run(job, 0L, 0L, false); Assert.assertThrows( "Task [id] failed to run: invalid peon lifecycle state transition [STOPPED]->[PENDING]", IllegalStateException.class, - () -> peonLifecycle.run(job, 0L, 0L) + () -> peonLifecycle.run(job, 0L, 0L, false) ); verifyAll(); @@ -208,7 +252,7 @@ protected synchronized TaskStatus join(long timeout) Assert.assertThrows( Exception.class, - () -> peonLifecycle.run(job, 0L, 0L) + () -> peonLifecycle.run(job, 0L, 0L, false) ); verifyAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index e6b1b8006af0..36a7b4cfcd9c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -221,10 +221,12 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio TaskStatus taskStatus = TaskStatus.success(task.getId()); EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job); + EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false); EasyMock.expect(kubernetesPeonLifecycle.run( EasyMock.eq(job), EasyMock.anyLong(), - EasyMock.anyLong() + EasyMock.anyLong(), + EasyMock.anyBoolean() )).andReturn(taskStatus); replayAll(); @@ -256,10 +258,12 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep .build(); EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job); + EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false); EasyMock.expect(kubernetesPeonLifecycle.run( EasyMock.eq(job), EasyMock.anyLong(), - EasyMock.anyLong() + EasyMock.anyLong(), + EasyMock.anyBoolean() )).andThrow(new IllegalStateException()); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 22e2311bbbae..098161685883 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -129,7 +129,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception taskConfig, startupLoggingConfig, druidNode, - jsonMapper + jsonMapper, + null ); String taskBasePath = "/home/taskDir"; PeonCommandContext context = new PeonCommandContext(Collections.singletonList( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 19701e4f26e1..c1100ccc29d1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -37,10 +37,13 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobList; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.FirehoseModule; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; @@ -52,6 +55,7 @@ import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesExecutor; import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException; @@ -59,15 +63,23 @@ import org.apache.druid.k8s.overlord.common.TestKubernetesClient; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.NoopTaskLogs; +import org.apache.druid.tasklogs.TaskLogs; +import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -82,6 +94,8 @@ class K8sTaskAdapterTest private final TaskConfig taskConfig; private final DruidNode node; private final ObjectMapper jsonMapper; + private final TaskLogs taskLogs; + public K8sTaskAdapterTest() { @@ -105,6 +119,7 @@ public K8sTaskAdapterTest() ); startupLoggingConfig = new StartupLoggingConfig(); taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build(); + taskLogs = new NoopTaskLogs(); } @Test @@ -139,7 +154,9 @@ public PodSpec getSpec() taskConfig, startupLoggingConfig, node, - jsonMapper + jsonMapper, + taskLogs + ); Task task = K8sTestUtils.getTask(); Job jobFromSpec = adapter.fromTask(task); @@ -166,7 +183,8 @@ public void serializingAndDeserializingATask() throws IOException taskConfig, startupLoggingConfig, node, - jsonMapper + jsonMapper, + taskLogs ); Task task = K8sTestUtils.getTask(); Job jobFromSpec = adapter.createJobFromPodSpec( @@ -189,6 +207,169 @@ public void serializingAndDeserializingATask() throws IOException assertEquals(task, taskFromJob); } + @Test + public void fromTask_dontSetTaskJSON() throws IOException + { + final PodSpec podSpec = K8sTestUtils.getDummyPodSpec(); + TestKubernetesClient testClient = new TestKubernetesClient(client) + { + @SuppressWarnings("unchecked") + @Override + public T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException + { + return (T) new Pod() + { + @Override + public PodSpec getSpec() + { + return podSpec; + } + }; + } + }; + + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); + Task task = new NoopTask( + "id", + "id", + "datasource", + 0, + 0, + ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20)) + ); + Job job = adapter.fromTask(task); + // TASK_JSON should not be set in env variables + Assertions.assertFalse( + job.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0).getEnv() + .stream().anyMatch(env -> env.getName().equals(DruidK8sConstants.TASK_JSON_ENV)) + ); + + // --taskId should be passed to the peon command args + Assertions.assertTrue( + Arrays.stream(job.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getArgs() + .get(0).split(" ")).collect(Collectors.toSet()) + .containsAll(ImmutableList.of("--taskId", task.getId())) + ); + } + + @Test + public void toTask_useTaskPayloadManager() throws IOException + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); + Task taskInTaskPayloadManager = K8sTestUtils.getTask(); + TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class); + Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of( + new ByteArrayInputStream(jsonMapper.writeValueAsString(taskInTaskPayloadManager).getBytes(Charset.defaultCharset())) + )); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + mockTestLogs + ); + + Job job = new JobBuilder() + .editMetadata().withName("job").endMetadata() + .editSpec().editTemplate().editMetadata() + .addToAnnotations(DruidK8sConstants.TASK_ID, "ID") + .endMetadata().editSpec().addToContainers(new ContainerBuilder().withName("main").build()).endSpec().endTemplate().endSpec().build(); + + Task taskFromJob = adapter.toTask(job); + assertEquals(taskInTaskPayloadManager, taskFromJob); + } + + @Test + public void getTaskId() + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .addToAnnotations(DruidK8sConstants.TASK_ID, "ID") + .endMetadata().endTemplate().endSpec().build(); + + assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job)); + } + + @Test + public void getTaskId_noAnnotations() + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .endMetadata().endTemplate().endSpec() + .editMetadata().withName("job").endMetadata().build(); + + Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job)); + } + + @Test + public void getTaskId_missingTaskIdAnnotation() + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID") + .endMetadata().endTemplate().endSpec() + .editMetadata().withName("job").endMetadata().build(); + + Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job)); + } @Test void testGrabbingTheLastXmxValueFromACommand() { @@ -282,7 +463,8 @@ void testAddingMonitors() throws IOException taskConfig, startupLoggingConfig, node, - jsonMapper + jsonMapper, + taskLogs ); Task task = K8sTestUtils.getTask(); // no monitor in overlord, no monitor override @@ -305,7 +487,8 @@ void testAddingMonitors() throws IOException taskConfig, startupLoggingConfig, node, - jsonMapper + jsonMapper, + taskLogs ); adapter.addEnvironmentVariables(container, context, task.toString()); EnvVar env = container.getEnv() @@ -322,7 +505,8 @@ void testAddingMonitors() throws IOException taskConfig, startupLoggingConfig, node, - jsonMapper + jsonMapper, + taskLogs ); container.getEnv().add(new EnvVarBuilder() .withName("druid_monitoring_monitors") @@ -347,13 +531,16 @@ void testEphemeralStorageIsRespected() throws IOException KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("test") .build(); - SingleContainerTaskAdapter adapter = - new SingleContainerTaskAdapter(testClient, - config, taskConfig, - startupLoggingConfig, - node, - jsonMapper - ); + + SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); NoopTask task = K8sTestUtils.createTask("id", 1); Job actual = adapter.createJobFromPodSpec( pod.getSpec(), diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java index ac6e32d140ee..45ea08733768 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java @@ -41,6 +41,8 @@ import org.apache.druid.k8s.overlord.common.TestKubernetesClient; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; +import org.easymock.Mock; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,6 +60,7 @@ class MultiContainerTaskAdapterTest private TaskConfig taskConfig; private DruidNode druidNode; private ObjectMapper jsonMapper; + @Mock private TaskLogs taskLogs; @BeforeEach public void setup() @@ -98,7 +101,8 @@ public void testMultiContainerSupport() throws IOException taskConfig, startupLoggingConfig, druidNode, - jsonMapper + jsonMapper, + taskLogs ); NoopTask task = K8sTestUtils.createTask("id", 1); Job actual = adapter.createJobFromPodSpec( @@ -146,7 +150,8 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException taskConfig, startupLoggingConfig, druidNode, - jsonMapper + jsonMapper, + taskLogs ); NoopTask task = K8sTestUtils.createTask("id", 1); PodSpec spec = pod.getSpec(); @@ -191,12 +196,16 @@ public void testOverridingPeonMonitors() throws IOException .withPrimaryContainerName("primary") .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor")) .build(); - MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, - config, - taskConfig, - startupLoggingConfig, - druidNode, - jsonMapper); + + MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + druidNode, + jsonMapper, + taskLogs + ); NoopTask task = K8sTestUtils.createTask("id", 1); PodSpec spec = pod.getSpec(); K8sTaskAdapter.massageSpec(spec, config.getPrimaryContainerName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index dd4eadfea299..74dfacd1a327 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -20,30 +20,39 @@ package org.apache.druid.k8s.overlord.taskadapter; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import org.apache.druid.k8s.overlord.common.Base64Compression; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.server.DruidNode; +import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; +import org.easymock.Mock; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; @@ -51,6 +60,8 @@ import java.util.Properties; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class PodTemplateTaskAdapterTest { @TempDir private Path tempDir; @@ -59,6 +70,7 @@ public class PodTemplateTaskAdapterTest private TaskConfig taskConfig; private DruidNode node; private ObjectMapper mapper; + @Mock private TaskLogs taskLogs; @BeforeEach public void setup() @@ -89,7 +101,8 @@ public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE() taskConfig, node, mapper, - new Properties() + new Properties(), + taskLogs )); } @@ -109,7 +122,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_r taskConfig, node, mapper, - props + props, + taskLogs )); } @@ -127,7 +141,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOExce taskConfig, node, mapper, - props + props, + taskLogs ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -159,7 +174,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled() true ), mapper, - props + props, + taskLogs ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -185,7 +201,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_r taskConfig, node, mapper, - props + props, + taskLogs )); } @@ -204,7 +221,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce taskConfig, node, mapper, - props + props, + taskLogs ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -215,7 +233,41 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce } @Test - public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException + public void test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("noop.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", templatePath.toString()); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs + ); + + Task task = new NoopTask( + "id", + "id", + "datasource", + 0, + 0, + ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20)) + ); + + Job actual = adapter.fromTask(task); + Job expected = K8sTestUtils.fileToResource("expectedNoopJobNoTaskJson.yaml", Job.class); + + Assertions.assertEquals(actual, expected); + } + + @Test + public void test_fromTask_withoutAnnotations_throwsDruidException() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -228,17 +280,91 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException taskConfig, node, mapper, - props + props, + taskLogs ); Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); + Assert.assertThrows(DruidException.class, () -> adapter.toTask(job)); + } + + @Test + public void test_getTaskId() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .addToAnnotations(DruidK8sConstants.TASK_ID, "ID") + .endMetadata().endTemplate().endSpec().build(); + + assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job)); } @Test - public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException + public void test_getTaskId_noAnnotations() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .endMetadata().endTemplate().endSpec() + .editMetadata().withName("job").endMetadata().build(); + + Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job)); + } + + @Test + public void test_getTaskId_missingTaskIdAnnotation() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs + ); + Job job = new JobBuilder() + .editSpec().editTemplate().editMetadata() + .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID") + .endMetadata().endTemplate().endSpec() + .editMetadata().withName("job").endMetadata().build(); + + Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job)); + } + + @Test + public void test_toTask_withoutTaskAnnotation_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -251,7 +377,8 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException taskConfig, node, mapper, - props + props, + taskLogs ); Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); @@ -265,11 +392,11 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException .endTemplate() .endSpec() .build(); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); + Assert.assertThrows(DruidException.class, () -> adapter.toTask(job)); } @Test - public void test_fromTask() throws IOException + public void test_toTask() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -282,7 +409,8 @@ public void test_fromTask() throws IOException taskConfig, node, mapper, - props + props, + taskLogs ); Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class); @@ -292,6 +420,35 @@ public void test_fromTask() throws IOException Assertions.assertEquals(expected, actual); } + @Test + public void test_toTask_useTaskPayloadManager() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + + Task expected = new NoopTask("id", null, "datasource", 0, 0, ImmutableMap.of()); + TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class); + Mockito.when(mockTestLogs.streamTaskPayload("id")).thenReturn(Optional.of( + new ByteArrayInputStream(mapper.writeValueAsString(expected).getBytes(Charset.defaultCharset())) + )); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + mockTestLogs + ); + + Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class); + Task actual = adapter.toTask(job); + Assertions.assertEquals(expected, actual); + } + @Test public void test_fromTask_withRealIds() throws IOException { @@ -307,7 +464,8 @@ public void test_fromTask_withRealIds() throws IOException taskConfig, node, mapper, - props + props, + taskLogs ); Task task = new NoopTask( @@ -340,7 +498,8 @@ public void test_fromTask_taskSupportsQueries() throws IOException taskConfig, node, mapper, - props + props, + taskLogs ); Task task = EasyMock.mock(Task.class); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java index 10e129a9c2b5..43a40daedc1d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java @@ -39,6 +39,8 @@ import org.apache.druid.k8s.overlord.common.TestKubernetesClient; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; +import org.easymock.Mock; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,6 +59,8 @@ class SingleContainerTaskAdapterTest private DruidNode druidNode; private ObjectMapper jsonMapper; + @Mock private TaskLogs taskLogs; + @BeforeEach public void setup() { @@ -96,7 +100,8 @@ public void testSingleContainerSupport() throws IOException taskConfig, startupLoggingConfig, druidNode, - jsonMapper + jsonMapper, + taskLogs ); NoopTask task = K8sTestUtils.createTask("id", 1); Job actual = adapter.createJobFromPodSpec( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index 0998d592fee4..2cef837f3972 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -42,11 +42,11 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_SEGMENTS" + value: "false" - name: "TASK_JSON" valueFrom: fieldRef: fieldPath: "metadata.annotations['task']" - - name: "LOAD_BROADCAST_SEGMENTS" - value: "false" image: one name: primary diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index bb8f64c5e5ed..cf16c49c5db1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -42,11 +42,11 @@ spec: value: "/tmp" - name: "TASK_ID" value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z" + - name: "LOAD_BROADCAST_SEGMENTS" + value: "false" - name: "TASK_JSON" valueFrom: fieldRef: fieldPath: "metadata.annotations['task']" - - name: "LOAD_BROADCAST_SEGMENTS" - value: "false" image: one name: primary diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml new file mode 100644 index 000000000000..d72d0ef37b03 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -0,0 +1,47 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: "id-3e70afe5cd823dfc7dd308eea616426b" + labels: + druid.k8s.peons: "true" + druid.task.id: "id" + druid.task.type: "noop" + druid.task.group.id: "id" + druid.task.datasource: "datasource" + annotations: + task.id: "id" + task.type: "noop" + task.group.id: "id" + task.datasource: "datasource" +spec: + activeDeadlineSeconds: 14400 + backoffLimit: 0 + ttlSecondsAfterFinished: 172800 + template: + metadata: + labels: + druid.k8s.peons: "true" + druid.task.id: "id" + druid.task.type: "noop" + druid.task.group.id: "id" + druid.task.datasource: "datasource" + annotations: + tls.enabled: "false" + task.id: "id" + task.type: "noop" + task.group.id: "id" + task.datasource: "datasource" + spec: + containers: + - command: + - sleep + - "3600" + env: + - name: "TASK_DIR" + value: "/tmp" + - name: "TASK_ID" + value: "id" + - name: "LOAD_BROADCAST_SEGMENTS" + value: "false" + image: one + name: primary diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml index e6762af63cf0..a230ac913a60 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml @@ -42,11 +42,11 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_SEGMENTS" + value: "false" - name: "TASK_JSON" valueFrom: fieldRef: fieldPath: "metadata.annotations['task']" - - name: "LOAD_BROADCAST_SEGMENTS" - value: "false" image: one name: primary diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 53112d0808db..b9fd9e42862c 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -83,6 +83,21 @@ public Optional streamTaskStatus(String taskid) throws IOException return streamTaskFileWithRetry(0, taskKey); } + @Override + public void pushTaskPayload(String taskid, File taskPayloadFile) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "task.json"); + log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, taskKey); + pushTaskFile(taskPayloadFile, taskKey); + } + + @Override + public Optional streamTaskPayload(String taskid) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "task.json"); + return streamTaskFileWithRetry(0, taskKey); + } + /** * Using the retry conditions defined in {@link S3Utils#S3RETRY}. */ diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index 7428081aa3ed..011dc4888456 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -35,8 +35,11 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.IOUtils; import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.StringUtils; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -55,6 +58,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.stream.Collectors; @@ -143,6 +147,78 @@ public void test_pushTaskStatus() throws IOException EasyMock.verify(s3Client); } + @Test + public void test_pushTaskPayload() throws IOException + { + Capture putObjectRequestCapture = Capture.newInstance(CaptureType.FIRST); + EasyMock.expect(s3Client.putObject(EasyMock.capture(putObjectRequestCapture))) + .andReturn(new PutObjectResult()) + .once(); + + EasyMock.replay(s3Client); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix("prefix"); + config.setDisableAcl(true); + + CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier(); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + + File payloadFile = tempFolder.newFile("task.json"); + String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; + s3TaskLogs.pushTaskPayload(taskId, payloadFile); + + PutObjectRequest putObjectRequest = putObjectRequestCapture.getValue(); + Assert.assertEquals(TEST_BUCKET, putObjectRequest.getBucketName()); + Assert.assertEquals("prefix/" + taskId + "/task.json", putObjectRequest.getKey()); + Assert.assertEquals(payloadFile, putObjectRequest.getFile()); + EasyMock.verify(s3Client); + } + + @Test + public void test_streamTaskPayload() throws IOException + { + String taskPayloadString = "task payload"; + + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(taskPayloadString.length()); + EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(objectMetadata) + .once(); + + InputStream taskPayload = new ByteArrayInputStream(taskPayloadString.getBytes(Charset.defaultCharset())); + S3Object s3Object = new S3Object(); + s3Object.setObjectContent(taskPayload); + Capture getObjectRequestCapture = Capture.newInstance(CaptureType.FIRST); + EasyMock.expect(s3Client.getObject(EasyMock.capture(getObjectRequestCapture))) + .andReturn(s3Object) + .once(); + + EasyMock.replay(s3Client); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix("prefix"); + config.setDisableAcl(true); + + CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier(); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + + String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; + Optional payloadResponse = s3TaskLogs.streamTaskPayload(taskId); + + GetObjectRequest getObjectRequest = getObjectRequestCapture.getValue(); + Assert.assertEquals(TEST_BUCKET, getObjectRequest.getBucketName()); + Assert.assertEquals("prefix/" + taskId + "/task.json", getObjectRequest.getKey()); + Assert.assertTrue(payloadResponse.isPresent()); + + Assert.assertEquals(taskPayloadString, IOUtils.toString(payloadResponse.get(), Charset.defaultCharset())); + EasyMock.verify(s3Client); + } + @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java index be2c62540e08..6e7df12f8d20 100644 --- a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java +++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java @@ -29,6 +29,7 @@ import org.apache.druid.tasklogs.TaskLogKiller; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogs; +import org.apache.druid.tasklogs.TaskPayloadManager; /** */ @@ -48,5 +49,6 @@ public void configure(Binder binder) binder.bind(TaskLogPusher.class).to(TaskLogs.class); binder.bind(TaskLogKiller.class).to(TaskLogs.class); + binder.bind(TaskPayloadManager.class).to(TaskLogs.class); } } diff --git a/processing/src/main/java/org/apache/druid/error/InternalServerError.java b/processing/src/main/java/org/apache/druid/error/InternalServerError.java new file mode 100644 index 000000000000..b730acb0e3d6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/error/InternalServerError.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +public class InternalServerError extends DruidException.Failure +{ + public static DruidException exception(String errorCode, String msg, Object... args) + { + return exception(null, errorCode, msg, args); + } + public static DruidException exception(Throwable t, String errorCode, String msg, Object... args) + { + return DruidException.fromFailure(new InternalServerError(t, errorCode, msg, args)); + } + + private final Throwable t; + private final String msg; + private final Object[] args; + + private InternalServerError( + Throwable t, + String errorCode, + String msg, + Object... args + ) + { + super(errorCode); + this.t = t; + this.msg = msg; + this.args = args; + } + + @Override + public DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + bob = bob.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE); + + if (t == null) { + return bob.build(msg, args); + } else { + return bob.build(t, msg, args); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java index 287b2f6fcc34..5568a5160fd8 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java @@ -64,4 +64,16 @@ public void killOlderThan(long timestamp) { log.info("Noop: No task logs are deleted."); } + + @Override + public void pushTaskPayload(String taskid, File taskPayloadFile) + { + log.info("Not pushing payload for task: %s", taskid); + } + + @Override + public Optional streamTaskPayload(String taskid) + { + return Optional.absent(); + } } diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java index ee50217c9571..2756911f6a34 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogs.java @@ -21,7 +21,8 @@ import org.apache.druid.guice.annotations.ExtensionPoint; + @ExtensionPoint -public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller +public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller, TaskPayloadManager { } diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java new file mode 100644 index 000000000000..41db8e4556b2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tasklogs; + +import com.google.common.base.Optional; +import org.apache.commons.lang.NotImplementedException; +import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.java.util.common.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * Something that knows how to push a task payload before it is run to somewhere + * a ingestion worker will be able to stream the task payload from when trying to run the task. + */ +@ExtensionPoint +public interface TaskPayloadManager +{ + /** + * Save payload so it can be retrieved later. + * + * @return inputStream for this taskPayload, if available + */ + default void pushTaskPayload(String taskid, File taskPayloadFile) throws IOException + { + throw new NotImplementedException(StringUtils.format("this druid.indexer.logs.type [%s] does not support managing task payloads yet. You will have to switch to using environment variables", getClass())); + } + + /** + * Stream payload for a task. + * + * @return inputStream for this taskPayload, if available + */ + default Optional streamTaskPayload(String taskid) throws IOException + { + throw new NotImplementedException(StringUtils.format("this druid.indexer.logs.type [%s] does not support managing task payloads yet. You will have to switch to using environment variables", getClass())); + } +} diff --git a/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java new file mode 100644 index 000000000000..b28296b2c415 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/error/InternalServerErrorTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +import org.apache.druid.matchers.DruidMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Test; + +import java.util.Map; + +public class InternalServerErrorTest +{ + + @Test + public void testAsErrorResponse() + { + ErrorResponse errorResponse = new ErrorResponse(InternalServerError.exception("runtimeFailure", "Internal Server Error")); + final Map asMap = errorResponse.getAsMap(); + + MatcherAssert.assertThat( + asMap, + DruidMatchers.mapMatcher( + "error", "druidException", + "errorCode", "runtimeFailure", + "persona", "OPERATOR", + "category", "RUNTIME_FAILURE", + "errorMessage", "Internal Server Error" + ) + ); + + ErrorResponse recomposed = ErrorResponse.fromMap(asMap); + + MatcherAssert.assertThat( + recomposed.getUnderlyingException(), + new DruidExceptionMatcher( + DruidException.Persona.OPERATOR, + DruidException.Category.RUNTIME_FAILURE, + "runtimeFailure" + ).expectMessageContains("Internal Server Error") + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java index 30192932a9e0..2da98dab3038 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java @@ -32,4 +32,11 @@ public void test_streamTaskStatus() throws IOException TaskLogs taskLogs = new NoopTaskLogs(); Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent()); } + + @Test + public void test_streamTaskPayload() throws IOException + { + TaskLogs taskLogs = new NoopTaskLogs(); + Assert.assertFalse(taskLogs.streamTaskPayload("id").isPresent()); + } } diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java new file mode 100644 index 000000000000..4c53061a86f2 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskPayloadManagerTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tasklogs; + +import org.apache.commons.lang.NotImplementedException; +import org.junit.Assert; +import org.junit.Test; + +public class TaskPayloadManagerTest implements TaskPayloadManager +{ + @Test + public void test_streamTaskPayload() + { + Assert.assertThrows(NotImplementedException.class, + () -> this.streamTaskPayload("id") + ); + } + + @Test + public void test_pushTaskPayload() + { + Assert.assertThrows(NotImplementedException.class, + () -> this.pushTaskPayload("id", null) + ); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 2ba88117cd51..b6b1730199f2 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -40,6 +40,8 @@ import com.google.inject.name.Named; import com.google.inject.name.Names; import io.netty.util.SuppressForbidden; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.NodeRole; @@ -129,10 +131,12 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.server.metrics.ServiceStatusMonitor; +import org.apache.druid.tasklogs.TaskPayloadManager; import org.eclipse.jetty.server.Server; import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Paths; import java.util.List; import java.util.Map; @@ -176,6 +180,9 @@ public class CliPeon extends GuiceRunnable @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") public String loadBroadcastSegments = "false"; + @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") + public String taskId = ""; + private static final Logger log = new Logger(CliPeon.class); private Properties properties; @@ -282,9 +289,15 @@ public Supplier> heartbeatDimensions(Task task) @Provides @LazySingleton - public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config) + public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config, TaskPayloadManager taskPayloadManager) { try { + if (!config.getTaskFile().exists() || config.getTaskFile().length() == 0) { + log.info("Task file not found, trying to pull task payload from deep storage"); + String task = IOUtils.toString(taskPayloadManager.streamTaskPayload(taskId).get(), Charset.defaultCharset()); + // write the remote task.json to the task file location for ExecutorLifecycle to pickup + FileUtils.write(config.getTaskFile(), task, Charset.defaultCharset()); + } return mapper.readValue(config.getTaskFile(), Task.class); } catch (IOException e) {