From beb30ad1484d11a41bcbe283c3e1883447d26418 Mon Sep 17 00:00:00 2001 From: Philip Laine Date: Mon, 22 Jul 2024 21:17:57 +0200 Subject: [PATCH] fix: data injection to return errors (#2720) Signed-off-by: Philip Laine Signed-off-by: Tim Seagren --- src/pkg/cluster/data.go | 182 +++++++++++++++++++------------------ src/pkg/packager/deploy.go | 21 +++-- 2 files changed, 103 insertions(+), 100 deletions(-) diff --git a/src/pkg/cluster/data.go b/src/pkg/cluster/data.go index 7b19257040..cd46dbb28b 100644 --- a/src/pkg/cluster/data.go +++ b/src/pkg/cluster/data.go @@ -13,7 +13,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" corev1 "k8s.io/api/core/v1" @@ -32,12 +31,10 @@ import ( // HandleDataInjection waits for the target pod(s) to come up and inject the data into them // todo: this currently requires kubectl but we should have enough k8s work to make this native now. -func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) { - defer wg.Done() +func (c *Cluster) HandleDataInjection(ctx context.Context, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) error { injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker()) if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil { - message.WarnErrf(err, "Unable to create the data injection completion marker") - return + return fmt.Errorf("unable to create the data injection completion marker: %w", err) } tarCompressFlag := "" @@ -59,103 +56,110 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, d shell, shellArgs := exec.GetOSShell(exec.Shell{Windows: "cmd"}) if _, _, err := exec.Cmd(shell, append(shellArgs, "tar --version")...); err != nil { - message.WarnErr(err, "Unable to execute tar on this system. Please ensure it is installed and on your $PATH.") - return + return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err) } -iterator: - // The eternal loop because some data injections can take a very long time for { - message.Debugf("Attempting to inject data into %s", data.Target) - source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path)) - if helpers.InvalidPath(source) { - // The path is likely invalid because of how we compose OCI components, add an index suffix to the filename - source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path)) + select { + case <-ctx.Done(): + return ctx.Err() + default: + message.Debugf("Attempting to inject data into %s", data.Target) + source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path)) if helpers.InvalidPath(source) { - message.Warnf("Unable to find the data injection source path %s", source) - return + // The path is likely invalid because of how we compose OCI components, add an index suffix to the filename + source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path)) + if helpers.InvalidPath(source) { + return fmt.Errorf("could not find the data injection source path %s", source) + } } - } - target := podLookup{ - Namespace: data.Target.Namespace, - Selector: data.Target.Selector, - Container: data.Target.Container, - } - - // Wait until the pod we are injecting data into becomes available - pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer) - if len(pods) < 1 { - continue - } + target := podLookup{ + Namespace: data.Target.Namespace, + Selector: data.Target.Selector, + Container: data.Target.Container, + } - // Inject into all the pods - for _, pod := range pods { - // Try to use the embedded kubectl if we can - zarfCommand, err := utils.GetFinalExecutableCommand() - kubectlBinPath := "kubectl" + // Wait until the pod we are injecting data into becomes available + pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer) if err != nil { - message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err) - } else { - kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand) + return err + } + if len(pods) < 1 { + continue } - kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container) - // Note that each command flag is separated to provide the widest cross-platform tar support - tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag) - untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path) + // Inject into all the pods + for _, pod := range pods { + // Try to use the embedded kubectl if we can + zarfCommand, err := utils.GetFinalExecutableCommand() + kubectlBinPath := "kubectl" + if err != nil { + message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err) + } else { + kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand) + } + kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container) - // Must create the target directory before trying to change to it for untar - mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path) - if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil { - message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod.Name) - continue iterator - } + // Note that each command flag is separated to provide the widest cross-platform tar support + tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag) + untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path) - cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s", - tarCmd, - source, - kubectlCmd, - untarCmd, - ) - - // Do the actual data injection - if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { - message.Warnf("Error copying data into the pod %#v: %#v\n", pod.Name, err) - continue iterator - } + // Must create the target directory before trying to change to it for untar + mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path) + if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil { + return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err) + } + + cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s", + tarCmd, + source, + kubectlCmd, + untarCmd, + ) - // Leave a marker in the target container for pods to track the sync action - cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s", - tarCmd, - componentPath.DataInjections, - config.GetDataInjectionMarker(), - kubectlCmd, - untarCmd, - ) - - if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { - message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod.Name) - continue iterator + // Do the actual data injection + if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { + return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err) + } + + // Leave a marker in the target container for pods to track the sync action + cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s", + tarCmd, + componentPath.DataInjections, + config.GetDataInjectionMarker(), + kubectlCmd, + untarCmd, + ) + + if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil { + return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err) + } } - } - // Do not look for a specific container after injection in case they are running an init container - podOnlyTarget := podLookup{ - Namespace: data.Target.Namespace, - Selector: data.Target.Selector, - } + // Do not look for a specific container after injection in case they are running an init container + podOnlyTarget := podLookup{ + Namespace: data.Target.Namespace, + Selector: data.Target.Selector, + } - // Block one final time to make sure at least one pod has come up and injected the data - // Using only the pod as the final selector because we don't know what the container name will be - // Still using the init container filter to make sure we have the right running pod - _ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer) + // Block one final time to make sure at least one pod has come up and injected the data + // Using only the pod as the final selector because we don't know what the container name will be + // Still using the init container filter to make sure we have the right running pod + _, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer) + if err != nil { + return err + } - // Cleanup now to reduce disk pressure - _ = os.RemoveAll(source) + // Cleanup now to reduce disk pressure + err = os.RemoveAll(source) + if err != nil { + return err + } - // Return to stop the loop - return + // Return to stop the loop + return nil + } } } @@ -173,7 +177,7 @@ type podFilter func(pod corev1.Pod) bool // It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names // If the timeout is reached, an empty list will be returned. // TODO: Test, refactor and/or remove. -func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod { +func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) { waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second) defer cancel() @@ -183,16 +187,14 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac for { select { case <-waitCtx.Done(): - message.Debug("Pod lookup failed: %v", ctx.Err()) - return nil + return nil, ctx.Err() case <-timer.C: listOpts := metav1.ListOptions{ LabelSelector: target.Selector, } podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts) if err != nil { - message.Debug("Unable to find matching pods: %w", err) - return nil + return nil, err } message.Debug("Found %d pods for target %#v", len(podList.Items), target) @@ -245,7 +247,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac } } if len(readyPods) > 0 { - return readyPods + return readyPods, nil } timer.Reset(3 * time.Second) } diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index 35f31514fd..5c174a684f 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -14,9 +14,10 @@ import ( "runtime" "strconv" "strings" - "sync" "time" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -293,7 +294,6 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp hasCharts := len(component.Charts) > 0 hasManifests := len(component.Manifests) > 0 hasRepos := len(component.Repos) > 0 - hasDataInjections := len(component.DataInjections) > 0 hasFiles := len(component.Files) > 0 onDeploy := component.Actions.OnDeploy @@ -344,14 +344,11 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp } } - if hasDataInjections { - waitGroup := sync.WaitGroup{} - defer waitGroup.Wait() - - for idx, data := range component.DataInjections { - waitGroup.Add(1) - go p.cluster.HandleDataInjection(ctx, &waitGroup, data, componentPath, idx) - } + g, gCtx := errgroup.WithContext(ctx) + for idx, data := range component.DataInjections { + g.Go(func() error { + return p.cluster.HandleDataInjection(gCtx, data, componentPath, idx) + }) } if hasCharts || hasManifests { @@ -364,6 +361,10 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp return charts, fmt.Errorf("unable to run component after action: %w", err) } + err = g.Wait() + if err != nil { + return nil, err + } return charts, nil }