
Allow users to pass task payload via deep storage instead of environment variable #14887

Merged

Conversation

georgew5656 (Contributor) commented Aug 21, 2023

This change fixes an issue where passing too large a task payload to the mm-less task runner causes the peon to fail to start up, because the payload is passed (compressed) as an environment variable (TASK_JSON). On Linux systems the limit for an environment variable is commonly 128KB, and on Windows it is lower still. Setting an environment variable longer than this results in a string of "Argument list too long" errors.

Description

(1) Problem
The goal of this patch is to prevent larger tasks from failing in mm-less ingestion due to the TASK_JSON environment variable being too large, as described above.

(2) Solution
Part 1. Optionally stop setting TASK_JSON

To address the immediate problem (setting environment variables that are too large), I added an additional config for the KubernetesTaskRunner (druid.indexer.runner.taskPayloadAsEnvVariable) that defaults to true but can optionally be set to false. Setting this config to false causes the K8s adapters to not set the task payload as the TASK_JSON env variable, which prevents the Jobs from failing to come up.

Part 2. We still need to pass the task.json payload to the peons somehow. I explored three options for this and ended up going with the solution below.

Push the task payload into task logs deep storage and have the peon read the payload.
I went with this option because it was the simplest to implement and the most future-proof (no worry about task payloads growing larger than 1MB). The task logs killer will automatically delete the task.json in deep storage alongside the task logs whenever it runs.

Changes Made

  • Introduced a new interface (TaskPayloadManager) that exposes two methods (push task payload, pull task payload) and is implemented by TaskLogs. (I only implemented it for S3TaskLogs, but it should be easy to implement for other deep storage systems.) A rough sketch of this contract follows the list below.
  • Introduced a new config in TaskConfig (druid.indexer.task.enableTaskPayloadManagerPerTask). When set on the overlord, mm-less ingestion pushes the task.json payload to deep storage before launching a K8s job. When set on the peon, the CliPeon task injector reads the task.json from deep storage instead of assuming it is available on the file system. This is currently only usable by mm-less ingestion, but it could technically be used in any task-running scenario.
  • When the config is set, the K8s adapters in mm-less ingestion check deep storage when converting K8s jobs to tasks in toTask.
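
As context for the interface mentioned above, here is a minimal sketch of what the TaskPayloadManager contract might look like, inferred only from the pushTaskPayload/streamTaskPayload calls quoted later in this conversation; the exact signatures, defaults, and Javadoc in the merged code may differ.

```java
// Minimal sketch of a TaskPayloadManager-style contract, inferred from the
// pushTaskPayload/streamTaskPayload calls quoted in this PR. Signatures are assumptions.
import com.google.common.base.Optional;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

public interface TaskPayloadManager
{
  /** Persist the serialized task payload (task.json) to deep storage, keyed by task id. */
  void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException;

  /** Stream a previously pushed payload back, or return absent if none was pushed. */
  Optional<InputStream> streamTaskPayload(String taskId) throws IOException;
}
```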

(3) Alternative solutions to passing the task.json payload

  1. Using K8s ConfigMaps to store the task payload and mounting them onto the created peon pods.
    I decided not to go with this option because ConfigMaps still have a 1MB size limit, and I was concerned about the KubernetesTaskRunner having to manage a large number of ConfigMaps in addition to Jobs. That many ConfigMaps also pollutes K8s metadata, making it hard to see anything else going on when you look at ConfigMaps.

  2. Updating CliPeon to use the getTaskPayload endpoint on the overlord to pull the task.json payload on startup. This didn't work because we currently have a Guice injector in the peon that requires the task.json to be available at injection time. To pull the task.json from the overlord, we would need to use the ServiceLocator class, which only becomes available once the peon lifecycle has started (after injection). Changing this would have required many code changes, so I didn't want to do it. Additionally, I would have had to deprecate the toTask interface on the overlord, since there would be no way for the overlord to turn a K8s Job into a task definition.

Release note

  • Support compressed task payloads larger than 128KB in mm-less ingestion.
Key changed/added classes in this PR
  • CliPeon
  • KubernetesPeonLifecycle
  • PodTemplateTaskAdapter
  • K8sTaskAdapter
  • S3TaskLogs, TaskLogs

I can add some more documentation to this PR later but I wanted to get some feedback on this approach before doing so.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

cryptoe (Contributor) commented Aug 22, 2023

@georgew5656
The current approach also SGTM, but I would not have it behind a feature flag. If you do want a feature flag, I would turn it on by default.
This also means we should implement TaskPayloadManager for the other deep storages, mainly GCS, Azure, and HDFS. If an implementation is not found, we should throw a nice error message instructing the user to set druid.indexer.runner.taskPayloadAsEnvVariable to true, or, if we are not using a flag, instructing the user to implement the TaskPayloadManager methods for their custom deep storage impl.
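
For illustration only, a guard along these lines might look like the sketch below; the class, the method, the instanceof check against the TaskPayloadManager sketch above, and the message wording are all assumptions rather than code from this PR.

```java
// Hypothetical fail-fast check when the configured TaskLogs implementation cannot store
// task payloads; the class, method, and message below are illustrative assumptions only.
public class TaskPayloadSupportCheck
{
  public static void validatePayloadSupport(Object taskLogs)
  {
    if (!(taskLogs instanceof TaskPayloadManager)) {
      throw new IllegalStateException(
          "Task logs implementation [" + taskLogs.getClass().getName() + "] does not implement "
          + "TaskPayloadManager. Set druid.indexer.runner.taskPayloadAsEnvVariable=true, or implement "
          + "TaskPayloadManager for your deep storage."
      );
    }
  }
}
```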

churromorales (Contributor) commented:

@cryptoe I think the way it is implemented now, behind the feature flag, is better. For most use cases it is much better to just pass the task.json directly: it is much faster, and for our customers we use deep storage that is slower and less featured than S3, so this is a feature we would only turn on if necessary. Additionally, our deep storage provider won't allow us to do batch deletes, so the cleaner approach to removing the task files is not great. I know it's a special case for us, but it is always better to pass something directly if possible than to use indirection, even disregarding our use case.

LGTM overall, but one feature request: at the end of the task, have AbstractTask delete the task.json file rather than leaving it up to the cleaner. Quite a few folks run their own K8s clusters in their own datacenters; while relying on the cleaner works for cloud providers, it might not work for everyone else. This can be done in another PR, or I can follow up after this PR is merged if needed.
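
As a rough sketch of that follow-up idea (the hook and the deleteTaskPayload-style method are hypothetical; neither exists in this PR):

```java
// Hypothetical end-of-task cleanup along the lines suggested above. Neither this hook nor
// deleteTaskPayload exists in the PR; both names are assumptions for illustration.
public class TaskPayloadCleanup
{
  /** Hypothetical stand-in for a delete capability on the deep storage payload store. */
  public interface DeletableTaskPayloadStore
  {
    void deleteTaskPayload(String taskId) throws Exception;
  }

  /** Called once the task finishes, instead of waiting for the periodic log cleaner. */
  public static void deletePayloadQuietly(DeletableTaskPayloadStore store, String taskId)
  {
    try {
      store.deleteTaskPayload(taskId);
    }
    catch (Exception e) {
      // Best effort: a failed delete should not change the task's final status.
    }
  }
}
```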

georgew5656 (Contributor, Author) commented:

> The current approach also SGTM, but I would not have it behind a feature flag. If you do want a feature flag, I would turn it on by default. This also means we should implement TaskPayloadManager for the other deep storages, mainly GCS, Azure, and HDFS. If an implementation is not found, we should throw a nice error message instructing the user to set druid.indexer.runner.taskPayloadAsEnvVariable to true, or, if we are not using a flag, instructing the user to implement the TaskPayloadManager methods for their custom deep storage impl.

I didn't really want to break anyone who doesn't want to use deep storage for task payloads for whatever reason. I think later on, if this gets used in production a bit more and the performance is okay, we could consider flipping it on as a default.

cryptoe (Contributor) commented Aug 24, 2023

Thanks @churromorales and @georgew5656 for the responses. I was mainly worried about adding another config that end users need to set.
The current opt-out-by-default behavior LGTM.

YongGang (Contributor) left a comment:

LGTM generally, left some new comments along with my previous one #14887 (comment)

abhishekagarwal87 (Contributor) commented:

I have concerns about the config change as well. In a single PR, adding one config doesn't seem like much, but after a year's worth of such work you suddenly realize that the feature has become very complex to tune and use. Is there a way to check whether the deep storage supports storing the payload and, if not, fall back to the environment variable? That way we could change the default easily.

churromorales (Contributor) commented:

Why not just check the size of the task.json: if it's larger than MAX_SIZE, use deep storage; if it's smaller, just use the env variable? I agree 100% that Druid in general has too many configuration options, which makes it hard to remember everything you need to include. When creating this feature, the whole goal I had in mind was to have it work with as few configuration options as possible.

georgew5656 (Contributor, Author) commented:

@abhishekagarwal87 @churromorales I just put up a new version of the PR that removes the configs and uses deep storage to pass the task payload only when the payload is too large. This is more of an "opinionated" choice, but in this case I think it makes sense; let me know what you think.
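
Conceptually, the size-based routing described here reduces to something like the sketch below. This is a simplified illustration, not the actual adapter code; the constant name and threshold are assumptions echoing the 128KB limit discussed earlier.

```java
// Simplified illustration of the size-based decision: small compressed payloads keep using
// the TASK_JSON environment variable, larger ones go through deep storage instead.
// The constant name and exact threshold are assumptions based on the 128KB limit above.
public class TaskPayloadRouting
{
  private static final int MAX_ENV_PAYLOAD_BYTES = 128 * 1024;

  /** Returns true if the compressed task.json is small enough to pass as TASK_JSON. */
  public static boolean fitsInEnvVariable(byte[] compressedTaskJson)
  {
    return compressedTaskJson.length < MAX_ENV_PAYLOAD_BYTES;
  }
}
```

When a check like fitsInEnvVariable returns false, the adapter would push the payload to deep storage via pushTaskPayload and the peon would stream it back at startup.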

distribution/docker/peon.sh (resolved review thread)
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
try {
  FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
  taskLogs.pushTaskPayload(task.getId(), file.toFile());
A reviewer (Contributor) commented:
Is it possible that a log cleanup job removes the task payload from deep storage while the task is still in progress? How are these payloads cleaned up from deep storage?

georgew5656 (Author) replied:
The task reads the file onto local disk on startup, so I wouldn't be worried about the log cleanup job removing it that soon. We are relying on the log cleanup job to clean up deep storage.
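
As a rough illustration of what "reads the file onto local disk on startup" involves, here is a sketch built on the TaskPayloadManager contract sketched in the description; the class and method names are assumptions, not the actual CliPeon wiring.

```java
// Rough sketch of the peon-side startup step: stream the payload from deep storage and
// materialize it at the local task.json location the rest of the peon expects.
// Class and method names here are illustrative, not the actual CliPeon code.
import com.google.common.base.Optional;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class TaskPayloadFetcher
{
  public static void materializeTaskJson(TaskPayloadManager payloads, String taskId, File taskJsonFile)
      throws IOException
  {
    Optional<InputStream> payload = payloads.streamTaskPayload(taskId);
    if (!payload.isPresent()) {
      throw new IOException("No task payload found in deep storage for task [" + taskId + "]");
    }
    try (InputStream in = payload.get()) {
      Files.copy(in, taskJsonFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```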


public K8sTaskAdapter(
    KubernetesClientApi client,
    KubernetesTaskRunnerConfig taskRunnerConfig,
    TaskConfig taskConfig,
    StartupLoggingConfig startupLoggingConfig,
    DruidNode node,
-   ObjectMapper mapper
+   ObjectMapper mapper,
+   TaskLogs taskLogs
A reviewer (Contributor) commented:
The name of this class has been confusing, since it does so much more than deal with task logs; that could be fixed in some other PR someday.

abhishekagarwal87 (Contributor) commented:

Thank you for addressing the comments, @georgew5656. Looks good to me, except for the exception handling in some places.

{
  com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
  if (!taskBody.isPresent()) {
    throw InternalError.exception("Could not load task payload for job [%s]", from.getMetadata().getName());
A reviewer (Contributor) commented:
Is there an action you can associate with this error message? For example, should the user verify that the overlord is successfully uploading task payloads to deep storage?

georgew5656 (Author) replied:
Updated the message.

{
  Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
  if (annotations == null) {
    throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
A reviewer (Contributor) commented:
Can this be replaced with DruidException.defensive()?

abhishekagarwal87 (Contributor) left a comment:
Almost there. Can you look into the test failures?

georgew5656 and others added 2 commits September 29, 2023 10:15
…a/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java

Co-authored-by: Abhishek Agarwal <[email protected]>
georgew5656 (Contributor, Author) commented:

> Almost there. Can you look into the test failures?

I think the only failing tests now are coverage checks, so we should be good.

@abhishekagarwal87 abhishekagarwal87 changed the title Use task manager for task payload Allow users to pass task payload via deep storage instead of environment variable Oct 3, 2023
@abhishekagarwal87 abhishekagarwal87 merged commit 64754b6 into apache:master Oct 3, 2023
63 of 65 checks passed
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023
ektravel pushed a commit to ektravel/druid that referenced this pull request Oct 16, 2023
…ent variable (apache#14887)
