From 48a04f6fc23d8fa5b5169a810eda9535dce9317a Mon Sep 17 00:00:00 2001 From: TrsNium Date: Mon, 2 Dec 2019 18:20:18 +0900 Subject: [PATCH 01/12] support affinity, tolerations and volumes --- .../kubernetes/DefaultKubernetesClient.java | 87 +++++++++-- .../DefaultKubernetesClientTest.java | 146 ++++++++++++++++++ 2 files changed, 221 insertions(+), 12 deletions(-) create mode 100644 digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index a1929a811b..14cc243689 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -3,9 +3,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.CharStreams; +import io.digdag.core.storage.StorageManager; import io.digdag.client.config.Config; +import io.digdag.client.config.ConfigException; import io.digdag.spi.CommandContext; import io.digdag.spi.CommandRequest; +import io.digdag.spi.TaskRequest; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerStatus; @@ -13,15 +16,25 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.Toleration; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.utils.Serialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.Reader; import java.util.List; +import java.util.ArrayList; import java.util.Map; public class DefaultKubernetesClient @@ -51,15 +64,16 @@ public Pod runPod(final CommandContext context, final CommandRequest request, { final Container container = createContainer(context, request, name, commands, arguments); final PodSpec podSpec = createPodSpec(context, request, container); - final io.fabric8.kubernetes.api.model.Pod pod = client.pods() - .inNamespace(client.getNamespace()) + io.fabric8.kubernetes.api.model.Pod pod = client.pods() .createNew() .withNewMetadata() .withName(name) + .withNamespace(client.getNamespace()) .withLabels(getPodLabels()) .endMetadata() .withSpec(podSpec) .done(); + return Pod.of(pod); } @@ -120,26 +134,43 @@ protected Map getPodLabels() return ImmutableMap.of(); } - protected Container createContainer(final CommandContext context, final CommandRequest request, + @VisibleForTesting + Container createContainer(final CommandContext context, final CommandRequest request, final String name, final List commands, final List arguments) { - return new ContainerBuilder() + final TaskRequest taskRequest = context.getTaskRequest(); + final Config taskConfig = taskRequest.getConfig(); + ContainerBuilder containerBuilder = new ContainerBuilder() .withName(name) .withImage(getContainerImage(context, request)) .withEnv(toEnvVars(getEnvironments(context, request))) .withResources(toResourceRequirements(getResourceLimits(context, request), getResourceRequests(context, request))) .withCommand(commands) - .withArgs(arguments) - .build(); + .withArgs(arguments); + + + final JsonNode node = taskConfig.getInternalObjectNode(); + if (node.has("kubernetes")) { + final JsonNode kubernetesNode = node.get("kubernetes"); + if (kubernetesNode.has("container")) { + final JsonNode containerNode = kubernetesNode.get("container"); + if (containerNode.has("volumeMounts")) { + containerBuilder.withVolumeMounts(convertToResourceList(containerNode.get("volumeMounts"), VolumeMount.class)); + } + } + } + return containerBuilder.build(); } - protected PodSpec createPodSpec(final CommandContext context, final CommandRequest request, + @VisibleForTesting + PodSpec createPodSpec(final CommandContext context, final CommandRequest request, final Container container) { // TODO // Revisit what values should be extracted as config params or system config params - - return new PodSpecBuilder() + final TaskRequest taskRequest = context.getTaskRequest(); + final Config taskConfig = taskRequest.getConfig(); + PodSpecBuilder podSpecBuilder = new PodSpecBuilder() //.withHostNetwork(true); //.withDnsPolicy("ClusterFirstWithHostNet"); .addToContainers(container) @@ -147,8 +178,40 @@ protected PodSpec createPodSpec(final CommandContext context, final CommandReque // Restart policy is "Never" by default since it needs to avoid executing the operator multiple times. It might not // make the script idempotent. // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy - .withRestartPolicy("Never") - .build(); + .withRestartPolicy("Never"); + + final JsonNode node = taskConfig.getInternalObjectNode(); + if (node.has("kubernetes")) { + final JsonNode kubernetesNode = node.get("kubernetes"); + if (kubernetesNode.has("pod")) { + final JsonNode podNode = kubernetesNode.get("pod"); + if (podNode.has("affinity")) { + podSpecBuilder.withAffinity(Serialization.unmarshal(podNode.get("affinity").toString(), Affinity.class)); + } + + if (podNode.has("tolerations")) { + podSpecBuilder.withTolerations(convertToResourceList(podNode.get("tolerations"), Toleration.class)); + } + + if (podNode.has("volumes")) { + podSpecBuilder.withVolumes(convertToResourceList(podNode.get("volumes"), Volume.class)); + } + } + } + return podSpecBuilder.build(); + } + + protected List convertToResourceList(final JsonNode node, final Class type) + { + List resourcesList = new ArrayList<>(); + if (node.isArray()){ + for (JsonNode resource : node) { + resourcesList.add(Serialization.unmarshal(resource.toString(), type)); + } + } else { + resourcesList.add(Serialization.unmarshal(node.toString(), type)); + } + return resourcesList; } protected String getContainerImage(final CommandContext context, final CommandRequest request) @@ -183,7 +246,7 @@ private static List toEnvVars(final Map environments) return envVars.build(); } - private static ResourceRequirements toResourceRequirements( + protected static ResourceRequirements toResourceRequirements( final Map limits, final Map requests) { diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java new file mode 100644 index 0000000000..ab9639cd37 --- /dev/null +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -0,0 +1,146 @@ +package io.digdag.standards.command.kubernetes; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import io.digdag.spi.CommandContext; +import io.digdag.spi.CommandRequest; +import static io.digdag.client.config.ConfigUtils.newConfig; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static io.digdag.core.workflow.OperatorTestingUtils.newContext; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.api.model.AffinityBuilder; +import io.fabric8.kubernetes.api.model.NodeAffinityBuilder; +import io.fabric8.kubernetes.api.model.NodeSelectorBuilder; +import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; +import io.fabric8.kubernetes.api.model.NodeSelectorRequirementBuilder; +import io.fabric8.kubernetes.api.model.Container; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import io.digdag.spi.TaskRequest; +import io.digdag.client.config.Config; +import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest; +import static io.digdag.core.workflow.OperatorTestingUtils.newContext; + +import java.util.List; +import java.util.Arrays; +import java.util.ArrayList; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultKubernetesClientTest +{ + + private KubernetesClientConfig kubernetesClientConfig; + private io.fabric8.kubernetes.client.DefaultKubernetesClient k8sDefaultKubernetesClient; + private CommandContext commandContext; + private CommandRequest commandRequest; + + @Before + public void setUp() + throws Exception + { + kubernetesClientConfig = mock(KubernetesClientConfig.class); + k8sDefaultKubernetesClient = mock(io.fabric8.kubernetes.client.DefaultKubernetesClient.class); + commandContext = mock(CommandContext.class); + commandRequest = mock(CommandRequest.class); + } + + @Test + public void testCreateContainer() + throws Exception + { + final Config taskRequestConfig = newConfig() + .set("kubernetes", newConfig().set( + "container", newConfig().set( + "volumeMounts", ImmutableList.of( + newConfig().set("mountPath", "/test-ebs").set("name", "test"))))) + .set("docker", newConfig().set("image", "test")); + + final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); + when(commandContext.getTaskRequest()).thenReturn(taskRequest); + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + + String podName = "test"; + List commands = new ArrayList<>(); + List arguments = new ArrayList<>(); + Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, podName, commands, arguments); + + Container desiredContainer = new ContainerBuilder() + .withName(podName) + .withImage("test") + .withCommand(commands) + .withArgs(arguments) + .withResources(defaultKubernetesClient.toResourceRequirements(defaultKubernetesClient.getResourceLimits(commandContext, commandRequest), defaultKubernetesClient.getResourceRequests(commandContext, commandRequest))) + .withVolumeMounts(Arrays.asList(new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build())).build(); + + assertThat(container, is(desiredContainer)); + } + + @Test + public void testCreatePodSPec() + throws Exception + { + final Config taskRequestConfig = newConfig() + .set("kubernetes", newConfig().set( + "pod", newConfig().set( + "affinity", newConfig().set( + "nodeAffinity", newConfig().set( + "requiredDuringSchedulingIgnoredDuringExecution", newConfig().set( + "nodeSelectorTerms", ImmutableList.of( + newConfig().set( + "matchExpressions", ImmutableList.of( + newConfig().set("key", "failure-domain.beta.kubernetes.io/zone") + .set("operator", "In") + .set("values", ImmutableList.of( + "asia-northeast1-a" + ))))))))))); + + final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); + when(commandContext.getTaskRequest()).thenReturn(taskRequest); + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + + Container container = mock(Container.class); + PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, container); + + PodSpec desiredPodSpec = new PodSpecBuilder() + .addToContainers(container) + .withRestartPolicy("Never") + .withAffinity(new AffinityBuilder() + .withNodeAffinity(new NodeAffinityBuilder() + .withRequiredDuringSchedulingIgnoredDuringExecution(new NodeSelectorBuilder() + .addToNodeSelectorTerms(0, + new NodeSelectorTermBuilder().addToMatchExpressions(0, + new NodeSelectorRequirementBuilder() + .withKey("failure-domain.beta.kubernetes.io/zone") + .addToValues(0,"asia-northeast1-a") + .withOperator("In") + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build(); + + assertThat(podSpec, is(desiredPodSpec)); + } +} From 11e66e037b15f3efdf840aed349d4ec3b0881959 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Tue, 10 Dec 2019 18:21:48 +0900 Subject: [PATCH 02/12] fix indent --- .../kubernetes/DefaultKubernetesClient.java | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index 14cc243689..07f854c9c6 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -149,16 +149,16 @@ Container createContainer(final CommandContext context, final CommandRequest req .withArgs(arguments); - final JsonNode node = taskConfig.getInternalObjectNode(); - if (node.has("kubernetes")) { - final JsonNode kubernetesNode = node.get("kubernetes"); - if (kubernetesNode.has("container")) { - final JsonNode containerNode = kubernetesNode.get("container"); - if (containerNode.has("volumeMounts")) { - containerBuilder.withVolumeMounts(convertToResourceList(containerNode.get("volumeMounts"), VolumeMount.class)); - } - } - } + final JsonNode node = taskConfig.getInternalObjectNode(); + if (node.has("kubernetes")) { + final JsonNode kubernetesNode = node.get("kubernetes"); + if (kubernetesNode.has("container")) { + final JsonNode containerNode = kubernetesNode.get("container"); + if (containerNode.has("volumeMounts")) { + containerBuilder.withVolumeMounts(convertToResourceList(containerNode.get("volumeMounts"), VolumeMount.class)); + } + } + } return containerBuilder.build(); } @@ -180,25 +180,25 @@ PodSpec createPodSpec(final CommandContext context, final CommandRequest request // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy .withRestartPolicy("Never"); - final JsonNode node = taskConfig.getInternalObjectNode(); - if (node.has("kubernetes")) { - final JsonNode kubernetesNode = node.get("kubernetes"); - if (kubernetesNode.has("pod")) { - final JsonNode podNode = kubernetesNode.get("pod"); - if (podNode.has("affinity")) { - podSpecBuilder.withAffinity(Serialization.unmarshal(podNode.get("affinity").toString(), Affinity.class)); - } - - if (podNode.has("tolerations")) { - podSpecBuilder.withTolerations(convertToResourceList(podNode.get("tolerations"), Toleration.class)); - } - - if (podNode.has("volumes")) { - podSpecBuilder.withVolumes(convertToResourceList(podNode.get("volumes"), Volume.class)); - } - } - } - return podSpecBuilder.build(); + final JsonNode node = taskConfig.getInternalObjectNode(); + if (node.has("kubernetes")) { + final JsonNode kubernetesNode = node.get("kubernetes"); + if (kubernetesNode.has("spec")) { + final JsonNode podNode = kubernetesNode.get("spec"); + if (podNode.has("affinity")) { + podSpecBuilder.withAffinity(Serialization.unmarshal(podNode.get("affinity").toString(), Affinity.class)); + } + + if (podNode.has("tolerations")) { + podSpecBuilder.withTolerations(convertToResourceList(podNode.get("tolerations"), Toleration.class)); + } + + if (podNode.has("volumes")) { + podSpecBuilder.withVolumes(convertToResourceList(podNode.get("volumes"), Volume.class)); + } + } + } + return podSpecBuilder.build(); } protected List convertToResourceList(final JsonNode node, final Class type) From bc39ce05ce49ad72e37e372cf7659e5da2aa77d6 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Tue, 10 Dec 2019 18:22:56 +0900 Subject: [PATCH 03/12] rename pod to spec --- .../command/kubernetes/DefaultKubernetesClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index ab9639cd37..9fccad9d29 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -97,7 +97,7 @@ public void testCreatePodSPec() { final Config taskRequestConfig = newConfig() .set("kubernetes", newConfig().set( - "pod", newConfig().set( + "spec", newConfig().set( "affinity", newConfig().set( "nodeAffinity", newConfig().set( "requiredDuringSchedulingIgnoredDuringExecution", newConfig().set( From 325bc14051d47f7014601dc882289fd43bf6f9c2 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Wed, 29 Jan 2020 17:05:51 +0900 Subject: [PATCH 04/12] refactor --- .../kubernetes/DefaultKubernetesClient.java | 135 +++++++++--------- .../DefaultKubernetesClientTest.java | 14 +- 2 files changed, 75 insertions(+), 74 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index 07f854c9c6..a1984775e4 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpecBuilder; import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.Toleration; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; @@ -62,8 +63,10 @@ public KubernetesClientConfig getConfig() public Pod runPod(final CommandContext context, final CommandRequest request, final String name, final List commands, final List arguments) { - final Container container = createContainer(context, request, name, commands, arguments); - final PodSpec podSpec = createPodSpec(context, request, container); + final TaskRequest taskRequest = context.getTaskRequest(); + final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); + final Container container = createContainer(context, request, kubernetesConfig, name, commands, arguments); + final PodSpec podSpec = createPodSpec(context, request, kubernetesConfig, container); io.fabric8.kubernetes.api.model.Pod pod = client.pods() .createNew() .withNewMetadata() @@ -136,69 +139,90 @@ protected Map getPodLabels() @VisibleForTesting Container createContainer(final CommandContext context, final CommandRequest request, - final String name, final List commands, final List arguments) + final Config kubernetesConfig, final String name, final List commands, final List arguments) { - final TaskRequest taskRequest = context.getTaskRequest(); - final Config taskConfig = taskRequest.getConfig(); - ContainerBuilder containerBuilder = new ContainerBuilder() + Config kubernetesContainerConfig = null; + if (kubernetesConfig.has("container")) kubernetesContainerConfig = kubernetesConfig.getNested("container"); + + Container container = new ContainerBuilder() .withName(name) .withImage(getContainerImage(context, request)) .withEnv(toEnvVars(getEnvironments(context, request))) - .withResources(toResourceRequirements(getResourceLimits(context, request), getResourceRequests(context, request))) + .withResources(getResources(kubernetesContainerConfig)) + .withVolumeMounts(getVolumeMounts(kubernetesContainerConfig)) .withCommand(commands) - .withArgs(arguments); - - - final JsonNode node = taskConfig.getInternalObjectNode(); - if (node.has("kubernetes")) { - final JsonNode kubernetesNode = node.get("kubernetes"); - if (kubernetesNode.has("container")) { - final JsonNode containerNode = kubernetesNode.get("container"); - if (containerNode.has("volumeMounts")) { - containerBuilder.withVolumeMounts(convertToResourceList(containerNode.get("volumeMounts"), VolumeMount.class)); - } - } - } - return containerBuilder.build(); + .withArgs(arguments) + .build(); + return container; } @VisibleForTesting PodSpec createPodSpec(final CommandContext context, final CommandRequest request, - final Container container) + final Config kubernetesConfig, final Container container) { // TODO // Revisit what values should be extracted as config params or system config params - final TaskRequest taskRequest = context.getTaskRequest(); - final Config taskConfig = taskRequest.getConfig(); - PodSpecBuilder podSpecBuilder = new PodSpecBuilder() + Config kubernetesPodSpecConfig = null; + if (kubernetesConfig.has("spec")) kubernetesPodSpecConfig = kubernetesConfig.getNested("spec"); + PodSpec podSpec = new PodSpecBuilder() //.withHostNetwork(true); //.withDnsPolicy("ClusterFirstWithHostNet"); .addToContainers(container) + .withAffinity(getAffinity(kubernetesPodSpecConfig)) + .withTolerations(getTolerations(kubernetesPodSpecConfig)) + .withVolumes(getVolumes(kubernetesPodSpecConfig)) // TODO extract as config parameter // Restart policy is "Never" by default since it needs to avoid executing the operator multiple times. It might not // make the script idempotent. // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy - .withRestartPolicy("Never"); + .withRestartPolicy("Never") + .build(); + return podSpec; + } - final JsonNode node = taskConfig.getInternalObjectNode(); - if (node.has("kubernetes")) { - final JsonNode kubernetesNode = node.get("kubernetes"); - if (kubernetesNode.has("spec")) { - final JsonNode podNode = kubernetesNode.get("spec"); - if (podNode.has("affinity")) { - podSpecBuilder.withAffinity(Serialization.unmarshal(podNode.get("affinity").toString(), Affinity.class)); - } + protected ResourceRequirements getResources(Config kubernetesContainerConfig) { + if (kubernetesContainerConfig != null && kubernetesContainerConfig.has("resources")) { + final JsonNode resourcesNode = kubernetesContainerConfig.getInternalObjectNode().get("resources"); + return Serialization.unmarshal(resourcesNode.toString(), ResourceRequirements.class); + } else { + return null; + } + } - if (podNode.has("tolerations")) { - podSpecBuilder.withTolerations(convertToResourceList(podNode.get("tolerations"), Toleration.class)); - } + protected List getVolumeMounts(Config kubernetesContainerConfig) { + if (kubernetesContainerConfig != null && kubernetesContainerConfig.has("volumeMounts")) { + final JsonNode volumeMountsNode = kubernetesContainerConfig.getInternalObjectNode().get("volumeMounts"); + return convertToResourceList(volumeMountsNode, VolumeMount.class); + } else { + return null; + } + } - if (podNode.has("volumes")) { - podSpecBuilder.withVolumes(convertToResourceList(podNode.get("volumes"), Volume.class)); - } - } + protected Affinity getAffinity(Config kubernetesPodSpecConfig) { + if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("affinity")) { + final JsonNode affinityNode = kubernetesPodSpecConfig.getInternalObjectNode().get("affinity"); + return Serialization.unmarshal(affinityNode.toString(), Affinity.class); + } else { + return null; + } + } + + protected List getTolerations(Config kubernetesPodSpecConfig) { + if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("tolerations")) { + final JsonNode tolerationsNode = kubernetesPodSpecConfig.getInternalObjectNode().get("tolerations"); + return convertToResourceList(tolerationsNode, Toleration.class); + } else { + return null; + } + } + + protected List getVolumes(Config kubernetesPodSpecConfig) { + if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("volumes")) { + final JsonNode volumesNode = kubernetesPodSpecConfig.getInternalObjectNode().get("volumes"); + return convertToResourceList(volumesNode, Volume.class); + } else { + return null; } - return podSpecBuilder.build(); } protected List convertToResourceList(final JsonNode node, final Class type) @@ -226,16 +250,6 @@ protected Map getEnvironments(final CommandContext context, fina return request.getEnvironments(); } - protected Map getResourceLimits(final CommandContext context, final CommandRequest request) - { - return ImmutableMap.of(); - } - - protected Map getResourceRequests(final CommandContext context, final CommandRequest request) - { - return ImmutableMap.of(); - } - private static List toEnvVars(final Map environments) { final ImmutableList.Builder envVars = ImmutableList.builder(); @@ -246,23 +260,6 @@ private static List toEnvVars(final Map environments) return envVars.build(); } - protected static ResourceRequirements toResourceRequirements( - final Map limits, - final Map requests) - { - final ImmutableMap.Builder ls = new ImmutableMap.Builder<>(); - for (Map.Entry e : limits.entrySet()) { - ls.put(e.getKey(), new Quantity(e.getValue())); - } - - final ImmutableMap.Builder rs = new ImmutableMap.Builder<>(); - for (Map.Entry e : requests.entrySet()) { - rs.put(e.getKey(), new Quantity(e.getValue())); - } - - return new ResourceRequirements(ls.build(), rs.build()); - } - @Override public void close() { diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index 9fccad9d29..e32f4443bb 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -26,6 +26,8 @@ import io.fabric8.kubernetes.api.model.NodeSelectorBuilder; import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; import io.fabric8.kubernetes.api.model.NodeSelectorRequirementBuilder; +import io.fabric8.kubernetes.api.model.Toleration; +import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.Container; import org.junit.Before; import org.junit.Test; @@ -66,8 +68,8 @@ public void testCreateContainer() { final Config taskRequestConfig = newConfig() .set("kubernetes", newConfig().set( - "container", newConfig().set( - "volumeMounts", ImmutableList.of( + "container", newConfig() + .set("volumeMounts", ImmutableList.of( newConfig().set("mountPath", "/test-ebs").set("name", "test"))))) .set("docker", newConfig().set("image", "test")); @@ -78,14 +80,15 @@ public void testCreateContainer() String podName = "test"; List commands = new ArrayList<>(); List arguments = new ArrayList<>(); - Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, podName, commands, arguments); + final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); + Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, kubernetesConfig, podName, commands, arguments); Container desiredContainer = new ContainerBuilder() .withName(podName) .withImage("test") .withCommand(commands) .withArgs(arguments) - .withResources(defaultKubernetesClient.toResourceRequirements(defaultKubernetesClient.getResourceLimits(commandContext, commandRequest), defaultKubernetesClient.getResourceRequests(commandContext, commandRequest))) + .withResources(null) .withVolumeMounts(Arrays.asList(new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build())).build(); assertThat(container, is(desiredContainer)); @@ -115,7 +118,8 @@ public void testCreatePodSPec() DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); Container container = mock(Container.class); - PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, container); + final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); + PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, kubernetesConfig, container); PodSpec desiredPodSpec = new PodSpecBuilder() .addToContainers(container) From c868c0da050231645c32f0203f5f0620a8f293cb Mon Sep 17 00:00:00 2001 From: TrsNium Date: Wed, 29 Jan 2020 17:08:30 +0900 Subject: [PATCH 05/12] reformat code --- .../kubernetes/DefaultKubernetesClient.java | 14 ++---- .../DefaultKubernetesClientTest.java | 44 +++++++------------ 2 files changed, 20 insertions(+), 38 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index a1984775e4..f5f91066c9 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -1,14 +1,15 @@ package io.digdag.standards.command.kubernetes; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.CharStreams; -import io.digdag.core.storage.StorageManager; import io.digdag.client.config.Config; -import io.digdag.client.config.ConfigException; import io.digdag.spi.CommandContext; import io.digdag.spi.CommandRequest; import io.digdag.spi.TaskRequest; +import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerStatus; @@ -16,26 +17,19 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpecBuilder; -import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.Toleration; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; -import io.fabric8.kubernetes.api.model.Quantity; -import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.utils.Serialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.JsonNode; - -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.Reader; -import java.util.List; import java.util.ArrayList; +import java.util.List; import java.util.Map; public class DefaultKubernetesClient diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index e32f4443bb..c950022acc 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -1,47 +1,35 @@ package io.digdag.standards.command.kubernetes; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - +import io.digdag.client.config.Config; import io.digdag.spi.CommandContext; import io.digdag.spi.CommandRequest; -import static io.digdag.client.config.ConfigUtils.newConfig; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static io.digdag.core.workflow.OperatorTestingUtils.newContext; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - +import io.digdag.spi.TaskRequest; +import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; -import io.fabric8.kubernetes.api.model.PodSpec; -import io.fabric8.kubernetes.api.model.PodSpecBuilder; -import io.fabric8.kubernetes.api.model.VolumeMountBuilder; -import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.NodeAffinityBuilder; import io.fabric8.kubernetes.api.model.NodeSelectorBuilder; -import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; import io.fabric8.kubernetes.api.model.NodeSelectorRequirementBuilder; -import io.fabric8.kubernetes.api.model.Toleration; -import io.fabric8.kubernetes.api.model.Volume; -import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; -import io.digdag.spi.TaskRequest; -import io.digdag.client.config.Config; -import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest; -import static io.digdag.core.workflow.OperatorTestingUtils.newContext; - -import java.util.List; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static io.digdag.client.config.ConfigUtils.newConfig; +import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultKubernetesClientTest From 2e8c9daf4dbc48f3a844e2082c9a83818cdeb3bc Mon Sep 17 00:00:00 2001 From: TrsNium Date: Wed, 29 Jan 2020 17:13:38 +0900 Subject: [PATCH 06/12] remove diff --- .../standards/command/kubernetes/DefaultKubernetesClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index f5f91066c9..0a4c4b1bc1 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -70,7 +70,6 @@ public Pod runPod(final CommandContext context, final CommandRequest request, .endMetadata() .withSpec(podSpec) .done(); - return Pod.of(pod); } From a30e4d89d35e0511bef400aac062be2647476413 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 14:04:13 +0900 Subject: [PATCH 07/12] convert to airflow style and update test --- .../kubernetes/DefaultKubernetesClient.java | 59 +++-- .../DefaultKubernetesClientTest.java | 235 ++++++++++++++---- 2 files changed, 218 insertions(+), 76 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index 0a4c4b1bc1..eb83ed8037 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.Toleration; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.utils.Serialization; import org.slf4j.Logger; @@ -58,9 +59,12 @@ public Pod runPod(final CommandContext context, final CommandRequest request, final String name, final List commands, final List arguments) { final TaskRequest taskRequest = context.getTaskRequest(); - final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); - final Container container = createContainer(context, request, kubernetesConfig, name, commands, arguments); - final PodSpec podSpec = createPodSpec(context, request, kubernetesConfig, container); + final Config kubernetesConfig = taskRequest.getConfig().get("kubernetes", Config.class); + Config kubernetesPodConfig = null; + if (kubernetesConfig != null && kubernetesConfig.has("Pod")) kubernetesPodConfig = kubernetesConfig.get("Pod", Config.class); + + final Container container = createContainer(context, request, kubernetesPodConfig, name, commands, arguments); + final PodSpec podSpec = createPodSpec(context, request, kubernetesPodConfig, container); io.fabric8.kubernetes.api.model.Pod pod = client.pods() .createNew() .withNewMetadata() @@ -132,17 +136,14 @@ protected Map getPodLabels() @VisibleForTesting Container createContainer(final CommandContext context, final CommandRequest request, - final Config kubernetesConfig, final String name, final List commands, final List arguments) + final Config kubernetesPodConfig, final String name, final List commands, final List arguments) { - Config kubernetesContainerConfig = null; - if (kubernetesConfig.has("container")) kubernetesContainerConfig = kubernetesConfig.getNested("container"); - Container container = new ContainerBuilder() .withName(name) .withImage(getContainerImage(context, request)) .withEnv(toEnvVars(getEnvironments(context, request))) - .withResources(getResources(kubernetesContainerConfig)) - .withVolumeMounts(getVolumeMounts(kubernetesContainerConfig)) + .withResources(getResources(kubernetesPodConfig)) + .withVolumeMounts(getVolumeMounts(kubernetesPodConfig)) .withCommand(commands) .withArgs(arguments) .build(); @@ -151,19 +152,17 @@ Container createContainer(final CommandContext context, final CommandRequest req @VisibleForTesting PodSpec createPodSpec(final CommandContext context, final CommandRequest request, - final Config kubernetesConfig, final Container container) + final Config kubernetesPodConfig, final Container container) { // TODO // Revisit what values should be extracted as config params or system config params - Config kubernetesPodSpecConfig = null; - if (kubernetesConfig.has("spec")) kubernetesPodSpecConfig = kubernetesConfig.getNested("spec"); PodSpec podSpec = new PodSpecBuilder() //.withHostNetwork(true); //.withDnsPolicy("ClusterFirstWithHostNet"); .addToContainers(container) - .withAffinity(getAffinity(kubernetesPodSpecConfig)) - .withTolerations(getTolerations(kubernetesPodSpecConfig)) - .withVolumes(getVolumes(kubernetesPodSpecConfig)) + .withAffinity(getAffinity(kubernetesPodConfig)) + .withTolerations(getTolerations(kubernetesPodConfig)) + .withVolumes(getVolumes(kubernetesPodConfig)) // TODO extract as config parameter // Restart policy is "Never" by default since it needs to avoid executing the operator multiple times. It might not // make the script idempotent. @@ -173,45 +172,45 @@ PodSpec createPodSpec(final CommandContext context, final CommandRequest request return podSpec; } - protected ResourceRequirements getResources(Config kubernetesContainerConfig) { - if (kubernetesContainerConfig != null && kubernetesContainerConfig.has("resources")) { - final JsonNode resourcesNode = kubernetesContainerConfig.getInternalObjectNode().get("resources"); + protected ResourceRequirements getResources(Config kubernetesPodConfig) { + if (kubernetesPodConfig != null && kubernetesPodConfig.has("resources")) { + final JsonNode resourcesNode = kubernetesPodConfig.getInternalObjectNode().get("resources"); return Serialization.unmarshal(resourcesNode.toString(), ResourceRequirements.class); } else { return null; } } - protected List getVolumeMounts(Config kubernetesContainerConfig) { - if (kubernetesContainerConfig != null && kubernetesContainerConfig.has("volumeMounts")) { - final JsonNode volumeMountsNode = kubernetesContainerConfig.getInternalObjectNode().get("volumeMounts"); + protected List getVolumeMounts(Config kubernetesPodConfig) { + if (kubernetesPodConfig != null && kubernetesPodConfig.has("volumeMounts")) { + final JsonNode volumeMountsNode = kubernetesPodConfig.getInternalObjectNode().get("volumeMounts"); return convertToResourceList(volumeMountsNode, VolumeMount.class); } else { return null; } } - protected Affinity getAffinity(Config kubernetesPodSpecConfig) { - if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("affinity")) { - final JsonNode affinityNode = kubernetesPodSpecConfig.getInternalObjectNode().get("affinity"); + protected Affinity getAffinity(Config kubernetesPodConfig) { + if (kubernetesPodConfig != null && kubernetesPodConfig.has("affinity")) { + final JsonNode affinityNode = kubernetesPodConfig.getInternalObjectNode().get("affinity"); return Serialization.unmarshal(affinityNode.toString(), Affinity.class); } else { return null; } } - protected List getTolerations(Config kubernetesPodSpecConfig) { - if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("tolerations")) { - final JsonNode tolerationsNode = kubernetesPodSpecConfig.getInternalObjectNode().get("tolerations"); + protected List getTolerations(Config kubernetesPodConfig) { + if (kubernetesPodConfig != null && kubernetesPodConfig.has("tolerations")) { + final JsonNode tolerationsNode = kubernetesPodConfig.getInternalObjectNode().get("tolerations"); return convertToResourceList(tolerationsNode, Toleration.class); } else { return null; } } - protected List getVolumes(Config kubernetesPodSpecConfig) { - if (kubernetesPodSpecConfig != null && kubernetesPodSpecConfig.has("volumes")) { - final JsonNode volumesNode = kubernetesPodSpecConfig.getInternalObjectNode().get("volumes"); + protected List getVolumes(Config kubernetesPodConfig) { + if (kubernetesPodConfig != null && kubernetesPodConfig.has("volumes")) { + final JsonNode volumesNode = kubernetesPodConfig.getInternalObjectNode().get("volumes"); return convertToResourceList(volumesNode, Volume.class); } else { return null; diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index c950022acc..9419095ccd 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -14,7 +14,15 @@ import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.fabric8.kubernetes.api.model.Toleration; +import io.fabric8.kubernetes.api.model.TolerationBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,6 +47,7 @@ public class DefaultKubernetesClientTest private io.fabric8.kubernetes.client.DefaultKubernetesClient k8sDefaultKubernetesClient; private CommandContext commandContext; private CommandRequest commandRequest; + private Config testKubernetesConfig; @Before public void setUp() @@ -48,36 +57,156 @@ public void setUp() k8sDefaultKubernetesClient = mock(io.fabric8.kubernetes.client.DefaultKubernetesClient.class); commandContext = mock(CommandContext.class); commandRequest = mock(CommandRequest.class); + + /* + kubernetes: + Pod: + volumeMounts: + - mountPath: "/test-ebs" + name: "test" + - mountPath: "/test-ebs" + name: "test2" + volumes: + - name: test + emptyDir: {} + - name: test2 + emptyDir: {} + resources: + limits: + memory: 200Mi + requests: + memory: 100Mi + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution + nodeSelectorTerms: + - matchExpressions + - key: test + operator: In + values: + - test + tolerations: + - key: test + operator: Exists + effect: NoSchedule + - key: test2 + operator: Exists + effect: NoSchedule + PersistentVolumeClaim: + accessModes: + - ReadWriteOnce + volumeMode: Block + resources: + requests: + storage: 10Gi + PersistentVolume: + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + volumeMode: "Block" + persistentVolumeReclaimPolicy: "ReadWriteOnce" + fc: + targetWWNs: ["50060e801049cfd1"] + lun: 0 + readOnly: false + */ + testKubernetesConfig = newConfig() + .set("Pod", newConfig() + .set("volumeMounts", ImmutableList.of( + newConfig().set("mountPath", "/test-ebs").set("name", "test"), + newConfig().set("mountPath", "/test-ebs2").set("name", "test2"))) + .set("volumes", ImmutableList.of( + newConfig().set("name", "test").set("emptyDir", newConfig()), + newConfig().set("name", "test2").set("emptyDir", newConfig()))) + .set("resources", newConfig() + .set("limits", newConfig().set("memory", "200Mi")) + .set("requests", newConfig().set("memory", "100Mi"))) + .set("affinity", newConfig() + .set("nodeAffinity", newConfig() + .set("requiredDuringSchedulingIgnoredDuringExecution", newConfig() + .set("nodeSelectorTerms", ImmutableList.of( + newConfig().set("matchExpressions", ImmutableList.of( + newConfig().set("key", "test1").set("operator", "In").set("values", ImmutableList.of("test1"))))))))) + .set("tolerations", ImmutableList.of( + newConfig().set("key", "test").set("operator", "Exists").set("effect", "NoSchedule"), + newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))) + .set("PersistentVolumeClaim", newConfig() + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("volumeMode", "Block") + .set("resources", newConfig().set("requests", newConfig().set("storage", "10Gi")))) + .set("PersistentVolume", newConfig() + .set("capacity", newConfig().set("storage", "10Gi")) + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("volumeMode", "Block") + .set("persistentVolumeReclaimPolicy", "Retain") + .set("fc", newConfig() + .set("targetWWNs", ImmutableList.of("50060e801049cfd1")) + .set("lun", "0") + .set("readOnly", "false"))); + } @Test public void testCreateContainer() throws Exception { + final Config taskRequestConfig = newConfig().set("docker", newConfig().set("image", "test")); + + final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); + when(commandContext.getTaskRequest()).thenReturn(taskRequest); + + String podName = "test"; + List commands = new ArrayList<>(); + List arguments = new ArrayList<>(); + + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, null, podName, commands, arguments); + + Container desiredContainer = new ContainerBuilder() + .withName(podName) + .withImage("test") + .withCommand(commands) + .withArgs(arguments) + .withResources(null) + .withVolumeMounts((List) null) + .build(); + + assertThat(container, is(desiredContainer)); + } + + @Test + public void testCreateContainerWithKubernetesConfig() + throws Exception + { + final Config kubernetesPodConfig = testKubernetesConfig.get("Pod", Config.class); final Config taskRequestConfig = newConfig() - .set("kubernetes", newConfig().set( - "container", newConfig() - .set("volumeMounts", ImmutableList.of( - newConfig().set("mountPath", "/test-ebs").set("name", "test"))))) + .set("kubernetes", testKubernetesConfig) .set("docker", newConfig().set("image", "test")); final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); when(commandContext.getTaskRequest()).thenReturn(taskRequest); - DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); String podName = "test"; List commands = new ArrayList<>(); List arguments = new ArrayList<>(); - final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); - Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, kubernetesConfig, podName, commands, arguments); + + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, kubernetesPodConfig, podName, commands, arguments); Container desiredContainer = new ContainerBuilder() - .withName(podName) - .withImage("test") - .withCommand(commands) - .withArgs(arguments) - .withResources(null) - .withVolumeMounts(Arrays.asList(new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build())).build(); + .withName(podName) + .withImage("test") + .withCommand(commands) + .withArgs(arguments) + .withResources(new ResourceRequirementsBuilder() + .addToLimits("memory", new Quantity("200Mi")) + .addToRequests("memory", new Quantity("100Mi")) + .build()) + .withVolumeMounts(Arrays.asList( + new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build(), + new VolumeMountBuilder().withName("test2").withMountPath("/test-ebs2").build())) + .build(); assertThat(container, is(desiredContainer)); } @@ -86,51 +215,65 @@ public void testCreateContainer() public void testCreatePodSPec() throws Exception { - final Config taskRequestConfig = newConfig() - .set("kubernetes", newConfig().set( - "spec", newConfig().set( - "affinity", newConfig().set( - "nodeAffinity", newConfig().set( - "requiredDuringSchedulingIgnoredDuringExecution", newConfig().set( - "nodeSelectorTerms", ImmutableList.of( - newConfig().set( - "matchExpressions", ImmutableList.of( - newConfig().set("key", "failure-domain.beta.kubernetes.io/zone") - .set("operator", "In") - .set("values", ImmutableList.of( - "asia-northeast1-a" - ))))))))))); + final Config taskRequestConfig = newConfig().set("docker", newConfig().set("image", "test")); final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); when(commandContext.getTaskRequest()).thenReturn(taskRequest); + Container container = mock(Container.class); + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, null, container); + + PodSpec desiredPodSpec = new PodSpecBuilder() + .addToContainers(container) + .withRestartPolicy("Never") + .withAffinity(null) + .withTolerations((List)null) + .withVolumes((List)null) + .build(); + + assertThat(podSpec, is(desiredPodSpec)); + } + @Test + public void testCreatePodSPecWithKubernetesConfig() + throws Exception + { + final Config kubernetesPodConfig = testKubernetesConfig.get("Pod", Config.class); + final Config taskRequestConfig = newConfig() + .set("kubernetes", testKubernetesConfig) + .set("docker", newConfig().set("image", "test")); + + final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); + when(commandContext.getTaskRequest()).thenReturn(taskRequest); Container container = mock(Container.class); - final Config kubernetesConfig = taskRequest.getConfig().getNested("kubernetes"); - PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, kubernetesConfig, container); + + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + PodSpec podSpec = defaultKubernetesClient.createPodSpec(commandContext, commandRequest, kubernetesPodConfig, container); PodSpec desiredPodSpec = new PodSpecBuilder() .addToContainers(container) .withRestartPolicy("Never") .withAffinity(new AffinityBuilder() .withNodeAffinity(new NodeAffinityBuilder() - .withRequiredDuringSchedulingIgnoredDuringExecution(new NodeSelectorBuilder() - .addToNodeSelectorTerms(0, - new NodeSelectorTermBuilder().addToMatchExpressions(0, - new NodeSelectorRequirementBuilder() - .withKey("failure-domain.beta.kubernetes.io/zone") - .addToValues(0,"asia-northeast1-a") - .withOperator("In") - .build() - ) - .build() - ) - .build() - ) - .build() - ) - .build() - ) + .withRequiredDuringSchedulingIgnoredDuringExecution(new NodeSelectorBuilder() + .addToNodeSelectorTerms(0, + new NodeSelectorTermBuilder() + .addToMatchExpressions(0, new NodeSelectorRequirementBuilder() + .withKey("test1") + .addToValues(0, "test1") + .withOperator("In") + .build() + ).build() + ).build() + ).build() + ).build()) + .withTolerations(Arrays.asList( + new TolerationBuilder().withKey("test").withOperator("Exists").withEffect("NoSchedule").build(), + new TolerationBuilder().withKey("test2").withOperator("Exists").withEffect("NoSchedule").build())) + .withVolumes(Arrays.asList( + new VolumeBuilder().withName("test").withEmptyDir(new EmptyDirVolumeSource()).build(), + new VolumeBuilder().withName("test2").withEmptyDir(new EmptyDirVolumeSource()).build())) .build(); assertThat(podSpec, is(desiredPodSpec)); From 5ecaba312da8b5f4d770c8f10fcfcdcc52fa1918 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 15:56:05 +0900 Subject: [PATCH 08/12] remove diff --- .../DefaultKubernetesClientTest.java | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index 9419095ccd..906aecc440 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -92,24 +92,6 @@ public void setUp() - key: test2 operator: Exists effect: NoSchedule - PersistentVolumeClaim: - accessModes: - - ReadWriteOnce - volumeMode: Block - resources: - requests: - storage: 10Gi - PersistentVolume: - capacity: - storage: 10Gi - accessModes: - - ReadWriteOnce - volumeMode: "Block" - persistentVolumeReclaimPolicy: "ReadWriteOnce" - fc: - targetWWNs: ["50060e801049cfd1"] - lun: 0 - readOnly: false */ testKubernetesConfig = newConfig() .set("Pod", newConfig() @@ -130,21 +112,7 @@ public void setUp() newConfig().set("key", "test1").set("operator", "In").set("values", ImmutableList.of("test1"))))))))) .set("tolerations", ImmutableList.of( newConfig().set("key", "test").set("operator", "Exists").set("effect", "NoSchedule"), - newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))) - .set("PersistentVolumeClaim", newConfig() - .set("accessModes", ImmutableList.of("ReadWriteOnce")) - .set("volumeMode", "Block") - .set("resources", newConfig().set("requests", newConfig().set("storage", "10Gi")))) - .set("PersistentVolume", newConfig() - .set("capacity", newConfig().set("storage", "10Gi")) - .set("accessModes", ImmutableList.of("ReadWriteOnce")) - .set("volumeMode", "Block") - .set("persistentVolumeReclaimPolicy", "Retain") - .set("fc", newConfig() - .set("targetWWNs", ImmutableList.of("50060e801049cfd1")) - .set("lun", "0") - .set("readOnly", "false"))); - + newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))); } @Test From 44ff19425a48637f6a56b35b2d033ab9fb78d37d Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 16:48:19 +0900 Subject: [PATCH 09/12] remove not used package --- .../standards/command/kubernetes/DefaultKubernetesClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index eb83ed8037..d0a60b6ed1 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -21,7 +21,6 @@ import io.fabric8.kubernetes.api.model.Toleration; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; -import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.utils.Serialization; import org.slf4j.Logger; From f30b9ac703da53644cec80b7fdd0059324f0e1bb Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 16:53:30 +0900 Subject: [PATCH 10/12] update test config --- .../DefaultKubernetesClientTest.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index 906aecc440..775e5108cc 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -92,6 +92,24 @@ public void setUp() - key: test2 operator: Exists effect: NoSchedule + PersistentVolumeClaim: + accessModes: + - ReadWriteOnce + volumeMode: Block + resources: + requests: + storage: 10Gi + PersistentVolume: + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + volumeMode: "Block" + persistentVolumeReclaimPolicy: "ReadWriteOnce" + fc: + targetWWNs: ["50060e801049cfd1"] + lun: 0 + readOnly: false */ testKubernetesConfig = newConfig() .set("Pod", newConfig() @@ -112,7 +130,20 @@ public void setUp() newConfig().set("key", "test1").set("operator", "In").set("values", ImmutableList.of("test1"))))))))) .set("tolerations", ImmutableList.of( newConfig().set("key", "test").set("operator", "Exists").set("effect", "NoSchedule"), - newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))); + newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))) + .set("PersistentVolumeClaim", newConfig() + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("volumeMode", "Block") + .set("resources", newConfig().set("requests", newConfig().set("storage", "10Gi")))) + .set("PersistentVolume", newConfig() + .set("capacity", newConfig().set("storage", "10Gi")) + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("volumeMode", "Block") + .set("persistentVolumeReclaimPolicy", "Retain") + .set("fc", newConfig() + .set("targetWWNs", ImmutableList.of("50060e801049cfd1")) + .set("lun", "0") + .set("readOnly", "false"))); } @Test From dd45c3f356b5c076bc18d190685901cb89678a62 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 19:44:11 +0900 Subject: [PATCH 11/12] add pv, pvc resource --- .../kubernetes/DefaultKubernetesClient.java | 64 +++++- .../DefaultKubernetesClientTest.java | 191 +++++++++++------- 2 files changed, 177 insertions(+), 78 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java index d0a60b6ed1..50a6b6adaf 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClient.java @@ -21,6 +21,10 @@ import io.fabric8.kubernetes.api.model.Toleration; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.PersistentVolume; +import io.fabric8.kubernetes.api.model.PersistentVolumeSpec; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.utils.Serialization; import org.slf4j.Logger; @@ -57,11 +61,11 @@ public KubernetesClientConfig getConfig() public Pod runPod(final CommandContext context, final CommandRequest request, final String name, final List commands, final List arguments) { - final TaskRequest taskRequest = context.getTaskRequest(); - final Config kubernetesConfig = taskRequest.getConfig().get("kubernetes", Config.class); - Config kubernetesPodConfig = null; - if (kubernetesConfig != null && kubernetesConfig.has("Pod")) kubernetesPodConfig = kubernetesConfig.get("Pod", Config.class); + // If PersistentVolume or PersistentVolumeClaim is set, create PersistentVolume or PersistentVolumeClaim before making pod. + createPersistentVolume(context); + createPersistentVolumeClaim(context); + final Config kubernetesPodConfig = extractTargetKindConfig(context, "Pod"); final Container container = createContainer(context, request, kubernetesPodConfig, name, commands, arguments); final PodSpec podSpec = createPodSpec(context, request, kubernetesPodConfig, container); io.fabric8.kubernetes.api.model.Pod pod = client.pods() @@ -171,6 +175,58 @@ PodSpec createPodSpec(final CommandContext context, final CommandRequest request return podSpec; } + protected PersistentVolume createPersistentVolume(final CommandContext context) + { + final Config kubernetesPvConfig = extractTargetKindConfig(context, "PersistentVolume"); + if (kubernetesPvConfig != null && kubernetesPvConfig.has("spec")) + return client.persistentVolumes() + .createOrReplaceWithNew() + .withNewMetadata() + .withName(kubernetesPvConfig.get("name", String.class)) + .withNamespace(client.getNamespace()) + .endMetadata() + .withSpec(getPersistentVolume(kubernetesPvConfig.get("spec", Config.class))) + .done(); + else + return null; + } + + protected PersistentVolumeClaim createPersistentVolumeClaim(final CommandContext context) + { + final Config kubernetesPvcConfig = extractTargetKindConfig(context, "PersistentVolumeClaim"); + if (kubernetesPvcConfig != null && kubernetesPvcConfig.has("spec")) + return client.persistentVolumeClaims() + .createOrReplaceWithNew() + .withNewMetadata() + .withName(kubernetesPvcConfig.get("name", String.class)) + .withNamespace(client.getNamespace()) + .endMetadata() + .withSpec(getPersistentVolumeClaim(kubernetesPvcConfig.get("spec", Config.class))) + .done(); + else + return null; + } + + protected Config extractTargetKindConfig(final CommandContext context, final String kind) { + final TaskRequest taskRequest = context.getTaskRequest(); + final Config kubernetesConfig = taskRequest.getConfig().get("kubernetes", Config.class); + Config kubernetesTargetKindConfig = null; + if (kubernetesConfig != null && kubernetesConfig.has(kind)) kubernetesTargetKindConfig = kubernetesConfig.get(kind, Config.class); + return kubernetesTargetKindConfig; + } + + @VisibleForTesting + PersistentVolumeSpec getPersistentVolume(Config kubernetesPvSpecConfig) { + final JsonNode persistentVolumeSpecNode = kubernetesPvSpecConfig.getInternalObjectNode(); + return Serialization.unmarshal(persistentVolumeSpecNode.toString(), PersistentVolumeSpec.class); + } + + @VisibleForTesting + PersistentVolumeClaimSpec getPersistentVolumeClaim(Config kubernetesPvcSpecConfig) { + final JsonNode persistentVolumeClaimSpecNode = kubernetesPvcSpecConfig.getInternalObjectNode(); + return Serialization.unmarshal(persistentVolumeClaimSpecNode.toString(), PersistentVolumeClaimSpec.class); + } + protected ResourceRequirements getResources(Config kubernetesPodConfig) { if (kubernetesPodConfig != null && kubernetesPodConfig.has("resources")) { final JsonNode resourcesNode = kubernetesPodConfig.getInternalObjectNode().get("resources"); diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index 775e5108cc..a3326c28f8 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -22,6 +22,14 @@ import io.fabric8.kubernetes.api.model.VolumeBuilder; import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolume; +import io.fabric8.kubernetes.api.model.PersistentVolumeBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeSpec; +import io.fabric8.kubernetes.api.model.PersistentVolumeSpecBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpecBuilder; import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; import org.junit.Before; import org.junit.Test; @@ -30,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static io.digdag.client.config.ConfigUtils.newConfig; @@ -95,55 +104,53 @@ public void setUp() PersistentVolumeClaim: accessModes: - ReadWriteOnce - volumeMode: Block resources: requests: storage: 10Gi + limits: + storage: 8Gi PersistentVolume: - capacity: - storage: 10Gi - accessModes: - - ReadWriteOnce - volumeMode: "Block" - persistentVolumeReclaimPolicy: "ReadWriteOnce" - fc: - targetWWNs: ["50060e801049cfd1"] - lun: 0 - readOnly: false + name: testPersistentVolume + spec: + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: "ReadWriteOnce" */ testKubernetesConfig = newConfig() - .set("Pod", newConfig() - .set("volumeMounts", ImmutableList.of( - newConfig().set("mountPath", "/test-ebs").set("name", "test"), - newConfig().set("mountPath", "/test-ebs2").set("name", "test2"))) - .set("volumes", ImmutableList.of( - newConfig().set("name", "test").set("emptyDir", newConfig()), - newConfig().set("name", "test2").set("emptyDir", newConfig()))) - .set("resources", newConfig() - .set("limits", newConfig().set("memory", "200Mi")) - .set("requests", newConfig().set("memory", "100Mi"))) - .set("affinity", newConfig() - .set("nodeAffinity", newConfig() - .set("requiredDuringSchedulingIgnoredDuringExecution", newConfig() - .set("nodeSelectorTerms", ImmutableList.of( - newConfig().set("matchExpressions", ImmutableList.of( - newConfig().set("key", "test1").set("operator", "In").set("values", ImmutableList.of("test1"))))))))) - .set("tolerations", ImmutableList.of( - newConfig().set("key", "test").set("operator", "Exists").set("effect", "NoSchedule"), - newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))) - .set("PersistentVolumeClaim", newConfig() - .set("accessModes", ImmutableList.of("ReadWriteOnce")) - .set("volumeMode", "Block") - .set("resources", newConfig().set("requests", newConfig().set("storage", "10Gi")))) - .set("PersistentVolume", newConfig() - .set("capacity", newConfig().set("storage", "10Gi")) - .set("accessModes", ImmutableList.of("ReadWriteOnce")) - .set("volumeMode", "Block") - .set("persistentVolumeReclaimPolicy", "Retain") - .set("fc", newConfig() - .set("targetWWNs", ImmutableList.of("50060e801049cfd1")) - .set("lun", "0") - .set("readOnly", "false"))); + .set("Pod", newConfig() + .set("volumeMounts", ImmutableList.of( + newConfig().set("mountPath", "/test-ebs").set("name", "test"), + newConfig().set("mountPath", "/test-ebs2").set("name", "test2"))) + .set("volumes", ImmutableList.of( + newConfig().set("name", "test").set("emptyDir", newConfig()), + newConfig().set("name", "test2").set("emptyDir", newConfig()))) + .set("resources", newConfig() + .set("limits", newConfig().set("memory", "200Mi")) + .set("requests", newConfig().set("memory", "100Mi"))) + .set("affinity", newConfig() + .set("nodeAffinity", newConfig() + .set("requiredDuringSchedulingIgnoredDuringExecution", newConfig() + .set("nodeSelectorTerms", ImmutableList.of( + newConfig().set("matchExpressions", ImmutableList.of( + newConfig().set("key", "test1").set("operator", "In").set("values", ImmutableList.of("test1"))))))))) + .set("tolerations", ImmutableList.of( + newConfig().set("key", "test").set("operator", "Exists").set("effect", "NoSchedule"), + newConfig().set("key", "test2").set("operator", "Exists").set("effect", "NoSchedule")))) + .set("PersistentVolumeClaim", newConfig() + .set("name", "testPersistentVolumeClaim") + .set("spec", newConfig() + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("resources", newConfig() + .set("requests", newConfig().set("storage", "10Gi")) + .set("limits", newConfig().set("storage", "8Gi"))))) + .set("PersistentVolume", newConfig() + .set("name", "testPersistentVolume") + .set("spec", newConfig() + .set("capacity", newConfig().set("storage", "10Gi")) + .set("accessModes", ImmutableList.of("ReadWriteOnce")) + .set("persistentVolumeReclaimPolicy", "Retain"))); } @Test @@ -180,8 +187,8 @@ public void testCreateContainerWithKubernetesConfig() { final Config kubernetesPodConfig = testKubernetesConfig.get("Pod", Config.class); final Config taskRequestConfig = newConfig() - .set("kubernetes", testKubernetesConfig) - .set("docker", newConfig().set("image", "test")); + .set("kubernetes", testKubernetesConfig) + .set("docker", newConfig().set("image", "test")); final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); when(commandContext.getTaskRequest()).thenReturn(taskRequest); @@ -194,18 +201,18 @@ public void testCreateContainerWithKubernetesConfig() Container container = defaultKubernetesClient.createContainer(commandContext, commandRequest, kubernetesPodConfig, podName, commands, arguments); Container desiredContainer = new ContainerBuilder() - .withName(podName) - .withImage("test") - .withCommand(commands) - .withArgs(arguments) - .withResources(new ResourceRequirementsBuilder() - .addToLimits("memory", new Quantity("200Mi")) - .addToRequests("memory", new Quantity("100Mi")) - .build()) - .withVolumeMounts(Arrays.asList( - new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build(), - new VolumeMountBuilder().withName("test2").withMountPath("/test-ebs2").build())) - .build(); + .withName(podName) + .withImage("test") + .withCommand(commands) + .withArgs(arguments) + .withResources(new ResourceRequirementsBuilder() + .addToLimits("memory", new Quantity("200Mi")) + .addToRequests("memory", new Quantity("100Mi")) + .build()) + .withVolumeMounts(Arrays.asList( + new VolumeMountBuilder().withName("test").withMountPath("/test-ebs").build(), + new VolumeMountBuilder().withName("test2").withMountPath("/test-ebs2").build())) + .build(); assertThat(container, is(desiredContainer)); } @@ -240,8 +247,8 @@ public void testCreatePodSPecWithKubernetesConfig() { final Config kubernetesPodConfig = testKubernetesConfig.get("Pod", Config.class); final Config taskRequestConfig = newConfig() - .set("kubernetes", testKubernetesConfig) - .set("docker", newConfig().set("image", "test")); + .set("kubernetes", testKubernetesConfig) + .set("docker", newConfig().set("image", "test")); final TaskRequest taskRequest = newTaskRequest().withConfig(taskRequestConfig); when(commandContext.getTaskRequest()).thenReturn(taskRequest); @@ -255,26 +262,62 @@ public void testCreatePodSPecWithKubernetesConfig() .withRestartPolicy("Never") .withAffinity(new AffinityBuilder() .withNodeAffinity(new NodeAffinityBuilder() - .withRequiredDuringSchedulingIgnoredDuringExecution(new NodeSelectorBuilder() - .addToNodeSelectorTerms(0, - new NodeSelectorTermBuilder() - .addToMatchExpressions(0, new NodeSelectorRequirementBuilder() - .withKey("test1") - .addToValues(0, "test1") - .withOperator("In") - .build() - ).build() + .withRequiredDuringSchedulingIgnoredDuringExecution(new NodeSelectorBuilder() + .addToNodeSelectorTerms(0, + new NodeSelectorTermBuilder() + .addToMatchExpressions(0, new NodeSelectorRequirementBuilder() + .withKey("test1") + .addToValues(0, "test1") + .withOperator("In") + .build() ).build() ).build() + ).build() ).build()) - .withTolerations(Arrays.asList( - new TolerationBuilder().withKey("test").withOperator("Exists").withEffect("NoSchedule").build(), - new TolerationBuilder().withKey("test2").withOperator("Exists").withEffect("NoSchedule").build())) - .withVolumes(Arrays.asList( - new VolumeBuilder().withName("test").withEmptyDir(new EmptyDirVolumeSource()).build(), - new VolumeBuilder().withName("test2").withEmptyDir(new EmptyDirVolumeSource()).build())) + .withTolerations(Arrays.asList( + new TolerationBuilder().withKey("test").withOperator("Exists").withEffect("NoSchedule").build(), + new TolerationBuilder().withKey("test2").withOperator("Exists").withEffect("NoSchedule").build())) + .withVolumes(Arrays.asList( + new VolumeBuilder().withName("test").withEmptyDir(new EmptyDirVolumeSource()).build(), + new VolumeBuilder().withName("test2").withEmptyDir(new EmptyDirVolumeSource()).build())) .build(); assertThat(podSpec, is(desiredPodSpec)); } + + @Test + public void testGetPersistentVolume() + throws Exception + { + final Config persistentVolumeConfig = testKubernetesConfig.get("PersistentVolume", Config.class).get("spec", Config.class); + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + PersistentVolumeSpec persistentVolumeSpec = defaultKubernetesClient.getPersistentVolume(persistentVolumeConfig); + + PersistentVolumeSpec desiredPersistentVolumeSpec = new PersistentVolumeSpecBuilder() + .withCapacity(Collections.singletonMap("storage", new Quantity("10Gi"))) + .withAccessModes("ReadWriteOnce") + .withPersistentVolumeReclaimPolicy("Retain") + .build(); + + assertThat(persistentVolumeSpec, is(desiredPersistentVolumeSpec)); + } + + @Test + public void testGetPersistentVolumeClaim() + throws Exception + { + final Config persistentVolumeClaimConfig = testKubernetesConfig.get("PersistentVolumeClaim", Config.class).get("spec", Config.class); + DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(kubernetesClientConfig, k8sDefaultKubernetesClient); + PersistentVolumeClaimSpec persistentVolumeClaimSpec = defaultKubernetesClient.getPersistentVolumeClaim(persistentVolumeClaimConfig); + + PersistentVolumeClaimSpec desiredPersistentVolumeClaimSpec = new PersistentVolumeClaimSpecBuilder() + .withAccessModes("ReadWriteOnce") + .withResources(new ResourceRequirementsBuilder() + .addToRequests("storage", new Quantity("10Gi")) + .addToLimits("storage", new Quantity("8Gi")) + .build()) + .build(); + + assertThat(persistentVolumeClaimSpec, is(desiredPersistentVolumeClaimSpec)); + } } From 4d2bcd625f9d4c8c3a1d4513f762f05bfdf91ef7 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Thu, 30 Jan 2020 19:47:02 +0900 Subject: [PATCH 12/12] fix yaml format --- .../kubernetes/DefaultKubernetesClientTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java index a3326c28f8..79f1774af8 100644 --- a/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java +++ b/digdag-standards/src/test/java/io/digdag/standards/command/kubernetes/DefaultKubernetesClientTest.java @@ -102,13 +102,15 @@ public void setUp() operator: Exists effect: NoSchedule PersistentVolumeClaim: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 10Gi - limits: - storage: 8Gi + name: testPersistentVolumeClaim + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + limits: + storage: 8Gi PersistentVolume: name: testPersistentVolume spec: