Skip to content

Commit

Permalink
fix: data injection to return errors (#2720)
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Laine <[email protected]>
  • Loading branch information
phillebaba authored Jul 22, 2024
1 parent 612bd8e commit 1c3e426
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 100 deletions.
182 changes: 92 additions & 90 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -32,12 +31,10 @@ import (

// HandleDataInjection waits for the target pod(s) to come up and inject the data into them
// todo: this currently requires kubectl but we should have enough k8s work to make this native now.
func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
defer wg.Done()
func (c *Cluster) HandleDataInjection(ctx context.Context, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) error {
injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
message.WarnErrf(err, "Unable to create the data injection completion marker")
return
return fmt.Errorf("unable to create the data injection completion marker: %w", err)
}

tarCompressFlag := ""
Expand All @@ -59,103 +56,110 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, d
shell, shellArgs := exec.GetOSShell(exec.Shell{Windows: "cmd"})

if _, _, err := exec.Cmd(shell, append(shellArgs, "tar --version")...); err != nil {
message.WarnErr(err, "Unable to execute tar on this system. Please ensure it is installed and on your $PATH.")
return
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)
}

iterator:
// The eternal loop because some data injections can take a very long time
for {
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
select {
case <-ctx.Done():
return ctx.Err()
default:
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
message.Warnf("Unable to find the data injection source path %s", source)
return
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
return fmt.Errorf("could not find the data injection source path %s", source)
}
}
}

target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
// Wait until the pod we are injecting data into becomes available
pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
return err
}
if len(pods) < 1 {
continue
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)
// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod.Name)
continue iterator
}
// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error copying data into the pod %#v: %#v\n", pod.Name, err)
continue iterator
}
// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err)
}

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod.Name)
continue iterator
// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err)
}

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err)
}
}
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}
// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
if err != nil {
return err
}

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
// Cleanup now to reduce disk pressure
err = os.RemoveAll(source)
if err != nil {
return err
}

// Return to stop the loop
return
// Return to stop the loop
return nil
}
}
}

Expand All @@ -173,7 +177,7 @@ type podFilter func(pod corev1.Pod) bool
// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

Expand All @@ -183,16 +187,14 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
for {
select {
case <-waitCtx.Done():
message.Debug("Pod lookup failed: %v", ctx.Err())
return nil
return nil, ctx.Err()
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
message.Debug("Unable to find matching pods: %w", err)
return nil
return nil, err
}

message.Debug("Found %d pods for target %#v", len(podList.Items), target)
Expand Down Expand Up @@ -245,7 +247,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
}
}
if len(readyPods) > 0 {
return readyPods
return readyPods, nil
}
timer.Reset(3 * time.Second)
}
Expand Down
21 changes: 11 additions & 10 deletions src/pkg/packager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -293,7 +294,6 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
hasCharts := len(component.Charts) > 0
hasManifests := len(component.Manifests) > 0
hasRepos := len(component.Repos) > 0
hasDataInjections := len(component.DataInjections) > 0
hasFiles := len(component.Files) > 0

onDeploy := component.Actions.OnDeploy
Expand Down Expand Up @@ -344,14 +344,11 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
}
}

if hasDataInjections {
waitGroup := sync.WaitGroup{}
defer waitGroup.Wait()

for idx, data := range component.DataInjections {
waitGroup.Add(1)
go p.cluster.HandleDataInjection(ctx, &waitGroup, data, componentPath, idx)
}
g, gCtx := errgroup.WithContext(ctx)
for idx, data := range component.DataInjections {
g.Go(func() error {
return p.cluster.HandleDataInjection(gCtx, data, componentPath, idx)
})
}

if hasCharts || hasManifests {
Expand All @@ -364,6 +361,10 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
return charts, fmt.Errorf("unable to run component after action: %w", err)
}

err = g.Wait()
if err != nil {
return nil, err
}
return charts, nil
}

Expand Down

0 comments on commit 1c3e426

Please sign in to comment.