Skip to content

Commit

Permalink
Simplify validateLabelKeyIsPresentFromContext and AddLabelToPodOption…
Browse files Browse the repository at this point in the history
…s funcs

Signed-off-by: Abhijit Mukherjee <[email protected]>
  • Loading branch information
mabhi committed Apr 3, 2024
1 parent 39dfe2e commit 67eaff2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 93 deletions.
29 changes: 14 additions & 15 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func kubeTask(ctx context.Context, cli kubernetes.Interface, namespace, image st
PodOverride: podOverride,
}
// Mark labels to pods with prefix `kanister.io`. Add the jobID as reference to the origin for the pod.
validateFunc := validateLabelKeyIsPresentFunc(consts.LabelPrefix)
kube.AddLabelsToPodOptionsFromContext(ctx, options, path.Join(consts.LabelPrefix, jobIDSuffix), validateFunc)

ok, val := validateLabelKeyIsPresentFromContext(ctx, consts.LabelPrefix)
if ok {
kube.AddLabelsToPodOptions(options, path.Join(consts.LabelPrefix, jobIDSuffix), val)
}
pr := kube.NewPodRunner(cli, options)
podFunc := kubeTaskPodFunc()
return pr.Run(ctx, podFunc)
Expand Down Expand Up @@ -100,21 +101,19 @@ func kubeTaskPodFunc() func(ctx context.Context, pc kube.PodController) (map[str
}
}

// validateLabelKeyIsPresentFunc: This is a helper validation function used by kubetask to validate the presence of
// validateLabelKeyIsPresent: This is a helper validation function used by kubetask to validate the presence of
// label key. Result of this is used to add target label selector to the pod
func validateLabelKeyIsPresentFunc(keyPrefix string) func(ctx context.Context) (bool, string) {
return func(ctx context.Context) (bool, string) {
fields := field.FromContext(ctx)
if fields == nil {
return false, ""
}
for _, f := range fields.Fields() {
if strings.HasPrefix(f.Key(), keyPrefix) {
return true, f.Value().(string)
}
}
func validateLabelKeyIsPresentFromContext(ctx context.Context, keyPrefix string) (bool, string) {
fields := field.FromContext(ctx)
if fields == nil {
return false, ""
}
for _, f := range fields.Fields() {
if strings.HasPrefix(f.Key(), keyPrefix) {
return true, f.Value().(string)
}
}
return false, ""
}

func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down
100 changes: 33 additions & 67 deletions pkg/kube/pod_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,65 +118,42 @@ func (s *PodRunnerTestSuite) TestPodRunnerForSuccessCase(c *C) {
// pod got created with corresponding label using the entry or not.
func (s *PodRunnerTestSuite) TestPodRunnerWithJobIDDebugLabelForSuccessCase(c *C) {
randomUUID := "xyz123"
for _, tc := range []struct {
name string
validateFn func(_ context.Context) (bool, string)
afterPodRunTestFn func(labelKey, labelValue string, ch chan struct{}) func(ctx context.Context, pc PodController) (map[string]interface{}, error)
}{
{
name: "test key not present in context",
validateFn: func(_ context.Context) (bool, string) {
return false, ""
},
afterPodRunTestFn: afterPodRunTestKeyAbsentFunc,
},
{
name: "test key is present in context",
validateFn: func(_ context.Context) (bool, string) {
return true, randomUUID
ctx, cancel := context.WithCancel(context.Background())
ctx = field.Context(ctx, path.Join(consts.LabelPrefix, "JobID"), randomUUID)
cli := fake.NewSimpleClientset()
cli.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
return false, nil, nil
})
cli.PrependReactor("get", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
p := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
afterPodRunTestFn: afterPodRunTestKeyPresentFunc,
},
} {
ctx, cancel := context.WithCancel(context.Background())
ctx = field.Context(ctx, consts.LabelPrefix+"JobID", randomUUID)
ctx = field.Context(ctx, "some-test-key", "some-test-value")

cli := fake.NewSimpleClientset()
cli.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
return false, nil, nil
})
cli.PrependReactor("get", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
p := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
return true, p, nil
})
po := &PodOptions{
Namespace: podRunnerNS,
Name: podName,
Command: []string{"sh", "-c", "tail -f /dev/null"},
}
deleted := make(chan struct{})
cli.PrependReactor("delete", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
c.Log("Pod deleted due to Context Cancelled")
close(deleted)
return true, nil, nil
})
var targetKey = path.Join(consts.LabelPrefix, "JobID")
AddLabelsToPodOptionsFromContext(ctx, po, targetKey, tc.validateFn)
pr := NewPodRunner(cli, po)
errorCh := make(chan error)
go func() {
_, err := pr.Run(ctx, tc.afterPodRunTestFn(targetKey, randomUUID, deleted))
errorCh <- err
}()
deleted <- struct{}{}
c.Assert(<-errorCh, IsNil)
cancel()
return true, p, nil
})
po := &PodOptions{
Namespace: podRunnerNS,
Name: podName,
Command: []string{"sh", "-c", "tail -f /dev/null"},
}
deleted := make(chan struct{})
cli.PrependReactor("delete", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
c.Log("Pod deleted due to Context Cancelled")
close(deleted)
return true, nil, nil
})
var targetKey = path.Join(consts.LabelPrefix, "JobID")
AddLabelsToPodOptions(po, targetKey, randomUUID)
pr := NewPodRunner(cli, po)
errorCh := make(chan error)
go func() {
_, err := pr.Run(ctx, afterPodRunTestKeyPresentFunc(targetKey, randomUUID, deleted))
errorCh <- err
}()
deleted <- struct{}{}
c.Assert(<-errorCh, IsNil)
cancel()
}

func makePodRunnerTestFunc(ch chan struct{}) func(ctx context.Context, pc PodController) (map[string]interface{}, error) {
Expand All @@ -199,14 +176,3 @@ func afterPodRunTestKeyPresentFunc(labelKey, labelValue string, ch chan struct{}
return nil, nil
}
}

func afterPodRunTestKeyAbsentFunc(labelKey, labelValue string, ch chan struct{}) func(ctx context.Context, pc PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc PodController) (map[string]interface{}, error) {
<-ch
_, present := pc.Pod().Labels[labelKey]
if present {
return nil, errors.New("Key should not be present")
}
return nil, nil
}
}
16 changes: 5 additions & 11 deletions pkg/kube/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,14 @@ func PVCContainsReadOnlyAccessMode(pvc *corev1.PersistentVolumeClaim) bool {
return false
}

// AddLabelsToPodOptionsFromContext adds additional label selector to `PodOptions`,
// provided the validationFunc passes successfully.
func AddLabelsToPodOptionsFromContext(
ctx context.Context,
// AddLabelsToPodOptions adds additional label selector to `PodOptions`,
func AddLabelsToPodOptions(
options *PodOptions,
targetKey string,
validateFn func(context.Context) (bool, string),
targetKey,
targetValue string,
) {
ok, value := validateFn(ctx)
if !ok {
return
}
if options.Labels == nil {
options.Labels = make(map[string]string)
}
options.Labels[targetKey] = value
options.Labels[targetKey] = targetValue
}

0 comments on commit 67eaff2

Please sign in to comment.