Skip to content

Commit

Permalink
Merge pull request #58 from st-tech/patch/kubernetes-node-affinity
Browse files Browse the repository at this point in the history
patch kubernetes node affinity
  • Loading branch information
hnarimiya authored Jan 19, 2022
2 parents ded33b2 + 4d2bcd6 commit 0f3c1ec
Show file tree
Hide file tree
Showing 2 changed files with 472 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
package io.digdag.standards.command.kubernetes;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import io.digdag.client.config.Config;
import io.digdag.spi.CommandContext;
import io.digdag.spi.CommandRequest;
import io.digdag.spi.TaskRequest;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.Toleration;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.PersistentVolume;
import io.fabric8.kubernetes.api.model.PersistentVolumeSpec;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -49,13 +61,18 @@ public KubernetesClientConfig getConfig()
public Pod runPod(final CommandContext context, final CommandRequest request,
final String name, final List<String> commands, final List<String> arguments)
{
final Container container = createContainer(context, request, name, commands, arguments);
final PodSpec podSpec = createPodSpec(context, request, container);
final io.fabric8.kubernetes.api.model.Pod pod = client.pods()
.inNamespace(client.getNamespace())
// If PersistentVolume or PersistentVolumeClaim is set, create PersistentVolume or PersistentVolumeClaim before making pod.
createPersistentVolume(context);
createPersistentVolumeClaim(context);

final Config kubernetesPodConfig = extractTargetKindConfig(context, "Pod");
final Container container = createContainer(context, request, kubernetesPodConfig, name, commands, arguments);
final PodSpec podSpec = createPodSpec(context, request, kubernetesPodConfig, container);
io.fabric8.kubernetes.api.model.Pod pod = client.pods()
.createNew()
.withNewMetadata()
.withName(name)
.withNamespace(client.getNamespace())
.withLabels(getPodLabels())
.endMetadata()
.withSpec(podSpec)
Expand Down Expand Up @@ -120,57 +137,164 @@ protected Map<String, String> getPodLabels()
return ImmutableMap.of();
}

protected Container createContainer(final CommandContext context, final CommandRequest request,
final String name, final List<String> commands, final List<String> arguments)
@VisibleForTesting
Container createContainer(final CommandContext context, final CommandRequest request,
final Config kubernetesPodConfig, final String name, final List<String> commands, final List<String> arguments)
{
return new ContainerBuilder()
Container container = new ContainerBuilder()
.withName(name)
.withImage(getContainerImage(context, request))
.withEnv(toEnvVars(getEnvironments(context, request)))
.withResources(toResourceRequirements(getResourceLimits(context, request), getResourceRequests(context, request)))
.withResources(getResources(kubernetesPodConfig))
.withVolumeMounts(getVolumeMounts(kubernetesPodConfig))
.withCommand(commands)
.withArgs(arguments)
.build();
return container;
}

protected PodSpec createPodSpec(final CommandContext context, final CommandRequest request,
final Container container)
@VisibleForTesting
PodSpec createPodSpec(final CommandContext context, final CommandRequest request,
final Config kubernetesPodConfig, final Container container)
{
// TODO
// Revisit what values should be extracted as config params or system config params

return new PodSpecBuilder()
PodSpec podSpec = new PodSpecBuilder()
//.withHostNetwork(true);
//.withDnsPolicy("ClusterFirstWithHostNet");
.addToContainers(container)
.withAffinity(getAffinity(kubernetesPodConfig))
.withTolerations(getTolerations(kubernetesPodConfig))
.withVolumes(getVolumes(kubernetesPodConfig))
// TODO extract as config parameter
// Restart policy is "Never" by default since it needs to avoid executing the operator multiple times. It might not
// make the script idempotent.
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy
.withRestartPolicy("Never")
.build();
return podSpec;
}

protected String getContainerImage(final CommandContext context, final CommandRequest request)
protected PersistentVolume createPersistentVolume(final CommandContext context)
{
final Config config = context.getTaskRequest().getConfig();
final Config dockerConfig = config.getNested("docker");
return dockerConfig.get("image", String.class);
final Config kubernetesPvConfig = extractTargetKindConfig(context, "PersistentVolume");
if (kubernetesPvConfig != null && kubernetesPvConfig.has("spec"))
return client.persistentVolumes()
.createOrReplaceWithNew()
.withNewMetadata()
.withName(kubernetesPvConfig.get("name", String.class))
.withNamespace(client.getNamespace())
.endMetadata()
.withSpec(getPersistentVolume(kubernetesPvConfig.get("spec", Config.class)))
.done();
else
return null;
}

protected Map<String, String> getEnvironments(final CommandContext context, final CommandRequest request)
protected PersistentVolumeClaim createPersistentVolumeClaim(final CommandContext context)
{
return request.getEnvironments();
final Config kubernetesPvcConfig = extractTargetKindConfig(context, "PersistentVolumeClaim");
if (kubernetesPvcConfig != null && kubernetesPvcConfig.has("spec"))
return client.persistentVolumeClaims()
.createOrReplaceWithNew()
.withNewMetadata()
.withName(kubernetesPvcConfig.get("name", String.class))
.withNamespace(client.getNamespace())
.endMetadata()
.withSpec(getPersistentVolumeClaim(kubernetesPvcConfig.get("spec", Config.class)))
.done();
else
return null;
}

protected Config extractTargetKindConfig(final CommandContext context, final String kind) {
final TaskRequest taskRequest = context.getTaskRequest();
final Config kubernetesConfig = taskRequest.getConfig().get("kubernetes", Config.class);
Config kubernetesTargetKindConfig = null;
if (kubernetesConfig != null && kubernetesConfig.has(kind)) kubernetesTargetKindConfig = kubernetesConfig.get(kind, Config.class);
return kubernetesTargetKindConfig;
}

@VisibleForTesting
PersistentVolumeSpec getPersistentVolume(Config kubernetesPvSpecConfig) {
final JsonNode persistentVolumeSpecNode = kubernetesPvSpecConfig.getInternalObjectNode();
return Serialization.unmarshal(persistentVolumeSpecNode.toString(), PersistentVolumeSpec.class);
}

@VisibleForTesting
PersistentVolumeClaimSpec getPersistentVolumeClaim(Config kubernetesPvcSpecConfig) {
final JsonNode persistentVolumeClaimSpecNode = kubernetesPvcSpecConfig.getInternalObjectNode();
return Serialization.unmarshal(persistentVolumeClaimSpecNode.toString(), PersistentVolumeClaimSpec.class);
}

protected ResourceRequirements getResources(Config kubernetesPodConfig) {
if (kubernetesPodConfig != null && kubernetesPodConfig.has("resources")) {
final JsonNode resourcesNode = kubernetesPodConfig.getInternalObjectNode().get("resources");
return Serialization.unmarshal(resourcesNode.toString(), ResourceRequirements.class);
} else {
return null;
}
}

protected List<VolumeMount> getVolumeMounts(Config kubernetesPodConfig) {
if (kubernetesPodConfig != null && kubernetesPodConfig.has("volumeMounts")) {
final JsonNode volumeMountsNode = kubernetesPodConfig.getInternalObjectNode().get("volumeMounts");
return convertToResourceList(volumeMountsNode, VolumeMount.class);
} else {
return null;
}
}

protected Map<String, String> getResourceLimits(final CommandContext context, final CommandRequest request)
protected Affinity getAffinity(Config kubernetesPodConfig) {
if (kubernetesPodConfig != null && kubernetesPodConfig.has("affinity")) {
final JsonNode affinityNode = kubernetesPodConfig.getInternalObjectNode().get("affinity");
return Serialization.unmarshal(affinityNode.toString(), Affinity.class);
} else {
return null;
}
}

protected List<Toleration> getTolerations(Config kubernetesPodConfig) {
if (kubernetesPodConfig != null && kubernetesPodConfig.has("tolerations")) {
final JsonNode tolerationsNode = kubernetesPodConfig.getInternalObjectNode().get("tolerations");
return convertToResourceList(tolerationsNode, Toleration.class);
} else {
return null;
}
}

protected List<Volume> getVolumes(Config kubernetesPodConfig) {
if (kubernetesPodConfig != null && kubernetesPodConfig.has("volumes")) {
final JsonNode volumesNode = kubernetesPodConfig.getInternalObjectNode().get("volumes");
return convertToResourceList(volumesNode, Volume.class);
} else {
return null;
}
}

protected <T> List<T> convertToResourceList(final JsonNode node, final Class<T> type)
{
return ImmutableMap.of();
List<T> resourcesList = new ArrayList<>();
if (node.isArray()){
for (JsonNode resource : node) {
resourcesList.add(Serialization.unmarshal(resource.toString(), type));
}
} else {
resourcesList.add(Serialization.unmarshal(node.toString(), type));
}
return resourcesList;
}

protected Map<String, String> getResourceRequests(final CommandContext context, final CommandRequest request)
protected String getContainerImage(final CommandContext context, final CommandRequest request)
{
return ImmutableMap.of();
final Config config = context.getTaskRequest().getConfig();
final Config dockerConfig = config.getNested("docker");
return dockerConfig.get("image", String.class);
}

protected Map<String, String> getEnvironments(final CommandContext context, final CommandRequest request)
{
return request.getEnvironments();
}

private static List<EnvVar> toEnvVars(final Map<String, String> environments)
Expand All @@ -183,23 +307,6 @@ private static List<EnvVar> toEnvVars(final Map<String, String> environments)
return envVars.build();
}

private static ResourceRequirements toResourceRequirements(
final Map<String, String> limits,
final Map<String, String> requests)
{
final ImmutableMap.Builder<String, Quantity> ls = new ImmutableMap.Builder<>();
for (Map.Entry<String, String> e : limits.entrySet()) {
ls.put(e.getKey(), new Quantity(e.getValue()));
}

final ImmutableMap.Builder<String, Quantity> rs = new ImmutableMap.Builder<>();
for (Map.Entry<String, String> e : requests.entrySet()) {
rs.put(e.getKey(), new Quantity(e.getValue()));
}

return new ResourceRequirements(ls.build(), rs.build());
}

@Override
public void close()
{
Expand Down
Loading

0 comments on commit 0f3c1ec

Please sign in to comment.