Skip to content

Commit

Permalink
fic(deps): replace finite retry.Do with k8s.io/client-go's retry.OnError
Browse files Browse the repository at this point in the history
Trying to limit the potential dependencies. We already rely on
k8s.io/client-go which retry logic has very similar interfac.

Signed-off-by: Maciej Szulik <[email protected]>
  • Loading branch information
soltysh committed Sep 23, 2024
1 parent 669777a commit 690c257
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 235 deletions.
72 changes: 42 additions & 30 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/avast/retry-go/v4"
plutoversionsfile "github.com/fairwindsops/pluto/v5"
plutoapi "github.com/fairwindsops/pluto/v5/pkg/api"
goyaml "github.com/goccy/go-yaml"
Expand All @@ -22,6 +21,8 @@ import (
"helm.sh/helm/v3/pkg/releaseutil"
"helm.sh/helm/v3/pkg/storage/driver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/yaml"

"github.com/zarf-dev/zarf/src/config"
Expand Down Expand Up @@ -59,37 +60,48 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,

histClient := action.NewHistory(h.actionConfig)

err = retry.Do(func() error {
var err error

releases, histErr := histClient.Run(h.chart.ReleaseName)

spinner.Updatef("Checking for existing helm deployment")

if errors.Is(histErr, driver.ErrReleaseNotFound) {
// No prior release, try to install it.
spinner.Updatef("Attempting chart installation")

_, err = h.installChart(postRender)
} else if histErr == nil && len(releases) > 0 {
// Otherwise, there is a prior release so upgrade it.
spinner.Updatef("Attempting chart upgrade")

lastRelease := releases[len(releases)-1]

_, err = h.upgradeChart(lastRelease, postRender)
} else {
// 😭 things aren't working
return fmt.Errorf("unable to verify the chart installation status: %w", histErr)
}
err = retry.OnError(
// backoff configuration with h.retries and InitialDuration of 500ms
wait.Backoff{
Duration: 500 * time.Millisecond,
Jitter: 1,
Factor: 2,
Steps: h.retries,
},
// always retry
func(err error) bool { return true },
// the actual action
func() error {
var err error

releases, histErr := histClient.Run(h.chart.ReleaseName)

spinner.Updatef("Checking for existing helm deployment")

if errors.Is(histErr, driver.ErrReleaseNotFound) {
// No prior release, try to install it.
spinner.Updatef("Attempting chart installation")

_, err = h.installChart(postRender)
} else if histErr == nil && len(releases) > 0 {
// Otherwise, there is a prior release so upgrade it.
spinner.Updatef("Attempting chart upgrade")

lastRelease := releases[len(releases)-1]

_, err = h.upgradeChart(lastRelease, postRender)
} else {
// 😭 things aren't working
return fmt.Errorf("unable to verify the chart installation status: %w", histErr)
}

if err != nil {
return err
}
if err != nil {
return err
}

spinner.Success()
return nil
}, retry.Context(ctx), retry.Attempts(uint(h.retries)), retry.Delay(500*time.Millisecond))
spinner.Success()
return nil
})
if err != nil {
removeMsg := "if you need to remove the failed chart, use `zarf package remove`"
installErr := fmt.Errorf("unable to install chart after %d attempts: %w: %s", h.retries, err, removeMsg)
Expand Down
50 changes: 37 additions & 13 deletions src/internal/packager/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
Expand All @@ -38,6 +38,8 @@ import (
"github.com/zarf-dev/zarf/src/pkg/transform"
"github.com/zarf-dev/zarf/src/pkg/utils"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
)

func checkForIndex(refInfo transform.Image, desc *remote.Descriptor) error {
Expand Down Expand Up @@ -229,22 +231,44 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er

toPull := maps.Clone(fetched)

err = retry.Do(func() error {
saved, err := SaveConcurrent(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}, retry.Context(ctx), retry.Attempts(2))
if err != nil {
message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error())
err = retry.Do(func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
err = retry.OnError(
// backoff configuration with 2 retries and InitialDuration of 100ms
wait.Backoff{
Duration: 100 * time.Millisecond,
Jitter: 1,
Factor: 2,
Steps: 2,
},
// always retry
func(err error) bool { return true },
// the actual action
func() error {
saved, err := SaveConcurrent(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}, retry.Context(ctx), retry.Attempts(2))
})
if err != nil {
message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error())
err = retry.OnError(
// backoff configuration with 2 retries and InitialDuration of 100ms
wait.Backoff{
Duration: 100 * time.Millisecond,
Jitter: 1,
Factor: 2,
Steps: 2,
},
// always retry
func(err error) bool { return true },
// the actual action
func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
})
if err != nil {
return nil, err
}
Expand Down
124 changes: 67 additions & 57 deletions src/internal/packager/images/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"fmt"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -53,78 +54,87 @@ func Push(ctx context.Context, cfg PushConfig) error {
registryURL = cfg.RegInfo.Address
)

err = retry.Do(func() error {
c, _ := cluster.NewCluster()
if c != nil {
registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(ctx, cfg.RegInfo)
if err != nil {
return err
}
if tunnel != nil {
defer tunnel.Close()
err = retry.OnError(
// constant backoff configuration with cfg.Retries and InitialDuration of 500ms
wait.Backoff{
Duration: 500 * time.Millisecond,
Steps: cfg.Retries,
},
// always retry
func(err error) bool { return true },
// the actual action
func() error {
c, _ := cluster.NewCluster()
if c != nil {
registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(ctx, cfg.RegInfo)
if err != nil {
return err
}
if tunnel != nil {
defer tunnel.Close()
}
}
}

progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush)))
defer progress.Close()
pushOptions := createPushOpts(cfg, progress)

pushImage := func(img v1.Image, name string) error {
if tunnel != nil {
return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) })
}
progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush)))
defer progress.Close()
pushOptions := createPushOpts(cfg, progress)

return crane.Push(img, name, pushOptions...)
}
pushImage := func(img v1.Image, name string) error {
if tunnel != nil {
return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) })
}

pushed := []transform.Image{}
defer func() {
for _, refInfo := range pushed {
delete(toPush, refInfo)
}
}()
for refInfo, img := range toPush {
refTruncated := helpers.Truncate(refInfo.Reference, 55, true)
progress.Updatef(fmt.Sprintf("Pushing %s", refTruncated))

size, err := calcImgSize(img)
if err != nil {
return err
return crane.Push(img, name, pushOptions...)
}

// If this is not a no checksum image push it for use with the Zarf agent
if !cfg.NoChecksum {
offlineNameCRC, err := transform.ImageTransformHost(registryURL, refInfo.Reference)
pushed := []transform.Image{}
defer func() {
for _, refInfo := range pushed {
delete(toPush, refInfo)
}
}()
for refInfo, img := range toPush {
refTruncated := helpers.Truncate(refInfo.Reference, 55, true)
progress.Updatef(fmt.Sprintf("Pushing %s", refTruncated))

size, err := calcImgSize(img)
if err != nil {
return err
}

if err = pushImage(img, offlineNameCRC); err != nil {
return err
// If this is not a no checksum image push it for use with the Zarf agent
if !cfg.NoChecksum {
offlineNameCRC, err := transform.ImageTransformHost(registryURL, refInfo.Reference)
if err != nil {
return err
}

if err = pushImage(img, offlineNameCRC); err != nil {
return err
}

totalSize -= size
}

totalSize -= size
}
// To allow for other non-zarf workloads to easily see the images upload a non-checksum version
// (this may result in collisions but this is acceptable for this use case)
offlineName, err := transform.ImageTransformHostWithoutChecksum(registryURL, refInfo.Reference)
if err != nil {
return err
}

// To allow for other non-zarf workloads to easily see the images upload a non-checksum version
// (this may result in collisions but this is acceptable for this use case)
offlineName, err := transform.ImageTransformHostWithoutChecksum(registryURL, refInfo.Reference)
if err != nil {
return err
}
message.Debugf("push %s -> %s)", refInfo.Reference, offlineName)

message.Debugf("push %s -> %s)", refInfo.Reference, offlineName)
if err = pushImage(img, offlineName); err != nil {
return err
}

if err = pushImage(img, offlineName); err != nil {
return err
pushed = append(pushed, refInfo)
totalSize -= size
}

pushed = append(pushed, refInfo)
totalSize -= size
}
progress.Successf("Pushed %d images", len(cfg.ImageList))
return nil
}, retry.Context(ctx), retry.Attempts(uint(cfg.Retries)), retry.Delay(500*time.Millisecond))
progress.Successf("Pushed %d images", len(cfg.ImageList))
return nil
})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 690c257

Please sign in to comment.