Skip to content

Commit

Permalink
Ability to send task types to k8s or worker task runner
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang committed Oct 18, 2023
1 parent 0a27a7a commit 357485f
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,5 @@ To do this, set the following property.
|--------|-----------------|-----------|-------|--------|
|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No|
|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No|
|`druid.indexer.runner.k8sAndWorker.runnerSelectorSpec`|`{"default":"k8s","overrides": {"index_kafka": "worker"}}`| Determines for a task type whether it should run in `k8s` or `worker` runner type|`false`|No|

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand Down Expand Up @@ -104,7 +105,23 @@ public ListenableFuture<TaskStatus> run(Task task)
if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
return workerTaskRunner.run(task);
} else {
return kubernetesTaskRunner.run(task);
KubernetesRunnerSelectStrategy runnerSelectStrategy = kubernetesAndWorkerTaskRunnerConfig.getRunnerSelectStrategy();
if (runnerSelectStrategy == null) {
return kubernetesTaskRunner.run(task);
}

String runnerTypeForTask = runnerSelectStrategy.getRunnerTypeForTask(task);
if (KubernetesRunnerSelectStrategy.WORKER_RUNNER_TYPE.equals(runnerTypeForTask)) {
return workerTaskRunner.run(task);
} else if (KubernetesRunnerSelectStrategy.KUBERNETES_RUNNER_TYPE.equals(runnerTypeForTask)) {
return kubernetesTaskRunner.run(task);
} else {
throw DruidException.defensive()
.build("Wrong runner type [%s] configured for task type [%s]",
runnerTypeForTask,
task.getType()
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

public class KubernetesAndWorkerTaskRunnerConfig
Expand All @@ -37,18 +38,25 @@ public class KubernetesAndWorkerTaskRunnerConfig
@JsonProperty
@NotNull
private final String workerTaskRunnerType;

/**
* Whether or not to send tasks to the worker task runner instead of the Kubernetes runner.
*/
@JsonProperty
@NotNull
private final boolean sendAllTasksToWorkerTaskRunner;
/**
* Select which runner type a task would run on, options are k8s or worker.
*/
@JsonProperty
@Nullable
private final RunnerSelectorSpec runnerSelectorSpec;
private KubernetesRunnerSelectStrategy runnerSelectStrategy;

@JsonCreator
public KubernetesAndWorkerTaskRunnerConfig(
@JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
@JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner
@JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner,
@JsonProperty("runnerSelectorSpec") @Nullable RunnerSelectorSpec runnerSelectorSpec
)
{
this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
Expand All @@ -66,6 +74,11 @@ public KubernetesAndWorkerTaskRunnerConfig(
sendAllTasksToWorkerTaskRunner,
false
);
this.runnerSelectorSpec = runnerSelectorSpec;
if (this.runnerSelectorSpec != null) {
this.runnerSelectStrategy = new KubernetesRunnerSelectStrategy(
runnerSelectorSpec);
}
}

public String getWorkerTaskRunnerType()
Expand All @@ -78,4 +91,9 @@ public boolean isSendAllTasksToWorkerTaskRunner()
return sendAllTasksToWorkerTaskRunner;
}

public KubernetesRunnerSelectStrategy getRunnerSelectStrategy()
{
return runnerSelectStrategy;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord;

import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.Task;

/**
* A strategy for selecting a runner type to run a task, it could be k8s or worker.
*/
public class KubernetesRunnerSelectStrategy
{
public static final String KUBERNETES_RUNNER_TYPE = "k8s";
public static final String WORKER_RUNNER_TYPE = "worker";

private final RunnerSelectorSpec runnerSelectorSpec;

public KubernetesRunnerSelectStrategy(
RunnerSelectorSpec runnerSelectorSpec
)
{
Preconditions.checkNotNull(runnerSelectorSpec);
this.runnerSelectorSpec = runnerSelectorSpec;
}

public String getRunnerTypeForTask(Task task)
{
String runnerType = null;
if (runnerSelectorSpec.getOverrides() != null) {
runnerType = runnerSelectorSpec.getOverrides().get(task.getType());
}

return runnerType == null ? runnerSelectorSpec.getDefault() : runnerType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

public class RunnerSelectorSpec
{
@Nullable
private final Map<String, String> overrides;
private final String defaultRunner;

@JsonCreator
public RunnerSelectorSpec(
@JsonProperty("default") String defaultRunner,
@JsonProperty("overrides") @Nullable Map<String, String> overrides
)
{
Preconditions.checkNotNull(defaultRunner);
Preconditions.checkArgument(
KubernetesRunnerSelectStrategy.KUBERNETES_RUNNER_TYPE.equals(defaultRunner)
|| KubernetesRunnerSelectStrategy.WORKER_RUNNER_TYPE.equals(defaultRunner),
"runnerSelectorSpec default must be set to one of (%s, %s)",
KubernetesRunnerSelectStrategy.KUBERNETES_RUNNER_TYPE,
KubernetesRunnerSelectStrategy.WORKER_RUNNER_TYPE
);
this.defaultRunner = defaultRunner;
this.overrides = overrides;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, String> getOverrides()
{
return overrides;
}

@JsonProperty
public String getDefault()
{
return defaultRunner;
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final RunnerSelectorSpec that = (RunnerSelectorSpec) o;
return Objects.equals(defaultRunner, that.defaultRunner) &&
Objects.equals(overrides, that.overrides);
}

@Override
public int hashCode()
{
return Objects.hash(defaultRunner, overrides);
}

@Override
public String toString()
{
return "RunnerSelectorSpec{" +
"default=" + defaultRunner +
", overrides=" + overrides +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ public void test_deserializable() throws IOException

Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());

KubernetesRunnerSelectStrategy runnerSelectStrategy = config.getRunnerSelectStrategy();
Assert.assertNotNull(runnerSelectStrategy);
}

@Test
public void test_withDefaults()
{
KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null);
KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null, null);

Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
Assert.assertNull(config.getRunnerSelectStrategy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void test_useHttpTaskRunner_asDefault()
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
new KubernetesAndWorkerTaskRunnerConfig(null, null, null)
);

EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
Expand All @@ -63,7 +63,7 @@ public void test_specifyRemoteTaskRunner()
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("remote", null)
new KubernetesAndWorkerTaskRunnerConfig("remote", null, null)
);

EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
Expand All @@ -81,7 +81,7 @@ public void test_specifyIncorrectTaskRunner_shouldThrowException()
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("noop", null)
new KubernetesAndWorkerTaskRunnerConfig("noop", null, null)
);

EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setup()
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
new KubernetesAndWorkerTaskRunnerConfig(null, null, null)
);
}

Expand All @@ -77,7 +77,7 @@ public void test_runOnKubernetes() throws ExecutionException, InterruptedExcepti
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, false)
new KubernetesAndWorkerTaskRunnerConfig(null, false, null)
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
Expand All @@ -93,7 +93,7 @@ public void test_runOnWorker() throws ExecutionException, InterruptedException
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, true)
new KubernetesAndWorkerTaskRunnerConfig(null, true, null)
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
Expand All @@ -103,6 +103,33 @@ public void test_runOnWorker() throws ExecutionException, InterruptedException
verifyAll();
}

@Test
public void test_runOnKubernetesOrWorkerBasedOnSelectorSpec() throws ExecutionException, InterruptedException
{
RunnerSelectorSpec spec = new RunnerSelectorSpec("k8s", ImmutableMap.of("index_kafka", "worker"));
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, false, spec)
);
Task taskMock = EasyMock.createMock(Task.class);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes();

EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once();
EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
EasyMock.replay(taskMock, workerTaskRunner);
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
EasyMock.verify(taskMock, workerTaskRunner);
EasyMock.reset(taskMock, workerTaskRunner);

EasyMock.expect(taskMock.getType()).andReturn("compact").once();
EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
EasyMock.replay(taskMock, kubernetesTaskRunner);
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
EasyMock.verify(taskMock, kubernetesTaskRunner);
}

@Test
public void test_getUsedCapacity()
{
Expand Down
Loading

0 comments on commit 357485f

Please sign in to comment.