Skip to content

Commit

Permalink
Added init container support.
Browse files Browse the repository at this point in the history
Init container can be used to set up data in the shared volume.
  • Loading branch information
hairyhum committed Sep 17, 2024
1 parent eddbeff commit e64a65b
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 25 deletions.
17 changes: 13 additions & 4 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ It's similar to KubeTask, but allows using multiple images to move backup data.
should export it to destination.
The main difference between them is that phase outputs can only generated from the
"output" container outputs.
The function also supports an optional init container to set up the volume contents.

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
Expand All @@ -171,11 +172,15 @@ The main difference between them is that phase outputs can only generated from t
`backgroundCommand`, Yes, `[]string`, command list to execute in "background" container
`outputImage`, Yes, `string`, image to be used in "output" container
`outputCommand`, Yes, `[]string`, command list to execute in "output" container
`initImage`, No, `string`, image to be used in init container of the pod
`initCommand`, No, `[]string`, command list to execute in init container of the pod
`podOverride`, No, `map[string]interface{}`, specs to override default pod specs with
`podAnnotations`, No, `map[string]string`, custom annotations for the temporary pod that gets created
`podLabels`, No, `map[string]string`, custom labels for the temporary pod that gets created
`sharedVolumeMedium`, No, `string`, medium setting for shared volume. See https://kubernetes.io/docs/concepts/storage/volumes/#emptydir
`sharedVolumeSizeLimit`, No, `string`, sizeLimit setting for shared volume
`sharedVolumeMedium`, No, `string`, medium setting for shared volume. See the Kubernetes `documentation
<https://kubernetes.io/docs/concepts/storage/volumes/#emptydir>`_.
`sharedVolumeSizeLimit`, No, `string`, sizeLimit setting for shared volume. See the Kubernetes `documentation
<https://kubernetes.io/docs/concepts/storage/volumes/#emptydir>`_.
`sharedVolumeDir`, No, `string`, directory to mount shared volume. Defaults to `/tmp`

Example:
Expand All @@ -198,12 +203,17 @@ Example:
sharedVolumeMedium: Memory
sharedVolumeSizeLimit: 1Gi
sharedVolumeDir: /tmp/
initImage: ubuntu
initCommand:
- bash
- -c
- |
mkfifo /tmp/pipe-file
backgroundImage: ubuntu
backgroundCommand:
- bash
- -c
- |
mkfifo /tmp/pipe-file
for i in {1..10}
do
echo $i
Expand All @@ -214,7 +224,6 @@ Example:
- bash
- -c
- |
while [ ! -e /tmp/pipe-file ]; do sleep 1; done
cat /tmp/pipe-file
ScaleWorkload
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ kopia
hostname
emptyDir
sizeLimit
init
9 changes: 6 additions & 3 deletions docs_new/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ It's similar to KubeTask, but allows using multiple images to move backup data.
should export it to destination.
The main difference between them is that phase outputs can only generated from the
"output" container outputs.
The function also supports an optional init container to set up the volume contents.
| Argument | Required | Type | Description |
Expand All @@ -146,12 +147,14 @@ The main difference between them is that phase outputs can only generated from t
| backgroundCommand | Yes | []string | command list to execute in "background" container |
| outputImage | Yes | string | image to be used in "output" container |
| outputCommand | Yes | []string | command list to execute in "output" container |
| initImage | No | string | image to be used in init container of the pod |
| initCommand | No | []string | command list to execute in init container of the pod |
| podOverride | No | map[string]interface{} | specs to override default pod specs with |
| podAnnotations | No | map[string]string | custom annotations for the temporary pod that gets created |
| podLabels | No | map[string]string | custom labels for the temporary pod that gets created |
| sharedVolumeMedium | No | string | medium setting for shared volume, see https://kubernetes.io/docs/concepts/storage/volumes/#emptydir |
| sharedVolumeSizeLimit | No | string | sizeLimit setting for shared volume |
| sharedVolumeDir | No | string | directory to mount shared volume, defaults to `/tmp` |
| sharedVolumeMedium | No | string | medium setting for shared volume. See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). |
| sharedVolumeSizeLimit | No | string | sizeLimit setting for shared volume. See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). |
| sharedVolumeDir | No | string | directory to mount shared volume, defaults to `/tmp` |


Example:
Expand Down
59 changes: 41 additions & 18 deletions pkg/function/kube_task_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
KubeTaskParallelVolumeSizeLimitArg = "sharedVolumeSizeLimit"
KubeTaskParallelSharedDirArg = "sharedVolumeDir"
KubeTaskParallelPodOverrideArg = "podOverride"
KubeTaskParallelInitImageArg = "initImage"
KubeTaskParallelInitCommandArg = "initCommand"
)

const (
Expand All @@ -71,6 +73,8 @@ type kubeTaskParallelFunc struct {
backgroundCommand []string
outputImage string
outputCommand []string
initImage string
initCommand []string
storageDir string
storageMedium corev1.StorageMedium
storageSizeLimit *resource.Quantity
Expand All @@ -87,6 +91,26 @@ func (ktpf *kubeTaskParallelFunc) run(
ctx context.Context,
cli kubernetes.Interface,
) (map[string]interface{}, error) {
volumeMounts := []corev1.VolumeMount{
{
Name: ktpSharedVolumeName,
MountPath: ktpf.storageDir,
},
}

var initContainers []corev1.Container
// If init image is specified
if ktpf.initImage != "" {
initContainers = []corev1.Container{
{
Name: "init",
Image: ktpf.initImage,
Command: ktpf.initCommand,
VolumeMounts: volumeMounts,
},
}
}

podSpec := corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Volumes: []corev1.Volume{
Expand All @@ -100,28 +124,19 @@ func (ktpf *kubeTaskParallelFunc) run(
},
},
},
InitContainers: initContainers,
Containers: []corev1.Container{
{
Name: ktpOutputContainer,
Image: ktpf.outputImage,
Command: ktpf.outputCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: ktpSharedVolumeName,
MountPath: ktpf.storageDir,
},
},
Name: ktpOutputContainer,
Image: ktpf.outputImage,
Command: ktpf.outputCommand,
VolumeMounts: volumeMounts,
},
{
Name: ktpBackgroundContainer,
Image: ktpf.backgroundImage,
Command: ktpf.backgroundCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: ktpSharedVolumeName,
MountPath: ktpf.storageDir,
},
},
Name: ktpBackgroundContainer,
Image: ktpf.backgroundImage,
Command: ktpf.backgroundCommand,
VolumeMounts: volumeMounts,
},
},
}
Expand Down Expand Up @@ -215,12 +230,18 @@ func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplatePar
if err = Arg(args, KubeTaskParallelOutputImageArg, &ktpf.outputImage); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelInitImageArg, &ktpf.initImage, ""); err != nil {
return nil, err
}
if err = Arg(args, KubeTaskParallelBackgroundCommandArg, &ktpf.backgroundCommand); err != nil {
return nil, err
}
if err = Arg(args, KubeTaskParallelOutputCommandArg, &ktpf.outputCommand); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelInitCommandArg, &ktpf.initCommand, nil); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelNamespaceArg, &ktpf.namespace, ""); err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,6 +309,8 @@ func (*kubeTaskParallelFunc) RequiredArgs() []string {
func (*kubeTaskParallelFunc) Arguments() []string {
return []string{
KubeTaskParallelNamespaceArg,
KubeTaskParallelInitImageArg,
KubeTaskParallelInitCommandArg,
KubeTaskParallelBackgroundImageArg,
KubeTaskParallelBackgroundCommandArg,
KubeTaskParallelOutputImageArg,
Expand Down
73 changes: 73 additions & 0 deletions pkg/function/kube_task_parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,76 @@ func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) {
}
}
}

func kubeTaskParallelPhaseWithInit(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testKubeTaskParallel",
Func: KubeTaskParallelFuncName,
Args: map[string]interface{}{
KubeTaskParallelNamespaceArg: namespace,
KubeTaskParallelInitImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelInitCommandArg: []string{
"sh",
"-c",
"mkfifo /tmp/file",
},
KubeTaskParallelBackgroundImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelBackgroundCommandArg: []string{
"sh",
"-c",
"if [ ! -e /tmp/file ]; then exit 1; fi; echo foo >> /tmp/file",
},
KubeTaskParallelOutputImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelOutputCommandArg: []string{
"sh",
"-c",
"if [ ! -e /tmp/file ]; then exit 1; fi; kando output value $(cat /tmp/file)",
},
},
}
}

func (s *KubeTaskParallelSuite) TestKubeTaskParallelWithInit(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
PodOverride: crv1alpha1.JSONMap{
"containers": []map[string]interface{}{
{
"name": "background",
"imagePullPolicy": "Always",
},
{
"name": "output",
"imagePullPolicy": "Always",
},
},
},
}
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(kubeTaskParallelPhaseWithInit(s.namespace)),
outs: []map[string]interface{}{
{
"value": "foo",
},
},
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}

0 comments on commit e64a65b

Please sign in to comment.