Skip to content

Commit

Permalink
Rename KubeTaskParallel to MultiContainerRun
Browse files Browse the repository at this point in the history
  • Loading branch information
hairyhum committed Oct 3, 2024
1 parent 61d1267 commit 5938e47
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 86 deletions.
8 changes: 4 additions & 4 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ Example:
- |
echo "Example"
KubeTaskParallel
----------------
MultiContainerRun
-----------------

KubeTaskParallel spins up a new pod with two containers connected
MultiContainerRun spins up a new pod with two containers connected
via shared `emptyDir`_ volume.
It's similar to KubeTask, but allows using multiple images to move backup data.
"background" container is one responsible for generating data, while "output" container
Expand Down Expand Up @@ -188,7 +188,7 @@ Example:
.. code-block:: yaml
:linenos:
- func: KubeTaskParallel
- func: MultiContainerRun
name: examplePhase
args:
namespace: "{{ .Deployment.Namespace }}"
Expand Down
6 changes: 3 additions & 3 deletions docs_new/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ Example:
echo "Example"
```
### KubeTaskParallel
### MultiContainerRun
KubeTaskParallel spins up a new pod with two containers connected via shared [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume.
MultiContainerRun spins up a new pod with two containers connected via shared [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume.
It's similar to KubeTask, but allows using multiple images to move backup data.
"background" container is one responsible for generating data, while "output" container
should export it to destination.
Expand Down Expand Up @@ -160,7 +160,7 @@ The function also supports an optional init container to set up the volume conte
Example:

``` yaml
- func: KubeTaskParallel
- func: MultiContainerRun
name: examplePhase
args:
namespace: "{{ .Deployment.Namespace }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ import (
)

const (
// KubeTaskParallelFuncName gives the function name
KubeTaskParallelFuncName = "KubeTaskParallel"
KubeTaskParallelNamespaceArg = "namespace"
KubeTaskParallelBackgroundImageArg = "backgroundImage"
KubeTaskParallelBackgroundCommandArg = "backgroundCommand"
KubeTaskParallelOutputImageArg = "outputImage"
KubeTaskParallelOutputCommandArg = "outputCommand"
KubeTaskParallelVolumeMediumArg = "sharedVolumeMedium"
KubeTaskParallelVolumeSizeLimitArg = "sharedVolumeSizeLimit"
KubeTaskParallelSharedDirArg = "sharedVolumeDir"
KubeTaskParallelPodOverrideArg = "podOverride"
KubeTaskParallelInitImageArg = "initImage"
KubeTaskParallelInitCommandArg = "initCommand"
// MultiContainerRunFuncName gives the function name
MultiContainerRunFuncName = "MultiContainerRun"
MultiContainerRunNamespaceArg = "namespace"
MultiContainerRunBackgroundImageArg = "backgroundImage"
MultiContainerRunBackgroundCommandArg = "backgroundCommand"
MultiContainerRunOutputImageArg = "outputImage"
MultiContainerRunOutputCommandArg = "outputCommand"
MultiContainerRunVolumeMediumArg = "sharedVolumeMedium"
MultiContainerRunVolumeSizeLimitArg = "sharedVolumeSizeLimit"
MultiContainerRunSharedDirArg = "sharedVolumeDir"
MultiContainerRunPodOverrideArg = "podOverride"
MultiContainerRunInitImageArg = "initImage"
MultiContainerRunInitCommandArg = "initCommand"
)

const (
Expand All @@ -61,12 +61,12 @@ const (
)

func init() {
_ = kanister.Register(&kubeTaskParallelFunc{})
_ = kanister.Register(&multiContainerRunFunc{})
}

var _ kanister.Func = (*kubeTaskParallelFunc)(nil)
var _ kanister.Func = (*multiContainerRunFunc)(nil)

type kubeTaskParallelFunc struct {
type multiContainerRunFunc struct {
progressPercent string
namespace string
backgroundImage string
Expand All @@ -83,11 +83,11 @@ type kubeTaskParallelFunc struct {
annotations map[string]string
}

func (*kubeTaskParallelFunc) Name() string {
return KubeTaskParallelFuncName
func (*multiContainerRunFunc) Name() string {
return MultiContainerRunFuncName
}

func (ktpf *kubeTaskParallelFunc) run(
func (ktpf *multiContainerRunFunc) run(
ctx context.Context,
cli kubernetes.Interface,
) (map[string]interface{}, error) {
Expand Down Expand Up @@ -216,39 +216,39 @@ func getPodOutput(ctx context.Context, pc kube.PodController) (map[string]interf
return out, err
}

func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (ktpf *multiContainerRunFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
ktpf.progressPercent = progress.StartedPercent
defer func() { ktpf.progressPercent = progress.CompletedPercent }()

var storageSizeString string
var bpAnnotations, bpLabels map[string]string
var err error
if err = Arg(args, KubeTaskParallelBackgroundImageArg, &ktpf.backgroundImage); err != nil {
if err = Arg(args, MultiContainerRunBackgroundImageArg, &ktpf.backgroundImage); err != nil {
return nil, err
}
if err = Arg(args, KubeTaskParallelOutputImageArg, &ktpf.outputImage); err != nil {
if err = Arg(args, MultiContainerRunOutputImageArg, &ktpf.outputImage); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelInitImageArg, &ktpf.initImage, ""); err != nil {
if err = OptArg(args, MultiContainerRunInitImageArg, &ktpf.initImage, ""); err != nil {
return nil, err
}
if err = Arg(args, KubeTaskParallelBackgroundCommandArg, &ktpf.backgroundCommand); err != nil {
if err = Arg(args, MultiContainerRunBackgroundCommandArg, &ktpf.backgroundCommand); err != nil {
return nil, err
}
if err = Arg(args, KubeTaskParallelOutputCommandArg, &ktpf.outputCommand); err != nil {
if err = Arg(args, MultiContainerRunOutputCommandArg, &ktpf.outputCommand); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelInitCommandArg, &ktpf.initCommand, nil); err != nil {
if err = OptArg(args, MultiContainerRunInitCommandArg, &ktpf.initCommand, nil); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelNamespaceArg, &ktpf.namespace, ""); err != nil {
if err = OptArg(args, MultiContainerRunNamespaceArg, &ktpf.namespace, ""); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelVolumeMediumArg, &ktpf.storageMedium, ""); err != nil {
if err = OptArg(args, MultiContainerRunVolumeMediumArg, &ktpf.storageMedium, ""); err != nil {
return nil, err
}
if err = OptArg(args, KubeTaskParallelVolumeSizeLimitArg, &storageSizeString, ""); err != nil {
if err = OptArg(args, MultiContainerRunVolumeSizeLimitArg, &storageSizeString, ""); err != nil {
return nil, err
}
if storageSizeString != "" {
Expand All @@ -258,7 +258,7 @@ func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplatePar
}
ktpf.storageSizeLimit = &size
}
if err = OptArg(args, KubeTaskParallelSharedDirArg, &ktpf.storageDir, ktpDefaultSharedDir); err != nil {
if err = OptArg(args, MultiContainerRunSharedDirArg, &ktpf.storageDir, ktpDefaultSharedDir); err != nil {
return nil, err
}
if err = OptArg(args, PodAnnotationsArg, &bpAnnotations, nil); err != nil {
Expand All @@ -268,7 +268,7 @@ func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplatePar
return nil, err
}

ktpf.podOverride, err = GetPodSpecOverride(tp, args, KubeTaskParallelPodOverrideArg)
ktpf.podOverride, err = GetPodSpecOverride(tp, args, MultiContainerRunPodOverrideArg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -297,34 +297,34 @@ func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplatePar
)
}

func (*kubeTaskParallelFunc) RequiredArgs() []string {
func (*multiContainerRunFunc) RequiredArgs() []string {
return []string{
KubeTaskParallelBackgroundImageArg,
KubeTaskParallelBackgroundCommandArg,
KubeTaskParallelOutputImageArg,
KubeTaskParallelOutputCommandArg,
MultiContainerRunBackgroundImageArg,
MultiContainerRunBackgroundCommandArg,
MultiContainerRunOutputImageArg,
MultiContainerRunOutputCommandArg,
}
}

func (*kubeTaskParallelFunc) Arguments() []string {
func (*multiContainerRunFunc) Arguments() []string {
return []string{
KubeTaskParallelNamespaceArg,
KubeTaskParallelInitImageArg,
KubeTaskParallelInitCommandArg,
KubeTaskParallelBackgroundImageArg,
KubeTaskParallelBackgroundCommandArg,
KubeTaskParallelOutputImageArg,
KubeTaskParallelOutputCommandArg,
KubeTaskParallelVolumeMediumArg,
KubeTaskParallelVolumeSizeLimitArg,
KubeTaskParallelSharedDirArg,
KubeTaskParallelPodOverrideArg,
MultiContainerRunNamespaceArg,
MultiContainerRunInitImageArg,
MultiContainerRunInitCommandArg,
MultiContainerRunBackgroundImageArg,
MultiContainerRunBackgroundCommandArg,
MultiContainerRunOutputImageArg,
MultiContainerRunOutputCommandArg,
MultiContainerRunVolumeMediumArg,
MultiContainerRunVolumeSizeLimitArg,
MultiContainerRunSharedDirArg,
MultiContainerRunPodOverrideArg,
PodLabelsArg,
PodAnnotationsArg,
}
}

func (ktpf *kubeTaskParallelFunc) Validate(args map[string]any) error {
func (ktpf *multiContainerRunFunc) Validate(args map[string]any) error {
if err := ValidatePodLabelsAndAnnotations(ktpf.Name(), args); err != nil {
return err
}
Expand All @@ -336,7 +336,7 @@ func (ktpf *kubeTaskParallelFunc) Validate(args map[string]any) error {
return utils.CheckRequiredArgs(ktpf.RequiredArgs(), args)
}

func (k *kubeTaskParallelFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
func (k *multiContainerRunFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: k.progressPercent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ import (
"github.com/kanisterio/kanister/pkg/param"
)

var _ = Suite(&KubeTaskParallelSuite{})
var _ = Suite(&MultiContainerRunSuite{})

type KubeTaskParallelSuite struct {
type MultiContainerRunSuite struct {
cli kubernetes.Interface
namespace string
}

func (s *KubeTaskParallelSuite) SetUpSuite(c *C) {
func (s *MultiContainerRunSuite) SetUpSuite(c *C) {
cli, err := kube.NewClient()
c.Assert(err, IsNil)
s.cli = cli

ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "kanister-kubetaskparalleltest-",
GenerateName: "kanister-multicontainerruntest-",
},
}
cns, err := s.cli.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
Expand All @@ -57,26 +57,26 @@ func (s *KubeTaskParallelSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
}

func (s *KubeTaskParallelSuite) TearDownSuite(c *C) {
func (s *MultiContainerRunSuite) TearDownSuite(c *C) {
if s.namespace != "" {
_ = s.cli.CoreV1().Namespaces().Delete(context.TODO(), s.namespace, metav1.DeleteOptions{})
}
}

func kubeTaskParallelPhase(namespace string) crv1alpha1.BlueprintPhase {
func multiContainerRunPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testKubeTaskParallel",
Func: KubeTaskParallelFuncName,
Name: "testMultiContainerRun",
Func: MultiContainerRunFuncName,
Args: map[string]interface{}{
KubeTaskParallelNamespaceArg: namespace,
KubeTaskParallelBackgroundImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelBackgroundCommandArg: []string{
MultiContainerRunNamespaceArg: namespace,
MultiContainerRunBackgroundImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunBackgroundCommandArg: []string{
"sh",
"-c",
"echo foo > /tmp/file",
},
KubeTaskParallelOutputImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelOutputCommandArg: []string{
MultiContainerRunOutputImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunOutputCommandArg: []string{
"sh",
"-c",
"while [ ! -e /tmp/file ]; do sleep 1; done; kando output value $(cat /tmp/file)",
Expand All @@ -85,7 +85,7 @@ func kubeTaskParallelPhase(namespace string) crv1alpha1.BlueprintPhase {
}
}

func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) {
func (s *MultiContainerRunSuite) TestMultiContainerRun(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
Expand All @@ -111,7 +111,7 @@ func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) {
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(kubeTaskParallelPhase(s.namespace)),
bp: newTaskBlueprint(multiContainerRunPhase(s.namespace)),
outs: []map[string]interface{}{
{
"value": "foo",
Expand All @@ -130,26 +130,26 @@ func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) {
}
}

func kubeTaskParallelPhaseWithInit(namespace string) crv1alpha1.BlueprintPhase {
func multiContainerRunPhaseWithInit(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testKubeTaskParallel",
Func: KubeTaskParallelFuncName,
Name: "testMultiContainerRun",
Func: MultiContainerRunFuncName,
Args: map[string]interface{}{
KubeTaskParallelNamespaceArg: namespace,
KubeTaskParallelInitImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelInitCommandArg: []string{
MultiContainerRunNamespaceArg: namespace,
MultiContainerRunInitImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunInitCommandArg: []string{
"sh",
"-c",
"mkfifo /tmp/file",
},
KubeTaskParallelBackgroundImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelBackgroundCommandArg: []string{
MultiContainerRunBackgroundImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunBackgroundCommandArg: []string{
"sh",
"-c",
"if [ ! -e /tmp/file ]; then exit 1; fi; echo foo >> /tmp/file",
},
KubeTaskParallelOutputImageArg: consts.LatestKanisterToolsImage,
KubeTaskParallelOutputCommandArg: []string{
MultiContainerRunOutputImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunOutputCommandArg: []string{
"sh",
"-c",
"if [ ! -e /tmp/file ]; then exit 1; fi; kando output value $(cat /tmp/file)",
Expand All @@ -158,7 +158,7 @@ func kubeTaskParallelPhaseWithInit(namespace string) crv1alpha1.BlueprintPhase {
}
}

func (s *KubeTaskParallelSuite) TestKubeTaskParallelWithInit(c *C) {
func (s *MultiContainerRunSuite) TestMultiContainerRunWithInit(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
Expand All @@ -184,7 +184,7 @@ func (s *KubeTaskParallelSuite) TestKubeTaskParallelWithInit(c *C) {
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(kubeTaskParallelPhaseWithInit(s.namespace)),
bp: newTaskBlueprint(multiContainerRunPhaseWithInit(s.namespace)),
outs: []map[string]interface{}{
{
"value": "foo",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
features: Introduced new Kanister function ``MultiContainerRun`` to run pods with two containers connected by shared volume

0 comments on commit 5938e47

Please sign in to comment.