diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 2132b55ea1e3..27290a9bee59 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -284,6 +284,9 @@ To do this, set the following property. |Property| Possible Values |Description|Default|required| |--------|-----------------|-----------|-------|--------| -|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No| -|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No| +|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g., `k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection. |`k8s`|No| +|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No| +| **For `taskType` runner strategy:**||||| +|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String` (e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides apply. This setting ensures there is always a fallback runner available.|None|No| +|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`| `JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific overrides for runner types. Each entry sets a task type to a specific runner, allowing fine control. |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 243f6626c664..2c45a0ec7b89 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy; import org.apache.druid.tasklogs.TaskLogStreamer; import javax.annotation.Nullable; @@ -57,17 +58,17 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas { private final KubernetesTaskRunner kubernetesTaskRunner; private final WorkerTaskRunner workerTaskRunner; - private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + private final RunnerStrategy runnerStrategy; public KubernetesAndWorkerTaskRunner( KubernetesTaskRunner kubernetesTaskRunner, WorkerTaskRunner workerTaskRunner, - KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig + RunnerStrategy runnerStrategy ) { this.kubernetesTaskRunner = kubernetesTaskRunner; this.workerTaskRunner = workerTaskRunner; - this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + this.runnerStrategy = runnerStrategy; } @Override @@ -101,7 +102,8 @@ public void unregisterListener(String listenerId) @Override public ListenableFuture run(Task task) { - if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) { + RunnerStrategy.RunnerType runnerType = runnerStrategy.getRunnerTypeForTask(task); + if (RunnerStrategy.RunnerType.WORKER_RUNNER_TYPE.equals(runnerType)) { return workerTaskRunner.run(task); } else { return kubernetesTaskRunner.run(task); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java index 8e6fb8f7c617..0ffeb0103afa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java @@ -26,56 +26,49 @@ import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; -import javax.validation.constraints.NotNull; +import javax.annotation.Nullable; public class KubernetesAndWorkerTaskRunnerConfig { - private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE = HttpRemoteTaskRunnerFactory.TYPE_NAME; - /** - * Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote. - */ - @JsonProperty - @NotNull - private final String workerTaskRunnerType; + private final String RUNNER_STRATEGY_TYPE = "runnerStrategy.type"; + private final String RUNNER_STRATEGY_WORKER_TYPE = "runnerStrategy.workerType"; /** - * Whether or not to send tasks to the worker task runner instead of the Kubernetes runner. + * Select which runner type a task would run on, options are k8s or worker. */ - @JsonProperty - @NotNull - private final boolean sendAllTasksToWorkerTaskRunner; + @JsonProperty(RUNNER_STRATEGY_TYPE) + private String runnerStrategy; + + @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) + private String workerType; @JsonCreator public KubernetesAndWorkerTaskRunnerConfig( - @JsonProperty("workerTaskRunnerType") String workerTaskRunnerType, - @JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner + @JsonProperty(RUNNER_STRATEGY_TYPE) @Nullable String runnerStrategy, + @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) @Nullable String workerType ) { - this.workerTaskRunnerType = ObjectUtils.defaultIfNull( - workerTaskRunnerType, - DEFAULT_WORKER_TASK_RUNNER_TYPE - ); + this.runnerStrategy = ObjectUtils.defaultIfNull(runnerStrategy, KubernetesTaskRunnerFactory.TYPE_NAME); + this.workerType = ObjectUtils.defaultIfNull(workerType, HttpRemoteTaskRunnerFactory.TYPE_NAME); Preconditions.checkArgument( - this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) || - this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME), - "workerTaskRunnerType must be set to one of (%s, %s)", + this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) || + this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME), + "workerType must be set to one of (%s, %s)", HttpRemoteTaskRunnerFactory.TYPE_NAME, RemoteTaskRunnerFactory.TYPE_NAME ); - this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull( - sendAllTasksToWorkerTaskRunner, - false - ); } - public String getWorkerTaskRunnerType() + @JsonProperty(RUNNER_STRATEGY_TYPE) + public String getRunnerStrategy() { - return workerTaskRunnerType; + return runnerStrategy; } - public boolean isSendAllTasksToWorkerTaskRunner() + @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) + public String getWorkerType() { - return sendAllTasksToWorkerTaskRunner; + return workerType; } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java index 49ca454f50ae..de6db915c8a9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java @@ -22,7 +22,9 @@ import com.google.inject.Inject; import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; +import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy; public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory @@ -33,6 +35,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory + { + private KubernetesAndWorkerTaskRunnerConfig runnerConfig; + private Properties props; + private JsonConfigurator configurator; + + @Inject + public void inject( + KubernetesAndWorkerTaskRunnerConfig runnerConfig, + Properties props, + JsonConfigurator configurator + ) + { + this.runnerConfig = runnerConfig; + this.props = props; + this.configurator = configurator; + } + + @Override + public RunnerStrategy get() + { + String runnerStrategy = runnerConfig.getRunnerStrategy(); + + final String runnerStrategyPropertyBase = StringUtils.format( + RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING, + runnerStrategy + ); + final JsonConfigProvider provider = JsonConfigProvider.of( + runnerStrategyPropertyBase, + RunnerStrategy.class + ); + + props.put(runnerStrategyPropertyBase + ".type", runnerStrategy); + provider.inject(props, configurator); + + return provider.get(); + } + } + private void configureTaskLogs(Binder binder) { PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java new file mode 100644 index 000000000000..8b0a6374ad46 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.indexing.common.task.Task; + +/** + * Implementation of {@link RunnerStrategy} that always selects the Kubernetes runner type for tasks. + * + *

This strategy is specific for tasks that are intended to be executed in a Kubernetes environment. + * Regardless of task specifics, this strategy always returns {@link RunnerType#KUBERNETES_RUNNER_TYPE}. + */ +public class KubernetesRunnerStrategy implements RunnerStrategy +{ + @JsonCreator + public KubernetesRunnerStrategy() + { + } + + @Override + public RunnerType getRunnerTypeForTask(Task task) + { + return RunnerType.KUBERNETES_RUNNER_TYPE; + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java new file mode 100644 index 000000000000..5aa2bc4723ab --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory; + +/** + * Strategy interface for selecting the appropriate runner type based on the task spec or specific context conditions. + * + *

This interface is part of a strategy pattern and is implemented by different classes that handle + * the logic of selecting a runner type based on various criteria. Each task submitted to the system + * will pass through the strategy implementation to determine the correct runner for execution. + * + *

The strategy uses {@link RunnerType} as a standardized way of referring to and managing different types of runners. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = KubernetesRunnerStrategy.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "k8s", value = KubernetesRunnerStrategy.class), + @JsonSubTypes.Type(name = "worker", value = WorkerRunnerStrategy.class), + @JsonSubTypes.Type(name = "taskType", value = TaskTypeRunnerStrategy.class) +}) +public interface RunnerStrategy +{ + String WORKER_NAME = "worker"; + + /** + * Enumerates the available runner types, each associated with a specific method of task execution. + * These runner types are used by the strategies to make decisions and by the system to route tasks appropriately. + */ + enum RunnerType + { + KUBERNETES_RUNNER_TYPE(KubernetesTaskRunnerFactory.TYPE_NAME), + WORKER_RUNNER_TYPE(WORKER_NAME); + + private final String type; + + RunnerType(String type) + { + this.type = type; + } + + public String getType() + { + return type; + } + } + + /** + * Analyzes the task and determines the appropriate runner type for executing it. + * + * @param task The task that needs to be executed. + * @return The runner type deemed most suitable for executing the task. + */ + RunnerType getRunnerTypeForTask(Task task); +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java new file mode 100644 index 000000000000..6a16314be5b9 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.indexing.common.task.Task; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Implementation of {@link RunnerStrategy} that allows dynamic selection of runner type based on task type. + * + *

This strategy checks each task's type against a set of overrides to determine the appropriate runner type. + * If no override is specified for a task's type, it uses a default runner. + * + *

Runner types are determined based on configurations provided at construction, including default runner + * type and specific overrides per task type. This strategy is designed for environments where tasks may require + * different execution environments (e.g., Kubernetes or worker nodes). + */ +public class TaskTypeRunnerStrategy implements RunnerStrategy +{ + @Nullable + private final Map overrides; + private final RunnerStrategy kubernetesRunnerStrategy = new KubernetesRunnerStrategy(); + private WorkerRunnerStrategy workerRunnerStrategy; + private final RunnerStrategy defaultRunnerStrategy; + private final String defaultRunner; + + @JsonCreator + public TaskTypeRunnerStrategy( + @JsonProperty("default") String defaultRunner, + @JsonProperty("overrides") @Nullable Map overrides + ) + { + Preconditions.checkNotNull(defaultRunner); + workerRunnerStrategy = new WorkerRunnerStrategy(); + defaultRunnerStrategy = RunnerType.WORKER_RUNNER_TYPE.getType().equals(defaultRunner) ? + workerRunnerStrategy : kubernetesRunnerStrategy; + validate(overrides); + this.defaultRunner = defaultRunner; + this.overrides = overrides; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getOverrides() + { + return overrides; + } + + @JsonProperty + public String getDefault() + { + return defaultRunner; + } + + @Override + public RunnerType getRunnerTypeForTask(Task task) + { + String runnerType = null; + if (overrides != null) { + runnerType = overrides.get(task.getType()); + } + + RunnerStrategy runnerStrategy = getRunnerSelectStrategy(runnerType); + return runnerStrategy.getRunnerTypeForTask(task); + } + + private RunnerStrategy getRunnerSelectStrategy(String runnerType) + { + if (runnerType == null) { + return defaultRunnerStrategy; + } + + if (WORKER_NAME.equals(runnerType)) { + return workerRunnerStrategy; + } else { + return kubernetesRunnerStrategy; + } + } + + private void validate(Map overrides) + { + if (overrides == null) { + return; + } + + boolean hasValidRunnerType = + overrides.values().stream().allMatch(v -> RunnerType.WORKER_RUNNER_TYPE.getType().equals(v) + || RunnerType.KUBERNETES_RUNNER_TYPE.getType().equals(v)); + Preconditions.checkArgument( + hasValidRunnerType, + "Invalid config in 'overrides'. Each runner type must be either '%s' or '%s'.", + RunnerType.WORKER_RUNNER_TYPE.getType(), + RunnerType.KUBERNETES_RUNNER_TYPE.getType() + ); + } + + @Override + public String toString() + { + return "TaskTypeRunnerStrategy{" + + "default=" + defaultRunner + + ", overrides=" + overrides + + '}'; + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java new file mode 100644 index 000000000000..bd06f91aa8f1 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.indexing.common.task.Task; + +/** + * Implementation of {@link RunnerStrategy} that always selects the Worker runner type for tasks. + * + *

This strategy is specific for tasks that are intended to be executed in a Worker environment. + * Regardless of task specifics, this strategy always returns {@link RunnerType#WORKER_RUNNER_TYPE}. + */ +public class WorkerRunnerStrategy implements RunnerStrategy +{ + @JsonCreator + public WorkerRunnerStrategy() + { + } + + @Override + public RunnerType getRunnerTypeForTask(Task task) + { + return RunnerType.WORKER_RUNNER_TYPE; + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java index 979aba69291a..8ad631682f95 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java @@ -20,8 +20,6 @@ package org.apache.druid.k8s.overlord; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; -import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Test; @@ -39,9 +37,8 @@ public void test_deserializable() throws IOException KubernetesAndWorkerTaskRunnerConfig.class ); - Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType()); - Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner()); - + Assert.assertEquals("worker", config.getRunnerStrategy()); + Assert.assertEquals("remote", config.getWorkerType()); } @Test @@ -49,7 +46,6 @@ public void test_withDefaults() { KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null); - Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType()); - Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner()); + Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, config.getRunnerStrategy()); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java index c8e6d3afa03c..88696017d056 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java @@ -22,6 +22,8 @@ import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; +import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy; +import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -45,7 +47,8 @@ public void test_useHttpTaskRunner_asDefault() kubernetesTaskRunnerFactory, httpRemoteTaskRunnerFactory, remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, null) + new KubernetesAndWorkerTaskRunnerConfig(null, null), + new WorkerRunnerStrategy() ); EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null); @@ -63,7 +66,8 @@ public void test_specifyRemoteTaskRunner() kubernetesTaskRunnerFactory, httpRemoteTaskRunnerFactory, remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig("remote", null) + new KubernetesAndWorkerTaskRunnerConfig(null, "remote"), + new WorkerRunnerStrategy() ); EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); @@ -81,7 +85,8 @@ public void test_specifyIncorrectTaskRunner_shouldThrowException() kubernetesTaskRunnerFactory, httpRemoteTaskRunnerFactory, remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig("noop", null) + new KubernetesAndWorkerTaskRunnerConfig(null, "noop"), + new KubernetesRunnerStrategy() ); EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index af5a6c39bb0b..3ab515cc6e55 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -31,6 +31,9 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner; +import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy; +import org.apache.druid.k8s.overlord.runnerstrategy.TaskTypeRunnerStrategy; +import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -67,7 +70,7 @@ public void setup() runner = new KubernetesAndWorkerTaskRunner( kubernetesTaskRunner, workerTaskRunner, - new KubernetesAndWorkerTaskRunnerConfig(null, null) + new KubernetesRunnerStrategy() ); } @@ -77,7 +80,7 @@ public void test_runOnKubernetes() throws ExecutionException, InterruptedExcepti KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner( kubernetesTaskRunner, workerTaskRunner, - new KubernetesAndWorkerTaskRunnerConfig(null, false) + new KubernetesRunnerStrategy() ); TaskStatus taskStatus = TaskStatus.success(ID); EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus)); @@ -93,7 +96,7 @@ public void test_runOnWorker() throws ExecutionException, InterruptedException KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner( kubernetesTaskRunner, workerTaskRunner, - new KubernetesAndWorkerTaskRunnerConfig(null, true) + new WorkerRunnerStrategy() ); TaskStatus taskStatus = TaskStatus.success(ID); EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus)); @@ -103,6 +106,33 @@ public void test_runOnWorker() throws ExecutionException, InterruptedException verifyAll(); } + @Test + public void test_runOnKubernetesOrWorkerBasedOnStrategy() throws ExecutionException, InterruptedException + { + TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker")); + KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner( + kubernetesTaskRunner, + workerTaskRunner, + runnerStrategy + ); + Task taskMock = EasyMock.createMock(Task.class); + TaskStatus taskStatus = TaskStatus.success(ID); + EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes(); + + EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once(); + EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once(); + EasyMock.replay(taskMock, workerTaskRunner); + Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get()); + EasyMock.verify(taskMock, workerTaskRunner); + EasyMock.reset(taskMock, workerTaskRunner); + + EasyMock.expect(taskMock.getType()).andReturn("compact").once(); + EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once(); + EasyMock.replay(taskMock, kubernetesTaskRunner); + Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get()); + EasyMock.verify(taskMock, kubernetesTaskRunner); + } + @Test public void test_getUsedCapacity() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java new file mode 100644 index 000000000000..880d5528ac7b --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import org.apache.druid.indexing.common.task.Task; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(EasyMockRunner.class) +public class KubernetesRunnerStrategyTest extends EasyMockSupport +{ + @Mock + Task task; + + @Test + public void test_kubernetesRunnerStrategy_returnsCorrectRunnerType() + { + KubernetesRunnerStrategy runnerStrategy = new KubernetesRunnerStrategy(); + + Assert.assertEquals(RunnerStrategy.RunnerType.KUBERNETES_RUNNER_TYPE, runnerStrategy.getRunnerTypeForTask(task)); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java new file mode 100644 index 000000000000..a32630ed6144 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(EasyMockRunner.class) +public class TaskTypeRunnerStrategyTest extends EasyMockSupport +{ + @Mock + Task task; + + @Test + public void test_taskTypeRunnerStrategy_returnsCorrectRunnerType() + { + TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker")); + EasyMock.expect(task.getType()).andReturn("index_kafka"); + EasyMock.expectLastCall().once(); + EasyMock.expect(task.getType()).andReturn("compact"); + EasyMock.expectLastCall().once(); + replayAll(); + Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType()); + Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, runnerStrategy.getRunnerTypeForTask(task).getType()); + verifyAll(); + } + + @Test(expected = IllegalArgumentException.class) + public void test_invalidOverridesConfig_shouldThrowException() + { + new TaskTypeRunnerStrategy( + "k8s", + ImmutableMap.of( + "index_kafka", + "non_exist_runner" + ) + ); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java new file mode 100644 index 000000000000..1a3ae34fc6a7 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.runnerstrategy; + + +import org.apache.druid.indexing.common.task.Task; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(EasyMockRunner.class) +public class WorkerRunnerStrategyTest extends EasyMockSupport +{ + @Mock + Task task; + + @Test + public void test_workerRunnerStrategy_returnsCorrectRunnerType() + { + WorkerRunnerStrategy runnerStrategy = new WorkerRunnerStrategy(); + Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType()); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json index 757b07ebda52..43e7414f11f8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json @@ -1,4 +1,4 @@ { - "workerTaskRunnerType": "remote", - "sendAllTasksToWorkerTaskRunner": false + "runnerStrategy.type": "worker", + "runnerStrategy.workerType": "remote" } \ No newline at end of file