Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set default namespace and serviceaccount for MultiContainerRun pods #3285

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 49 additions & 18 deletions pkg/function/multi_container_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ func (*multiContainerRunFunc) Name() string {
return MultiContainerRunFuncName
}

func (ktpf *multiContainerRunFunc) run(
ctx context.Context,
cli kubernetes.Interface,
) (map[string]interface{}, error) {
func (ktpf *multiContainerRunFunc) run(ctx context.Context) (map[string]interface{}, error) {
cli, err := kube.NewClient()
if err != nil {
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}

volumeMounts := []corev1.VolumeMount{
{
Name: ktpSharedVolumeName,
Expand Down Expand Up @@ -141,7 +143,7 @@ func (ktpf *multiContainerRunFunc) run(
},
}

podSpec, err := kube.PatchDefaultPodSpecs(podSpec, ktpf.podOverride)
podSpec, err = kube.PatchDefaultPodSpecs(podSpec, ktpf.podOverride)
if err != nil {
return nil, errkit.Wrap(err, "Unable to apply podOverride", "podSpec", podSpec, "podOverride", ktpf.podOverride)
}
Expand All @@ -162,6 +164,11 @@ func (ktpf *multiContainerRunFunc) run(
// FIXME: this doesn't work with pod controller currently so we have to reorder containers
ktpf.annotations[defaultContainerAnn] = ktpOutputContainer

err = setPodSpecServiceAccount(&podSpec, ktpf.namespace, cli)
if err != nil {
return nil, errkit.Wrap(err, "Failed to set serviceaccount for pod")
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: jobPrefix,
Expand Down Expand Up @@ -194,6 +201,23 @@ func (ktpf *multiContainerRunFunc) run(
return getPodOutput(ctx, pc)
}

func setPodSpecServiceAccount(podSpec *corev1.PodSpec, ns string, cli kubernetes.Interface) error {
sa := podSpec.ServiceAccountName
controllerNamespace, err := kube.GetControllerNamespace()
if err != nil {
return errkit.Wrap(err, "Failed to get controller namespace")
}

if sa == "" && ns == controllerNamespace {
sa, err = kube.GetControllerServiceAccount(cli)
if err != nil {
return errkit.Wrap(err, "Failed to get Controller Service Account")
}
}
podSpec.ServiceAccountName = sa
return nil
}

// This function is similar to kubeTaskPodFunc
func getPodOutput(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
if err := pc.WaitForPodReady(ctx); err != nil {
Expand Down Expand Up @@ -242,9 +266,19 @@ func (ktpf *multiContainerRunFunc) Exec(ctx context.Context, tp param.TemplatePa
if err = OptArg(args, MultiContainerRunInitCommandArg, &ktpf.initCommand, nil); err != nil {
return nil, err
}

if err = OptArg(args, MultiContainerRunNamespaceArg, &ktpf.namespace, ""); err != nil {
return nil, err
}

if ktpf.namespace == "" {
controllerNamespace, err := kube.GetControllerNamespace()
if err != nil {
return nil, errkit.Wrap(err, "Failed to get controller namespace")
}
ktpf.namespace = controllerNamespace
}

if err = OptArg(args, MultiContainerRunVolumeMediumArg, &ktpf.storageMedium, ""); err != nil {
return nil, err
}
Expand Down Expand Up @@ -273,28 +307,25 @@ func (ktpf *multiContainerRunFunc) Exec(ctx context.Context, tp param.TemplatePa
return nil, err
}

ktpf.labels = bpLabels
ktpf.annotations = bpAnnotations
ktpf.setLabelsAndAnnotations(tp, bpLabels, bpAnnotations)

return ktpf.run(ctx)
}

func (ktpf *multiContainerRunFunc) setLabelsAndAnnotations(tp param.TemplateParams, labels, annotation map[string]string) {
ktpf.labels = labels
ktpf.annotations = annotation
if tp.PodAnnotations != nil {
// merge the actionset annotations with blueprint annotations
var actionSetAnn ActionSetAnnotations = tp.PodAnnotations
ktpf.annotations = actionSetAnn.MergeBPAnnotations(bpAnnotations)
ktpf.annotations = actionSetAnn.MergeBPAnnotations(annotation)
}

if tp.PodLabels != nil {
// merge the actionset labels with blueprint labels
var actionSetLabels ActionSetLabels = tp.PodLabels
ktpf.labels = actionSetLabels.MergeBPLabels(bpLabels)
}

cli, err := kube.NewClient()
if err != nil {
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
ktpf.labels = actionSetLabels.MergeBPLabels(labels)
}
return ktpf.run(
ctx,
cli,
)
}

func (*multiContainerRunFunc) RequiredArgs() []string {
Expand Down
66 changes: 66 additions & 0 deletions pkg/function/multi_container_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,69 @@ func (s *MultiContainerRunSuite) TestMultiContainerRunWithInit(c *C) {
}
}
}

func multiContainerRunPhaseWithoutNamespace() crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testMultiContainerRun",
Func: MultiContainerRunFuncName,
Args: map[string]interface{}{
MultiContainerRunBackgroundImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunBackgroundCommandArg: []string{
"sh",
"-c",
"echo foo > /tmp/file",
},
MultiContainerRunOutputImageArg: consts.LatestKanisterToolsImage,
MultiContainerRunOutputCommandArg: []string{
"sh",
"-c",
"while [ ! -e /tmp/file ]; do sleep 1; done; kando output value $(cat /tmp/file)",
},
},
}
}

func (s *MultiContainerRunSuite) TestMultiContainerRunWithoutNamespace(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
PodOverride: crv1alpha1.JSONMap{
"containers": []map[string]interface{}{
{
"name": "background",
"imagePullPolicy": "Always",
},
{
"name": "output",
"imagePullPolicy": "Always",
},
},
},
}
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(multiContainerRunPhaseWithoutNamespace()),
outs: []map[string]interface{}{
{
"value": "foo",
},
},
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
Loading