Skip to content

Commit

Permalink
fix(deps): replace infinite retry.Do with k8s.io/apimachinery wait.Po…
Browse files Browse the repository at this point in the history
…llUntil*

Trying to limit the potential dependencies. We already rely on
k8s/apimachinery which has very similar wait.PollUntil* functionality.

Signed-off-by: Maciej Szulik <[email protected]>
  • Loading branch information
soltysh committed Sep 23, 2024
1 parent 690c257 commit 83b6c59
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 43 deletions.
17 changes: 9 additions & 8 deletions src/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -45,25 +45,26 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
if err != nil {
return nil, err
}
err = retry.Do(func() error {
// returning false, or an error continues polling, true stops it
err = wait.PollUntilContextCancel(ctx, time.Second, false, func(context.Context) (bool, error) {
nodeList, err := c.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
return false, err
}
if len(nodeList.Items) < 1 {
return fmt.Errorf("cluster does not have any nodes")
return false, fmt.Errorf("cluster does not have any nodes")
}
pods, err := c.Clientset.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
return false, err
}
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return nil
return true, nil
}
}
return fmt.Errorf("no pods are in succeeded or running state")
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
return false, fmt.Errorf("no pods are in succeeded or running state")
})
if err != nil {
return nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
Expand Down Expand Up @@ -171,21 +172,22 @@ type podFilter func(pod corev1.Pod) bool
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
readyPods, err := retry.DoWithData(func() ([]corev1.Pod, error) {
var readyPods []corev1.Pod
err := wait.PollUntilContextCancel(ctx, time.Second, false, func(context.Context) (bool, error) {
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
return nil, err
return false, err
}
message.Debugf("Found %d pods for target %#v", len(podList.Items), target)
// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})

readyPods := []corev1.Pod{}
readyPods = []corev1.Pod{}
for _, pod := range podList.Items {
message.Debugf("Testing pod %q", pod.Name)

Expand Down Expand Up @@ -227,10 +229,10 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
}
}
if len(readyPods) == 0 {
return nil, fmt.Errorf("no ready pods found")
return false, fmt.Errorf("no ready pods found")
}
return readyPods, nil
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
return true, nil
})
if err != nil {
return nil, err
}
Expand Down
19 changes: 10 additions & 9 deletions src/pkg/cluster/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"fmt"
"time"

"github.com/avast/retry-go/v4"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/zarf-dev/zarf/src/pkg/message"
)
Expand All @@ -29,16 +29,17 @@ func (c *Cluster) DeleteZarfNamespace(ctx context.Context) error {
if err != nil {
return err
}
err = retry.Do(func() error {
_, err := c.Clientset.CoreV1().Namespaces().Get(ctx, ZarfNamespaceName, metav1.GetOptions{})
if kerrors.IsNotFound(err) {
return nil
// returning false, or an error continues polling, true stops it
err = wait.PollUntilContextCancel(ctx, time.Second, false, func(context.Context) (bool, error) {
_, ierr := c.Clientset.CoreV1().Namespaces().Get(ctx, ZarfNamespaceName, metav1.GetOptions{})
if kerrors.IsNotFound(ierr) {
return true, nil
}
if err != nil {
return err
if ierr != nil {
return false, ierr
}
return fmt.Errorf("namespace still exists")
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
return false, fmt.Errorf("namespace still exists")
})
if err != nil {
return err
}
Expand Down
16 changes: 7 additions & 9 deletions src/pkg/cluster/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/config/lang"
Expand Down Expand Up @@ -127,15 +127,13 @@ func (c *Cluster) InitZarfState(ctx context.Context, initOptions types.ZarfInitO
// Wait up to 2 minutes for the default service account to be created.
// Some clusters seem to take a while to create this, see https://github.com/kubernetes/kubernetes/issues/66689.
// The default SA is required for pods to start properly.
saCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
err = retry.Do(func() error {
_, err := c.Clientset.CoreV1().ServiceAccounts(ZarfNamespaceName).Get(saCtx, "default", metav1.GetOptions{})
if err != nil {
return err
err = wait.PollUntilContextTimeout(ctx, time.Second, 2*time.Minute, false, func(saCtx context.Context) (bool, error) {
_, ierr := c.Clientset.CoreV1().ServiceAccounts(ZarfNamespaceName).Get(saCtx, "default", metav1.GetOptions{})
if ierr != nil {
return false, ierr
}
return nil
}, retry.Context(saCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
return true, nil
})
if err != nil {
return fmt.Errorf("unable get default Zarf service account: %w", err)
}
Expand Down
20 changes: 10 additions & 10 deletions src/pkg/cluster/zarf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/avast/retry-go/v4"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/gitea"
Expand Down Expand Up @@ -157,19 +158,18 @@ func (c *Cluster) RecordPackageDeploymentAndWait(ctx context.Context, pkg v1alph
if waitSeconds > 0 {
waitDuration = time.Duration(waitSeconds) * time.Second
}
waitCtx, cancel := context.WithTimeout(ctx, waitDuration)
defer cancel()
deployedPackage, err = retry.DoWithData(func() (*types.DeployedPackage, error) {
deployedPackage, err = c.GetDeployedPackage(waitCtx, deployedPackage.Name)
if err != nil {
return nil, err
err = wait.PollUntilContextTimeout(ctx, time.Second, waitDuration, false, func(waitCtx context.Context) (bool, error) {
pkg, ierr := c.GetDeployedPackage(waitCtx, deployedPackage.Name)
if ierr != nil {
return false, ierr
}
deployedPackage = pkg
packageNeedsWait, _, _ = c.PackageSecretNeedsWait(deployedPackage, component, skipWebhooks)
if !packageNeedsWait {
return deployedPackage, nil
return true, nil
}
return deployedPackage, nil
}, retry.Context(waitCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
return true, nil
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 83b6c59

Please sign in to comment.