From e64a65b46df3cb173546cec0c7dafcd1d61bb711 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Tue, 17 Sep 2024 16:35:23 -0400 Subject: [PATCH] Added init container support. Init container can be used to set up data in the shared volume. --- docs/functions.rst | 17 ++++-- docs/spelling_wordlist.txt | 1 + docs_new/functions.md | 9 ++- pkg/function/kube_task_parallel.go | 59 ++++++++++++++------ pkg/function/kube_task_parallel_test.go | 73 +++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 25 deletions(-) diff --git a/docs/functions.rst b/docs/functions.rst index 170bca6266..3d4db6dc48 100644 --- a/docs/functions.rst +++ b/docs/functions.rst @@ -160,6 +160,7 @@ It's similar to KubeTask, but allows using multiple images to move backup data. should export it to destination. The main difference between them is that phase outputs can only generated from the "output" container outputs. +The function also supports an optional init container to set up the volume contents. .. csv-table:: :header: "Argument", "Required", "Type", "Description" @@ -171,11 +172,15 @@ The main difference between them is that phase outputs can only generated from t `backgroundCommand`, Yes, `[]string`, command list to execute in "background" container `outputImage`, Yes, `string`, image to be used in "output" container `outputCommand`, Yes, `[]string`, command list to execute in "output" container + `initImage`, No, `string`, image to be used in init container of the pod + `initCommand`, No, `[]string`, command list to execute in init container of the pod `podOverride`, No, `map[string]interface{}`, specs to override default pod specs with `podAnnotations`, No, `map[string]string`, custom annotations for the temporary pod that gets created `podLabels`, No, `map[string]string`, custom labels for the temporary pod that gets created - `sharedVolumeMedium`, No, `string`, medium setting for shared volume. See https://kubernetes.io/docs/concepts/storage/volumes/#emptydir - `sharedVolumeSizeLimit`, No, `string`, sizeLimit setting for shared volume + `sharedVolumeMedium`, No, `string`, medium setting for shared volume. See the Kubernetes `documentation +`_. + `sharedVolumeSizeLimit`, No, `string`, sizeLimit setting for shared volume. See the Kubernetes `documentation +`_. `sharedVolumeDir`, No, `string`, directory to mount shared volume. Defaults to `/tmp` Example: @@ -198,12 +203,17 @@ Example: sharedVolumeMedium: Memory sharedVolumeSizeLimit: 1Gi sharedVolumeDir: /tmp/ + initImage: ubuntu + initCommand: + - bash + - -c + - | + mkfifo /tmp/pipe-file backgroundImage: ubuntu backgroundCommand: - bash - -c - | - mkfifo /tmp/pipe-file for i in {1..10} do echo $i @@ -214,7 +224,6 @@ Example: - bash - -c - | - while [ ! -e /tmp/pipe-file ]; do sleep 1; done cat /tmp/pipe-file ScaleWorkload diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index d4e1eeb191..2aef851130 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -65,3 +65,4 @@ kopia hostname emptyDir sizeLimit +init \ No newline at end of file diff --git a/docs_new/functions.md b/docs_new/functions.md index 89d466f1a6..40a1ddca83 100644 --- a/docs_new/functions.md +++ b/docs_new/functions.md @@ -137,6 +137,7 @@ It's similar to KubeTask, but allows using multiple images to move backup data. should export it to destination. The main difference between them is that phase outputs can only generated from the "output" container outputs. +The function also supports an optional init container to set up the volume contents. | Argument | Required | Type | Description | @@ -146,12 +147,14 @@ The main difference between them is that phase outputs can only generated from t | backgroundCommand | Yes | []string | command list to execute in "background" container | | outputImage | Yes | string | image to be used in "output" container | | outputCommand | Yes | []string | command list to execute in "output" container | + | initImage | No | string | image to be used in init container of the pod | + | initCommand | No | []string | command list to execute in init container of the pod | | podOverride | No | map[string]interface{} | specs to override default pod specs with | | podAnnotations | No | map[string]string | custom annotations for the temporary pod that gets created | | podLabels | No | map[string]string | custom labels for the temporary pod that gets created | - | sharedVolumeMedium | No | string | medium setting for shared volume, see https://kubernetes.io/docs/concepts/storage/volumes/#emptydir | - | sharedVolumeSizeLimit | No | string | sizeLimit setting for shared volume | - | sharedVolumeDir | No | string | directory to mount shared volume, defaults to `/tmp` | + | sharedVolumeMedium | No | string | medium setting for shared volume. See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). | + | sharedVolumeSizeLimit | No | string | sizeLimit setting for shared volume. See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). | + | sharedVolumeDir | No | string | directory to mount shared volume, defaults to `/tmp` | Example: diff --git a/pkg/function/kube_task_parallel.go b/pkg/function/kube_task_parallel.go index f4b3ff5597..9260febe7d 100644 --- a/pkg/function/kube_task_parallel.go +++ b/pkg/function/kube_task_parallel.go @@ -49,6 +49,8 @@ const ( KubeTaskParallelVolumeSizeLimitArg = "sharedVolumeSizeLimit" KubeTaskParallelSharedDirArg = "sharedVolumeDir" KubeTaskParallelPodOverrideArg = "podOverride" + KubeTaskParallelInitImageArg = "initImage" + KubeTaskParallelInitCommandArg = "initCommand" ) const ( @@ -71,6 +73,8 @@ type kubeTaskParallelFunc struct { backgroundCommand []string outputImage string outputCommand []string + initImage string + initCommand []string storageDir string storageMedium corev1.StorageMedium storageSizeLimit *resource.Quantity @@ -87,6 +91,26 @@ func (ktpf *kubeTaskParallelFunc) run( ctx context.Context, cli kubernetes.Interface, ) (map[string]interface{}, error) { + volumeMounts := []corev1.VolumeMount{ + { + Name: ktpSharedVolumeName, + MountPath: ktpf.storageDir, + }, + } + + var initContainers []corev1.Container + // If init image is specified + if ktpf.initImage != "" { + initContainers = []corev1.Container{ + { + Name: "init", + Image: ktpf.initImage, + Command: ktpf.initCommand, + VolumeMounts: volumeMounts, + }, + } + } + podSpec := corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, Volumes: []corev1.Volume{ @@ -100,28 +124,19 @@ func (ktpf *kubeTaskParallelFunc) run( }, }, }, + InitContainers: initContainers, Containers: []corev1.Container{ { - Name: ktpOutputContainer, - Image: ktpf.outputImage, - Command: ktpf.outputCommand, - VolumeMounts: []corev1.VolumeMount{ - { - Name: ktpSharedVolumeName, - MountPath: ktpf.storageDir, - }, - }, + Name: ktpOutputContainer, + Image: ktpf.outputImage, + Command: ktpf.outputCommand, + VolumeMounts: volumeMounts, }, { - Name: ktpBackgroundContainer, - Image: ktpf.backgroundImage, - Command: ktpf.backgroundCommand, - VolumeMounts: []corev1.VolumeMount{ - { - Name: ktpSharedVolumeName, - MountPath: ktpf.storageDir, - }, - }, + Name: ktpBackgroundContainer, + Image: ktpf.backgroundImage, + Command: ktpf.backgroundCommand, + VolumeMounts: volumeMounts, }, }, } @@ -215,12 +230,18 @@ func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplatePar if err = Arg(args, KubeTaskParallelOutputImageArg, &ktpf.outputImage); err != nil { return nil, err } + if err = OptArg(args, KubeTaskParallelInitImageArg, &ktpf.initImage, ""); err != nil { + return nil, err + } if err = Arg(args, KubeTaskParallelBackgroundCommandArg, &ktpf.backgroundCommand); err != nil { return nil, err } if err = Arg(args, KubeTaskParallelOutputCommandArg, &ktpf.outputCommand); err != nil { return nil, err } + if err = OptArg(args, KubeTaskParallelInitCommandArg, &ktpf.initCommand, nil); err != nil { + return nil, err + } if err = OptArg(args, KubeTaskParallelNamespaceArg, &ktpf.namespace, ""); err != nil { return nil, err } @@ -288,6 +309,8 @@ func (*kubeTaskParallelFunc) RequiredArgs() []string { func (*kubeTaskParallelFunc) Arguments() []string { return []string{ KubeTaskParallelNamespaceArg, + KubeTaskParallelInitImageArg, + KubeTaskParallelInitCommandArg, KubeTaskParallelBackgroundImageArg, KubeTaskParallelBackgroundCommandArg, KubeTaskParallelOutputImageArg, diff --git a/pkg/function/kube_task_parallel_test.go b/pkg/function/kube_task_parallel_test.go index ce0cb0b71e..aecfdb53aa 100644 --- a/pkg/function/kube_task_parallel_test.go +++ b/pkg/function/kube_task_parallel_test.go @@ -130,3 +130,76 @@ func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) { } } } + +func kubeTaskParallelPhaseWithInit(namespace string) crv1alpha1.BlueprintPhase { + return crv1alpha1.BlueprintPhase{ + Name: "testKubeTaskParallel", + Func: KubeTaskParallelFuncName, + Args: map[string]interface{}{ + KubeTaskParallelNamespaceArg: namespace, + KubeTaskParallelInitImageArg: consts.LatestKanisterToolsImage, + KubeTaskParallelInitCommandArg: []string{ + "sh", + "-c", + "mkfifo /tmp/file", + }, + KubeTaskParallelBackgroundImageArg: consts.LatestKanisterToolsImage, + KubeTaskParallelBackgroundCommandArg: []string{ + "sh", + "-c", + "if [ ! -e /tmp/file ]; then exit 1; fi; echo foo >> /tmp/file", + }, + KubeTaskParallelOutputImageArg: consts.LatestKanisterToolsImage, + KubeTaskParallelOutputCommandArg: []string{ + "sh", + "-c", + "if [ ! -e /tmp/file ]; then exit 1; fi; kando output value $(cat /tmp/file)", + }, + }, + } +} + +func (s *KubeTaskParallelSuite) TestKubeTaskParallelWithInit(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + tp := param.TemplateParams{ + StatefulSet: ¶m.StatefulSetParams{ + Namespace: s.namespace, + }, + PodOverride: crv1alpha1.JSONMap{ + "containers": []map[string]interface{}{ + { + "name": "background", + "imagePullPolicy": "Always", + }, + { + "name": "output", + "imagePullPolicy": "Always", + }, + }, + }, + } + action := "test" + for _, tc := range []struct { + bp *crv1alpha1.Blueprint + outs []map[string]interface{} + }{ + { + bp: newTaskBlueprint(kubeTaskParallelPhaseWithInit(s.namespace)), + outs: []map[string]interface{}{ + { + "value": "foo", + }, + }, + }, + } { + phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp) + c.Assert(err, IsNil) + c.Assert(phases, HasLen, len(tc.outs)) + for i, p := range phases { + out, err := p.Exec(ctx, *tc.bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + c.Assert(out, DeepEquals, tc.outs[i]) + } + } +}