Support Dynamic Peon Pod Template Selection in K8s extension #16510

Merged
merged 16 commits into from
Jun 12, 2024

@YongGang YongGang commented May 29, 2024

Description

This PR introduces a new feature to dynamically select Kubernetes pod templates for task execution in Druid. This functionality aims to optimize resource utilization and improve task execution efficiency by tailoring pod specifications to the needs of different task characteristics.
Druid operators can define execution strategies and associate them with different task characteristics through the new dynamic config interface. The system will apply these strategies dynamically as tasks are scheduled for execution.
This feature is a step towards making Apache Druid more adaptable and efficient in Kubernetes environments, addressing the need for more granular control over resource allocation and task scheduling.

Example Configuration:

We define two template keys in the configuration—low-throughput and medium-throughput—each associated with specific task conditions and arranged in a priority order.

  • Low Throughput Template: This is the first template evaluated and has the highest priority. Tasks that have a context tag billingCategory=streaming_ingestion and a datasource of wikipedia will be classified under the low-throughput template. This classification directs such tasks to utilize a predefined pod template optimized for low throughput requirements.

  • Medium Throughput Template: If a task does not meet the low-throughput criteria, the system will then evaluate it against the next selector in order. In this example, if the task type is index_kafka, it will fall into the medium-throughput template.

{
  "type": "default",
  "podTemplateSelectStrategy":
  {
    "type": "selectorBased",
    "selectors": [
      {
        "selectionKey": "low-throughput",
        "context.tags":
        {
          "billingCategory": ["streaming_ingestion"]
        },
        "dataSource": ["wikipedia"]
      },
      {
        "selectionKey": "medium-throughput",
        "type": ["index_kafka"]
      }
    ],
    "defaultKey": "base"
  }
}
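
The evaluation order described above can be sketched in plain Java. This is only an illustration, not the PR's implementation: `SketchTask`, `SketchSelector` and `SketchSelectStrategy` are hypothetical names, and the sketch assumes that an empty condition matches any task and that `defaultKey` is returned when no selector matches.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a Druid task: only the properties the example config inspects.
final class SketchTask {
    final String type;
    final String dataSource;
    final Map<String, String> tags;

    SketchTask(String type, String dataSource, Map<String, String> tags) {
        this.type = type;
        this.dataSource = dataSource;
        this.tags = tags;
    }
}

// Hypothetical selector: a task matches only if every configured condition matches (AND).
// An empty condition is treated as "match anything" (an assumption of this sketch).
final class SketchSelector {
    final String selectionKey;
    final Map<String, Set<String>> tagConditions;
    final Set<String> types;
    final Set<String> dataSources;

    SketchSelector(String selectionKey, Map<String, Set<String>> tagConditions,
                   Set<String> types, Set<String> dataSources) {
        this.selectionKey = selectionKey;
        this.tagConditions = tagConditions;
        this.types = types;
        this.dataSources = dataSources;
    }

    boolean matches(SketchTask task) {
        for (Map.Entry<String, Set<String>> condition : tagConditions.entrySet()) {
            String actual = task.tags.get(condition.getKey());
            if (actual == null || !condition.getValue().contains(actual)) {
                return false;
            }
        }
        if (!types.isEmpty() && !types.contains(task.type)) {
            return false;
        }
        return dataSources.isEmpty() || dataSources.contains(task.dataSource);
    }
}

final class SketchSelectStrategy {
    // Mirrors the two selectors from the example configuration, in priority order.
    static final List<SketchSelector> EXAMPLE = List.of(
        new SketchSelector("low-throughput",
            Map.of("billingCategory", Set.of("streaming_ingestion")),
            Set.of(), Set.of("wikipedia")),
        new SketchSelector("medium-throughput",
            Map.of(), Set.of("index_kafka"), Set.of()));

    // Returns the key of the first matching selector, or defaultKey if none matches.
    static String select(List<SketchSelector> selectors, String defaultKey, SketchTask task) {
        for (SketchSelector selector : selectors) {
            if (selector.matches(task)) {
                return selector.selectionKey;
            }
        }
        return defaultKey;
    }
}
```

Because the first matching selector wins, an index_kafka task on wikipedia that carries the billingCategory tag lands in low-throughput rather than medium-throughput.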

Release note

The Dynamic Pod Template Selection feature enhances the K8s extension by enabling more flexible and dynamic selection of pod templates based on task properties.


Key changed/added classes in this PR
  • KubernetesTaskRunnerDynamicConfig.java Represents the configuration for task execution within a Kubernetes environment. This interface allows for dynamic configuration of task execution strategies based on specified behavior strategies.
  • KubernetesTaskExecutionConfigResource.java Resource that manages Kubernetes-specific execution configurations for running tasks.
  • PodTemplateSelectStrategy.java Defines a strategy for selecting the pod template for a task based on specific conditions.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

})
public interface ExecutionConfig
{
String CONFIG_KEY = "k8s.taskrunner.config";
Contributor

What's something that would go under this ExecutionConfig but not under ExecutionBehaviorStrategy? Does it make more sense to call this KubernetesTaskRunnerRefreshableConfig or something like that?

Contributor Author

ExecutionConfig can hold config other than ExecutionBehaviorStrategy; we may be able to move RunnerStrategy (or something similar) into this dynamic config in the future:

{
  "type": "default",
  "behaviorStrategy": {
    "type": "default",
    "categorySelectors": [
    ]
  },
  "runnerStrategy": {
    ...
  }
}

In this case it not only guides KubernetesTaskRunner behavior but also decides whether a task should run in a Worker, so I think the more general ExecutionConfig name makes more sense.

Contributor

@georgew5656 georgew5656 May 31, 2024

okay, ExecutionConfig makes sense to me then. or maybe TaskExecutionConfig?

would categoryStrategy or templateStrategy make more sense for the second level thing then? since that config is really about choosing what template to run a task with?

Contributor Author

It is named behaviorStrategy because it will not only choose the category and map it to a template; the same strategy will also be used in the future task laning work, e.g. to choose a task lane.

Contributor

would task laning not be a separate field? like laneStrategy?

Contributor Author

I’m considering using a single strategy but with different fields returned. Since the implementation of the DynamicTaskExecutionBehaviorStrategy relies on the same Selector matching rules mechanism, the rules for matching categories and lanes are quite similar. Therefore, there’s no need to introduce a separate strategy for these functions.

public interface ExecutionBehaviorStrategy
{
  String getTaskCategory(Task task);
  String getTaskLane(Task task);
}

Contributor

I too was confused about this name. I think having a more narrowly scoped interface will be easier to understand and maintain.

Contributor

@suneet-s suneet-s Jun 6, 2024

YongGang and I discussed this offline, and I better understand the intent of this config object. It seems like this is trying to provide similar functionality as KubernetesTaskRunnerConfig but via the dynamic config. It makes sense to have an encompassing dynamic config object for this extension.

Some suggested names for this that better indicate its purpose: KubernetesTaskRunnerDynamicConfig, KubernetesPeonDynamicConfig, KubernetesTaskExecutionConfig.

@arunramani
Contributor

If we want Selector to be general purpose, we need to clean it up a bit. When implementing a selector, you need to have 3 things: the list of selections, the evaluation criteria and the selection key. For this case, we could simplify it to just the list of selections and the selection key. The evaluation criteria can be defaulted to a simple "AND all of the keys except the select key". An example of how it should look

"behaviorStrategy": {
    "type": "default",
    "selectKey": "category",
    "categorySelectors": [
      {
        "category": "low-throughput",
        "context.tags": {
          "billingCategory": [
            "streaming_ingestion"
          ]
        },
        "task": {
          "datasource": [
            "wikipedia"
          ]
        }
      },
      {
        "category": "medium-throughput",
        "task": {
          "type": [
            "index_kafka"
          ]
        }
      }
    ]
  }

So now the evaluator would look at all of the keys EXCEPT category and, on a match, it will return the category.

What do you think?
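
A rough sketch of the "AND all of the keys except the select key" evaluation proposed in this comment. It is an illustration under simplifying assumptions, not code from the PR: `SelectKeyEvaluatorSketch` is a hypothetical name, the nested JSON objects are flattened into dotted keys such as `task.datasource`, and the value under the select key is held as a single-element list so that every entry has the same type.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SelectKeyEvaluatorSketch {
    // Returns the value under selectKey from the first selection whose other keys all match.
    static Optional<String> evaluate(
            String selectKey,
            List<Map<String, List<String>>> selections,
            Map<String, String> taskProperties) {
        for (Map<String, List<String>> selection : selections) {
            List<String> result = selection.get(selectKey);
            if (result == null || result.isEmpty()) {
                continue; // nothing to return for this entry, so skip it
            }
            boolean allMatch = true;
            for (Map.Entry<String, List<String>> condition : selection.entrySet()) {
                if (condition.getKey().equals(selectKey)) {
                    continue; // the select key is the result, not a condition
                }
                String actual = taskProperties.get(condition.getKey());
                if (actual == null || !condition.getValue().contains(actual)) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                return Optional.of(result.get(0));
            }
        }
        return Optional.empty();
    }
}
```

The evaluator itself never names a concrete field, but something still has to produce `taskProperties` from a task, and that step has to know which fields to extract.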

@YongGang
Contributor Author

If we want Selector to be general purpose, we need to clean it up a bit. When implementing a selector, you need to have 3 things: the list of selections, the evaluation criteria and the selection key. For this case, we could simplify it to just the list of selections and the selection key. The evaluation criteria can be defaulted to a simple "AND all of the keys except the select key". An example of how it should look

"behaviorStrategy": {
    "type": "default",
    "selectKey": "category",
    "categorySelectors": [
      {
        "category": "low-throughput",
        "context.tags": {
          "billingCategory": [
            "streaming_ingestion"
          ]
        },
        "task": {
          "datasource": [
            "wikipedia"
          ]
        }
      },
      {
        "category": "medium-throughput",
        "task": {
          "type": [
            "index_kafka"
          ]
        }
      }
    ]
  }

So now the evaluator would look at all of the keys EXCEPT category and, on a match, it will return the category.

What do you think?

I think that fully implementing this proposal would require a reflection-based approach; otherwise we still need to know in advance which fields to look at. Given our concerns about reflection (e.g. performance overhead, lack of safety), and since we don't expect very different fields to look at in the near future, I think the current solution is good enough.

@YongGang YongGang marked this pull request as ready for review June 3, 2024 05:11
Contributor

@suneet-s suneet-s left a comment

-1 on this change because I do not understand the use of the new interfaces. It seems like what we want here is a config that is scoped to the pod template adapter, but this PR introduces configs for general use across all of the k8s extension. Because of this the names of the interfaces and their uses are not clear to me eg. ExecutionConfig that returns an ExecutionBehaviorStrategy that gets a "category" from the task. It seems like all we need here is something like a PodTemplateNamingStrategy or a PodTemplateSelector

The other thing that feels clunky is the selector class. It should be an interface so that it can be extended in the future. The current selector class implementation does not provide good errors to users if they reference a field that is not currently supported - like group id. It feels like the Selector class is trying to implement a Predicate<Task>

I'd recommend introducing an interface called PodTemplateSelector that returns a PodTemplate given a Task object (similar to the BehaviorSelector classes introduced in this patch). For the Selectors - I'd recommend renaming them to Matchers that implement Predicate<Task>. We could then introduce and, not, or matchers and matchers that match on dataSource, tags, any context, task type, etc. The config would then look like

"podTemplateSelectorStrategy" : {
  "type": "matcherBased",
  "templateMatchers": [
    {
      "template": "template0",
      "matcher": {
        "type": "or",
        "matchers": [
          {
            "type": "dataSource",
            "matchingNames": ["ds0"]
          },
          {
            "type": "context",
            "field": "myContextKey",
            "matchingNames": ["anyValue"]
          }
        ]
      }
    },
    {
      "template": "template1",
      "matcher": {
        "type": "taskType",
        "matchingTypes": ["index_kafka", "index_kinesis"]
      }
    }
  ]
}
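
For illustration, the matcher idea above could look roughly like this in Java. All names here (`MatcherTask`, `Matchers`, `TemplateMatcher`, `MatcherBasedSelectorSketch`) are hypothetical, the selector returns a template name rather than an actual pod template, and Jackson wiring is left out entirely; it is a sketch of the shape of the proposal, not the merged code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for a Druid task.
final class MatcherTask {
    final String type;
    final String dataSource;
    final Map<String, String> context;

    MatcherTask(String type, String dataSource, Map<String, String> context) {
        this.type = type;
        this.dataSource = dataSource;
        this.context = context;
    }
}

// Small, composable matchers; each one is just a Predicate over the task.
final class Matchers {
    static Predicate<MatcherTask> dataSource(Set<String> names) {
        return task -> names.contains(task.dataSource);
    }

    static Predicate<MatcherTask> taskType(Set<String> types) {
        return task -> types.contains(task.type);
    }

    static Predicate<MatcherTask> context(String field, Set<String> values) {
        return task -> {
            String actual = task.context.get(field);
            return actual != null && values.contains(actual);
        };
    }

    static Predicate<MatcherTask> or(List<Predicate<MatcherTask>> matchers) {
        return task -> matchers.stream().anyMatch(m -> m.test(task));
    }

    static Predicate<MatcherTask> and(List<Predicate<MatcherTask>> matchers) {
        return task -> matchers.stream().allMatch(m -> m.test(task));
    }
}

// Pairs a template name with the matcher that selects it.
final class TemplateMatcher {
    final String template;
    final Predicate<MatcherTask> matcher;

    TemplateMatcher(String template, Predicate<MatcherTask> matcher) {
        this.template = template;
        this.matcher = matcher;
    }
}

final class MatcherBasedSelectorSketch {
    // Returns the template of the first entry whose matcher accepts the task.
    static Optional<String> select(List<TemplateMatcher> templateMatchers, MatcherTask task) {
        for (TemplateMatcher entry : templateMatchers) {
            if (entry.matcher.test(task)) {
                return Optional.of(entry.template);
            }
        }
        return Optional.empty();
    }
}
```

Each matcher type only knows the fields it needs, so an unsupported field would surface as an unknown matcher type rather than a silently ignored key.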

@@ -217,6 +217,66 @@ data:
druid.peon.mode=remote
druid.indexer.task.encapsulatedTask=true
```
#### Dynamic Pod Template Selection Config
Contributor

note to self: doc should be re-written. remove use of new feature, more flexible, etc.

What is the right point to talk about this config

@@ -98,6 +103,8 @@ public void configure(Binder binder)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
configureTaskLogs(binder);

Jerseys.addResource(binder, KubernetesResource.class);
Contributor

KubernetesResource does not indicate what the resource is actually for. Suggested rename KubernetesTaskExecutionConfigResource

Comment on lines 283 to 288
if (executionConfig != null && executionConfig.getBehaviorStrategy() != null) {
  metricBuilder.setDimensionIfNotNull(
      "category",
      executionConfig.getBehaviorStrategy().getTaskCategory(task)
  );
}
Contributor

This seems incorrect. The executionConfig could have changed from the time the task was converted to a job to when executionConfig.getBehaviorStrategy() is called in this function.
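
One possible way to avoid the mismatch described here is to resolve the category once, when the task is converted to a job, and reuse that stored value when emitting metrics. The sketch below only illustrates that idea; `CategorySnapshotSketch`, `Job` and `STRATEGY` are hypothetical names, and an `AtomicReference` stands in for the dynamic config, which is an assumption rather than how the PR stores it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

final class CategorySnapshotSketch {
    // Stands in for the dynamic config: maps a task type to a category and may be swapped at any time.
    static final AtomicReference<Function<String, String>> STRATEGY =
        new AtomicReference<>(taskType -> "v1-" + taskType);

    // Hypothetical job record that remembers the category chosen at conversion time.
    static final class Job {
        final String taskId;
        final String category;

        Job(String taskId, String category) {
            this.taskId = taskId;
            this.category = category;
        }
    }

    // The strategy is consulted exactly once, when the task becomes a job.
    static Job toJob(String taskId, String taskType) {
        String category = STRATEGY.get().apply(taskType);
        return new Job(taskId, category);
    }

    // Metrics read the stored value instead of re-evaluating the current strategy.
    static String categoryDimension(Job job) {
        return job.category;
    }
}
```

With this shape, a config update between job creation and metric emission changes the category of later jobs only.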

@JsonSubTypes.Type(name = "default", value = DefaultExecutionBehaviorStrategy.class),
@JsonSubTypes.Type(name = "dynamicTask", value = DynamicTaskExecutionBehaviorStrategy.class),
})
public interface ExecutionBehaviorStrategy
Contributor

I don't understand the name of this interface. What is the execution behavior strategy? It looks like this is just getting the name of a category from a Task


import java.util.Objects;

public class DefaultExecutionConfig implements ExecutionConfig
Contributor

rename to TaskTypeExecutionConfig to indicate what it is doing

* This implementation categorizes tasks by simply returning the type of the task,
* making it a straightforward, type-based categorization strategy.
*/
public class DefaultExecutionBehaviorStrategy implements ExecutionBehaviorStrategy
Contributor

Rename to TaskTypeExecutionBehaviorStrategy instead of Default to be more descriptive of what this class is trying to do.

Comment on lines 54 to 55
@Path("/druid/indexer/v1/k8s/runner")
public class KubernetesResource
Contributor

Suggested change
- @Path("/druid/indexer/v1/k8s/runner")
- public class KubernetesResource
+ @Path("/druid/indexer/v1/k8s/taskRunner")
+ public class KubernetesTaskRunnerResource

OR

Suggested change
- @Path("/druid/indexer/v1/k8s/runner")
- public class KubernetesResource
+ @Path("/druid/indexer/v1/k8s/taskRunner/executionConfig")
+ public class KubernetesTaskRunnerExecutionConfigResource

@YongGang
Contributor Author

YongGang commented Jun 6, 2024

-1 on this change because I do not understand the use of the new interfaces. It seems like what we want here is a config that is scoped to the pod template adapter, but this PR introduces configs for general use across all of the k8s extension. Because of this the names of the interfaces and their uses are not clear to me eg. ExecutionConfig that returns an ExecutionBehaviorStrategy that gets a "category" from the task. It seems like all we need here is something like a PodTemplateNamingStrategy or a PodTemplateSelector

The other thing that feels clunky is the selector class. It should be an interface so that it can be extended in the future. The current selector class implementation does not provide good errors to users if they reference a field that is not currently supported - like group id. It feels like the Selector class is trying to implement a Predicate<Task>

I'd recommend introducing an interface called PodTemplateSelector that returns a PodTemplate given a Task object (similar to the BehaviorSelector classes introduced in this patch). For the Selectors - I'd recommend renaming them to Matchers that implement Predicate<Task>. We could then introduce and, not, or matchers and matchers that match on dataSource, tags, any context, task type, etc. The config would then look like

"podTemplateSelectorStrategy" : {
  "type": "matcherBased",
  "templateMatchers": [
    {
      "template": "template0",
      "matcher": {
        "type": "or",
        "matchers": [
          {
            "type": "dataSource",
            "matchingNames": ["ds0"]
          },
          {
            "type": "context",
            "field": "myContextKey",
            "matchingNames": ["anyValue"]
          }
        ]
      }
    },
    {
      "template": "template1",
      "matcher": {
        "type": "taskType",
        "matchingTypes": ["index_kafka", "index_kinesis"]
      }
    }
  ]
}

@suneet-s I have addressed most of the comments except for the Matchers proposal. While it’s a good suggestion, I don’t believe it’s the right fit for our scenario for a couple of reasons:

  1. Complexity and Applicability of Operators: Matchers provide a variety of operators such as and, not, and or for evaluation. Our current design with sequence-based selectors is already complex and requires careful attention to detail. Introducing additional operators would further complicate the understanding of the dynamic config. Specifically, the not and or operators might not be very useful for our focused criteria of template selection. For example, the not operator could yield unexpected results, especially as new Task types are continuously added, potentially covering unintended scenarios without careful usage.
  2. Relevance of Typed Matchers: While the typed Matcher approach is excellent for extensibility and appears future-proof, it is more aligned with scenarios like Druid’s Filter, where many implementations exist for type safety and performance enhancements. In our case, however, the field comparisons are predominantly string-based. Given that our main input in this K8s extension is a Task, we do not anticipate needing significantly different criteria from what is currently covered. Should future requirements drastically diverge, adopting a new strategy would be more appropriate than extending the current one.

Additionally, implementing Matchers would require introducing many small Jackson classes, which could bloat the codebase. Our current implementation of Selector is cleaner and more streamlined in comparison.

Contributor

@suneet-s suneet-s left a comment

Thanks for the updates @YongGang. I have reviewed the src/main classes.

I am still -1 on this change because of the way the selector class is written.
The Selector contains a selection key and implicit matching conditions. These matching conditions are specific to a use case, but it is not clear why these specific fields are chosen. IMO this poses a UX problem for people adopting this feature. The other issue with this approach is that we are not able to provide good user validation when a user passes in a key to the task map that is not currently supported.

Please separate the selection key from the conditions used to match a task by moving those conditions into a separate object, i.e. instead of

{
  "selectionKey": "someKey",
  "context.tags": {..},
  "task": {...}
}

make the matching part its own object

{
  "selectionKey": "someKey",
  "matcher": {
    "type": "myCustomMatcher",
    "context.tags": {..},
    "task": {...}
  }
}

In the example above, I can not think of a good name that would explain why someone would want to use that matcher, which is why I am proposing smaller matchers - like a DataSourceMatcher, TaskTypeMatcher, TagsMatcher, etc.

Regarding your concerns about smaller, more composable matchers:

  1. RE: Complexity of operators: We do not need to introduce any matchers that make it harder to understand what the rules are trying to say. The minimum work needed in this patch is to provide an and matcher, a tags matcher, a dataSource matcher and a taskType matcher. It is no more complex than the current proposal with these 4 matchers.
  2. RE: Relevance of Typed matchers: I do not understand this concern.
  3. RE: small jackson classes / code bloat: Smaller classes are easier to test and maintain as there is less logic in it. I do not understand the code bloat concern.

Also, please limit the use of nulls in the code. Please try to make everything non-null as this makes it less likely to introduce null handling bugs. If something really needs to be nullable, consider using Optionals as it forces the devs and the reviewers to think about what to do when the optional is absent.
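
A minimal sketch of the Optional suggestion, under stated assumptions: `OptionalStrategySketch` is a hypothetical class, and falling back to the task type mirrors the type-based default behavior described elsewhere in this PR rather than any confirmed implementation detail.

```java
import java.util.Optional;

final class OptionalStrategySketch {
    // May be null internally; callers never see the null directly.
    private final String configuredKey;

    OptionalStrategySketch(String configuredKey) {
        this.configuredKey = configuredKey;
    }

    // Exposing Optional makes the "not configured" case explicit at every call site.
    Optional<String> getConfiguredKey() {
        return Optional.ofNullable(configuredKey);
    }

    // Falls back to the task type when no key is configured (an assumption of this sketch).
    static String resolveTemplateKey(OptionalStrategySketch config, String taskType) {
        return config.getConfiguredKey().orElse(taskType);
    }
}
```

The caller has to pick a fallback to get a value out of the Optional, which is the point the review comment makes about forcing the absent case to be considered.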

@YongGang
Contributor Author

YongGang commented Jun 7, 2024

Thanks @suneet-s for your comments:

The Selector contains a selection key and implicit matching conditions. These matching conditions are specific to a usecase, but it is not clear why these specific fields are chosen.

Would renaming Selector to TaskPropertiesSelector address this concern?

For the following suggestion:

{
  "selectionKey": "someKey",
  "matcher": {
    "type": "myCustomMatcher",
    "context.tags": {..},
    "task": {...}
  }
}

are we actually suggesting to do this?:

{
  "selectionKey": "someKey",
  "matchers": [
    {
      "type": "dataSource",
      "matchingNames": ["ds0"]
    },
    {
      "type": "context",
      "field": "myContextKey",
      "matchingNames": ["anyValue"]
    }
  ]
}

Otherwise if we still have a single matcher within a selector, I'm not sure about the benefit of introducing another layer of Jackson object.

So the full dynamic config based on the one from PR description will be like:

{
  "type": "default",
  "podTemplateSelectStrategy": {
    "type": "dynamicTask",
    "templateSelectors": [
      {
        "selectionKey": "low-throughput",
        "matchers": [
          {
            "type": "context.tags",
            "field": "billingCategory",
            "matchingNames": ["streaming_ingestion"]
          },
          {
            "type": "dataSource",
            "matchingNames": ["wikipedia"]
          }
        ]
      },
      {
        "selectionKey": "medium-throughput",
        "matchers": [
          {
            "type": "type",
            "matchingNames": ["index_kafka"]
          }
        ]
      }
    ]
  }
}

There are 3 levels of arrays in this config; I want to make sure that's the target format we'd like to have.

@YongGang
Contributor Author

@suneet-s I have addressed your comments, please take a look. Thanks.

Contributor

@suneet-s suneet-s left a comment

Thanks for incorporating the suggestions @YongGang

@suneet-s suneet-s merged commit 46dbc74 into apache:master Jun 12, 2024
88 checks passed
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024