Skip to content

Commit

Permalink
refactor: remove unnecessary retry logic from data injection (#2867)
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Laine <[email protected]>
  • Loading branch information
phillebaba authored Aug 14, 2024
1 parent a6502e4 commit e2e2600
Showing 1 changed file with 85 additions and 96 deletions.
181 changes: 85 additions & 96 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,109 +60,100 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, data v1alpha1.ZarfDat
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)
}

// TODO: Refactor to use retry.
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
return fmt.Errorf("could not find the data injection source path %s", source)
}
}

target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if err != nil {
return err
}
if len(pods) < 1 {
continue
}
message.Debugf("Attempting to inject data into %s", data.Target)

source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
return fmt.Errorf("could not find the data injection source path %s", source)
}
}

// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)
// Wait until the pod we are injecting data into becomes available
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}
waitCtx, waitCancel := context.WithTimeout(ctx, 90*time.Second)
defer waitCancel()
pods, err := waitForPodsAndContainers(waitCtx, c.Clientset, target, podFilterByInitContainer)
if err != nil {
return err
}

// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)
// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err)
}
// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)
// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err)
}

// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err)
}
cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err)
}
}
// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err)
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}
// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err)
}
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
if err != nil {
return err
}
// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}

// Cleanup now to reduce disk pressure
err = os.RemoveAll(source)
if err != nil {
return err
}
// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
if err != nil {
return err
}

// Return to stop the loop
return nil
}
// Cleanup now to reduce disk pressure
err = os.RemoveAll(source)
if err != nil {
return err
}

// Return to stop the loop
return nil
}

// podLookup is a struct for specifying a pod to target for data injection or lookups.
Expand All @@ -180,13 +171,11 @@ type podFilter func(pod corev1.Pod) bool
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()
readyPods, err := retry.DoWithData(func() ([]corev1.Pod, error) {
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(waitCtx, listOpts)
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -241,7 +230,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
return nil, fmt.Errorf("no ready pods found")
}
return readyPods, nil
}, retry.Context(waitCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e2e2600

Please sign in to comment.