Skip to content

Commit

Permalink
Ability to send task types to k8s or worker task runner (apache#15196) (
Browse files Browse the repository at this point in the history
  • Loading branch information
suneet-s authored Oct 25, 2023
1 parent 8fa961b commit f105704
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 55 deletions.
7 changes: 5 additions & 2 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ To do this, set the following property.

|Property| Possible Values |Description|Default|required|
|--------|-----------------|-----------|-------|--------|
|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No|
|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g., `k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection. |`k8s`|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No|
| **For `taskType` runner strategy:**|||||
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String` (e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides apply. This setting ensures there is always a fallback runner available.|None|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`| `JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific overrides for runner types. Each entry sets a task type to a specific runner, allowing fine control. |`{}`|No|

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.TaskLogStreamer;

import javax.annotation.Nullable;
Expand All @@ -57,17 +58,17 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
{
private final KubernetesTaskRunner kubernetesTaskRunner;
private final WorkerTaskRunner workerTaskRunner;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private final RunnerStrategy runnerStrategy;

public KubernetesAndWorkerTaskRunner(
KubernetesTaskRunner kubernetesTaskRunner,
WorkerTaskRunner workerTaskRunner,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunner = kubernetesTaskRunner;
this.workerTaskRunner = workerTaskRunner;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
this.runnerStrategy = runnerStrategy;
}

@Override
Expand Down Expand Up @@ -101,7 +102,8 @@ public void unregisterListener(String listenerId)
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
RunnerStrategy.RunnerType runnerType = runnerStrategy.getRunnerTypeForTask(task);
if (RunnerStrategy.RunnerType.WORKER_RUNNER_TYPE.equals(runnerType)) {
return workerTaskRunner.run(task);
} else {
return kubernetesTaskRunner.run(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,56 +26,49 @@
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;

import javax.validation.constraints.NotNull;
import javax.annotation.Nullable;

public class KubernetesAndWorkerTaskRunnerConfig
{
private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE = HttpRemoteTaskRunnerFactory.TYPE_NAME;
/**
* Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote.
*/
@JsonProperty
@NotNull
private final String workerTaskRunnerType;

private final String RUNNER_STRATEGY_TYPE = "runnerStrategy.type";
private final String RUNNER_STRATEGY_WORKER_TYPE = "runnerStrategy.workerType";
/**
* Whether or not to send tasks to the worker task runner instead of the Kubernetes runner.
* Select which runner type a task would run on, options are k8s or worker.
*/
@JsonProperty
@NotNull
private final boolean sendAllTasksToWorkerTaskRunner;
@JsonProperty(RUNNER_STRATEGY_TYPE)
private String runnerStrategy;

@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
private String workerType;

@JsonCreator
public KubernetesAndWorkerTaskRunnerConfig(
@JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
@JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner
@JsonProperty(RUNNER_STRATEGY_TYPE) @Nullable String runnerStrategy,
@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) @Nullable String workerType
)
{
this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
workerTaskRunnerType,
DEFAULT_WORKER_TASK_RUNNER_TYPE
);
this.runnerStrategy = ObjectUtils.defaultIfNull(runnerStrategy, KubernetesTaskRunnerFactory.TYPE_NAME);
this.workerType = ObjectUtils.defaultIfNull(workerType, HttpRemoteTaskRunnerFactory.TYPE_NAME);
Preconditions.checkArgument(
this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
"workerTaskRunnerType must be set to one of (%s, %s)",
this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
"workerType must be set to one of (%s, %s)",
HttpRemoteTaskRunnerFactory.TYPE_NAME,
RemoteTaskRunnerFactory.TYPE_NAME
);
this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull(
sendAllTasksToWorkerTaskRunner,
false
);
}

public String getWorkerTaskRunnerType()
@JsonProperty(RUNNER_STRATEGY_TYPE)
public String getRunnerStrategy()
{
return workerTaskRunnerType;
return runnerStrategy;
}

public boolean isSendAllTasksToWorkerTaskRunner()
@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
public String getWorkerType()
{
return sendAllTasksToWorkerTaskRunner;
return workerType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;


public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
Expand All @@ -33,6 +35,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private final RunnerStrategy runnerStrategy;

private KubernetesAndWorkerTaskRunner runner;

Expand All @@ -41,27 +44,35 @@ public KubernetesAndWorkerTaskRunnerFactory(
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
RemoteTaskRunnerFactory remoteTaskRunnerFactory,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
this.runnerStrategy = runnerStrategy;
}

@Override
public KubernetesAndWorkerTaskRunner build()
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType()) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build(),
kubernetesAndWorkerTaskRunnerConfig
getWorkerTaskRunner(),
runnerStrategy
);
return runner;
}

private WorkerTaskRunner getWorkerTaskRunner()
{
String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build();
}

@Override
public KubernetesAndWorkerTaskRunner get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.druid.k8s.overlord;

import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import io.fabric8.kubernetes.client.Config;
Expand All @@ -29,6 +31,7 @@
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
Expand All @@ -37,27 +40,35 @@
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.Properties;


@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
public class KubernetesOverlordModule implements DruidModule
{

private static final Logger log = new Logger(KubernetesOverlordModule.class);
private static final String K8SANDWORKER_PROPERTIES_PREFIX = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+ ".k8sAndWorker";
private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING = K8SANDWORKER_PROPERTIES_PREFIX
+ ".runnerStrategy.%s";

@Override
public void configure(Binder binder)
{
// druid.indexer.runner.type=k8s
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker", KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
PolyBind.createChoice(
binder,
Expand All @@ -78,6 +89,9 @@ public void configure(Binder binder)
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RunnerStrategy.class)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
configureTaskLogs(binder);
}

Expand Down Expand Up @@ -116,6 +130,45 @@ public void stop()
return client;
}

private static class RunnerStrategyProvider implements Provider<RunnerStrategy>
{
private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
private Properties props;
private JsonConfigurator configurator;

@Inject
public void inject(
KubernetesAndWorkerTaskRunnerConfig runnerConfig,
Properties props,
JsonConfigurator configurator
)
{
this.runnerConfig = runnerConfig;
this.props = props;
this.configurator = configurator;
}

@Override
public RunnerStrategy get()
{
String runnerStrategy = runnerConfig.getRunnerStrategy();

final String runnerStrategyPropertyBase = StringUtils.format(
RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING,
runnerStrategy
);
final JsonConfigProvider<RunnerStrategy> provider = JsonConfigProvider.of(
runnerStrategyPropertyBase,
RunnerStrategy.class
);

props.put(runnerStrategyPropertyBase + ".type", runnerStrategy);
provider.inject(props, configurator);

return provider.get();
}
}

private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.runnerstrategy;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.indexing.common.task.Task;

/**
* Implementation of {@link RunnerStrategy} that always selects the Kubernetes runner type for tasks.
*
* <p>This strategy is specific for tasks that are intended to be executed in a Kubernetes environment.
* Regardless of task specifics, this strategy always returns {@link RunnerType#KUBERNETES_RUNNER_TYPE}.
*/
public class KubernetesRunnerStrategy implements RunnerStrategy
{
@JsonCreator
public KubernetesRunnerStrategy()
{
}

@Override
public RunnerType getRunnerTypeForTask(Task task)
{
return RunnerType.KUBERNETES_RUNNER_TYPE;
}
}
Loading

0 comments on commit f105704

Please sign in to comment.