Skip to content

Commit

Permalink
Refactor pod template logic to make it easier to test (apache#17178)
Browse files Browse the repository at this point in the history
* Refactoring of pod template logic

* fix javadoc

* Fix intellij

* remove unneeded throw

* PR comments

* fix style

* Fix unit tests
  • Loading branch information
georgew5656 authored Sep 30, 2024
1 parent 28fead5 commit 5ad6ed0
Show file tree
Hide file tree
Showing 14 changed files with 756 additions and 736 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
Expand All @@ -39,26 +41,38 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskExecutionConfigResource;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.k8s.overlord.taskadapter.DynamicConfigPodTemplateSelector;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.Locale;
import java.util.Properties;


Expand Down Expand Up @@ -162,6 +176,70 @@ TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
: injector.getInstance(RemoteTaskRunnerFactory.class);
}

/**
* Provides a TaskAdapter instance for the KubernetesTaskRunner.
*/
@Provides
@LazySingleton
TaskAdapter provideTaskAdapter(
DruidKubernetesClient client,
Properties properties,
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
@Self DruidNode druidNode,
@Smile ObjectMapper smileMapper,
TaskLogs taskLogs,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
)
{
String adapter = properties.getProperty(String.format(
Locale.ROOT,
"%s.%s.adapter.type",
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
"k8s"
));

if (adapter != null && !MultiContainerTaskAdapter.TYPE.equals(adapter) && kubernetesTaskRunnerConfig.isSidecarSupport()) {
throw new IAE(
"Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled",
adapter,
MultiContainerTaskAdapter.TYPE
);
}

if (MultiContainerTaskAdapter.TYPE.equals(adapter) || kubernetesTaskRunnerConfig.isSidecarSupport()) {
return new MultiContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
taskLogs,
new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef)
);
} else {
return new SingleContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper,
taskLogs
);
}
}

private static class RunnerStrategyProvider implements Provider<RunnerStrategy>
{
private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,73 +20,47 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.Locale;
import java.util.Properties;

public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<KubernetesTaskRunner>
{
public static final String TYPE_NAME = "k8s";
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskLogs taskLogs;
private final DruidNode druidNode;
private final TaskConfig taskConfig;
private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
private KubernetesTaskRunner runner;
private final TaskAdapter taskAdapter;

@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
@EscalatedGlobal final HttpClient httpClient,
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
StartupLoggingConfig startupLoggingConfig,
TaskLogs taskLogs,
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties,
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
TaskAdapter taskAdapter
)
{
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.taskLogs = taskLogs;
this.druidNode = druidNode;
this.taskConfig = taskConfig;
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
this.dynamicConfigRef = dynamicConfigRef;
this.taskAdapter = taskAdapter;
}

@Override
Expand All @@ -101,7 +75,7 @@ public KubernetesTaskRunner build()
);

runner = new KubernetesTaskRunner(
buildTaskAdapter(druidKubernetesClient),
taskAdapter,
kubernetesTaskRunnerConfig,
peonClient,
httpClient,
Expand All @@ -117,53 +91,4 @@ public KubernetesTaskRunner get()
return runner;
}

private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
{
String adapter = properties.getProperty(String.format(
Locale.ROOT,
"%s.%s.adapter.type",
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
TYPE_NAME
));

if (adapter != null && !MultiContainerTaskAdapter.TYPE.equals(adapter) && kubernetesTaskRunnerConfig.isSidecarSupport()) {
throw new IAE(
"Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled",
adapter,
MultiContainerTaskAdapter.TYPE
);
}

if (MultiContainerTaskAdapter.TYPE.equals(adapter) || kubernetesTaskRunnerConfig.isSidecarSupport()) {
return new MultiContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties,
taskLogs,
dynamicConfigRef
);
} else {
return new SingleContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper,
taskLogs
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.taskadapter;

import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;

import java.io.File;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class DynamicConfigPodTemplateSelector implements PodTemplateSelector
{

private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+ ".k8s.podTemplate.";

private final Properties properties;
private HashMap<String, PodTemplate> podTemplates;
private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;

public DynamicConfigPodTemplateSelector(
Properties properties,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
)
{
this.properties = properties;
this.dynamicConfigRef = dynamicConfigRef;
initializeTemplatesFromFileSystem();
}

private void initializeTemplatesFromFileSystem()
{
Set<String> taskAdapterTemplateKeys = getTaskAdapterTemplates(properties);
if (!taskAdapterTemplateKeys.contains("base")) {
throw new IAE(
"Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
}

HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
for (String taskAdapterTemplateKey : taskAdapterTemplateKeys) {
Optional<PodTemplate> template = loadPodTemplate(taskAdapterTemplateKey, properties);
if (template.isPresent()) {
podTemplateMap.put(taskAdapterTemplateKey, template.get());
}
}
podTemplates = podTemplateMap;
}

private Set<String> getTaskAdapterTemplates(Properties properties)
{
Set<String> taskAdapterTemplates = new HashSet<>();

for (String runtimeProperty : properties.stringPropertyNames()) {
if (runtimeProperty.startsWith(TASK_PROPERTY)) {
String[] taskAdapterPropertyPaths = runtimeProperty.split("\\.");
taskAdapterTemplates.add(taskAdapterPropertyPaths[taskAdapterPropertyPaths.length - 1]);
}
}

return taskAdapterTemplates;
}

private Optional<PodTemplate> loadPodTemplate(String key, Properties properties)
{
String property = TASK_PROPERTY + key;
String podTemplateFile = properties.getProperty(property);
if (podTemplateFile == null) {
throw new IAE("Pod template file not specified for [%s]", property);

}
try {
return Optional.of(Serialization.unmarshal(
Files.newInputStream(new File(podTemplateFile).toPath()),
PodTemplate.class
));
}
catch (Exception e) {
throw new IAE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);
}
}

@Override
public Optional<PodTemplateWithName> getPodTemplateForTask(Task task)
{
PodTemplateSelectStrategy podTemplateSelectStrategy;
KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get();
if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) {
podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
} else {
podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy();
}

return Optional.of(podTemplateSelectStrategy.getPodTemplateForTask(task, podTemplates));
}
}
Loading

0 comments on commit 5ad6ed0

Please sign in to comment.