diff --git a/src/cmd/connect.go b/src/cmd/connect.go index 131faa0f7c..73fab7665a 100644 --- a/src/cmd/connect.go +++ b/src/cmd/connect.go @@ -74,9 +74,13 @@ var ( signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) exec.SuppressGlobalInterrupt = true - // Wait for the interrupt signal. - <-interruptChan - spinner.Successf(lang.CmdConnectTunnelClosed, url) + // Wait for the interrupt signal or an error. + select { + case err = <-tunnel.ErrChan(): + spinner.Fatalf(err, lang.CmdConnectErrService, err.Error()) + case <-interruptChan: + spinner.Successf(lang.CmdConnectTunnelClosed, url) + } os.Exit(0) }, } diff --git a/src/cmd/tools/crane.go b/src/cmd/tools/crane.go index fb7274eeff..80030fd361 100644 --- a/src/cmd/tools/crane.go +++ b/src/cmd/tools/crane.go @@ -16,6 +16,7 @@ import ( "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" "github.com/defenseunicorns/zarf/src/pkg/utils/exec" + "github.com/defenseunicorns/zarf/src/types" craneCmd "github.com/google/go-containerregistry/cmd/crane/cmd" "github.com/google/go-containerregistry/pkg/crane" "github.com/google/go-containerregistry/pkg/logs" @@ -132,15 +133,16 @@ func zarfCraneCatalog(cranePlatformOptions *[]crane.Option) *cobra.Command { return err } + // Add the correct authentication to the crane command options + authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PullUsername, zarfState.RegistryInfo.PullPassword) + *cranePlatformOptions = append(*cranePlatformOptions, authOption) + if tunnel != nil { message.Notef(lang.CmdToolsRegistryTunnel, registryEndpoint, zarfState.RegistryInfo.Address) defer tunnel.Close() + return tunnel.Wrap(func() error { return originalCatalogFn(cmd, []string{registryEndpoint}) }) } - // Add the correct authentication to the crane command options - authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PullUsername, zarfState.RegistryInfo.PullPassword) - *cranePlatformOptions = append(*cranePlatformOptions, authOption) - return originalCatalogFn(cmd, []string{registryEndpoint}) } @@ -186,6 +188,10 @@ func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command return err } + // Add the correct authentication to the crane command options + authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword) + *cranePlatformOptions = append(*cranePlatformOptions, authOption) + if tunnel != nil { message.Notef(lang.CmdToolsRegistryTunnel, tunnel.Endpoint(), zarfState.RegistryInfo.Address) @@ -194,12 +200,9 @@ func zarfCraneInternalWrapper(commandToWrap func(*[]crane.Option) *cobra.Command givenAddress := fmt.Sprintf("%s/", zarfState.RegistryInfo.Address) tunnelAddress := fmt.Sprintf("%s/", tunnel.Endpoint()) args[imageNameArgumentIndex] = strings.Replace(args[imageNameArgumentIndex], givenAddress, tunnelAddress, 1) + return tunnel.Wrap(func() error { return originalListFn(cmd, args) }) } - // Add the correct authentication to the crane command options - authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword) - *cranePlatformOptions = append(*cranePlatformOptions, authOption) - return originalListFn(cmd, args) } @@ -234,8 +237,13 @@ func pruneImages(_ *cobra.Command, _ []string) error { if tunnel != nil { message.Notef(lang.CmdToolsRegistryTunnel, registryEndpoint, zarfState.RegistryInfo.Address) defer tunnel.Close() + return tunnel.Wrap(func() error { return doPruneImagesForPackages(zarfState, zarfPackages, registryEndpoint) }) } + return doPruneImagesForPackages(zarfState, zarfPackages, registryEndpoint) +} + +func doPruneImagesForPackages(zarfState *types.ZarfState, zarfPackages []types.DeployedPackage, registryEndpoint string) error { authOption := config.GetCraneAuthOption(zarfState.RegistryInfo.PushUsername, zarfState.RegistryInfo.PushPassword) // Determine which image digests are currently used by Zarf packages diff --git a/src/internal/packager/git/gitea.go b/src/internal/packager/git/gitea.go index 4aa1cf7b78..1eede33a7d 100644 --- a/src/internal/packager/git/gitea.go +++ b/src/internal/packager/git/gitea.go @@ -49,10 +49,15 @@ func (g *Git) CreateReadOnlyUser() error { tunnelURL := tunnel.HTTPEndpoint() + var out []byte + // Determine if the read only user already exists getUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users", tunnelURL) getUserRequest, _ := netHttp.NewRequest("GET", getUserEndpoint, nil) - out, err := g.DoHTTPThings(getUserRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(getUserRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("GET %s:\n%s", getUserEndpoint, string(out)) if err != nil { return err @@ -80,7 +85,10 @@ func (g *Git) CreateReadOnlyUser() error { updateUserData, _ := json.Marshal(updateUserBody) updateUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users/%s", tunnelURL, g.Server.PullUsername) updateUserRequest, _ := netHttp.NewRequest("PATCH", updateUserEndpoint, bytes.NewBuffer(updateUserData)) - out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("PATCH %s:\n%s", updateUserEndpoint, string(out)) return err } @@ -100,7 +108,10 @@ func (g *Git) CreateReadOnlyUser() error { // Send API request to create the user createUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users", tunnelURL) createUserRequest, _ := netHttp.NewRequest("POST", createUserEndpoint, bytes.NewBuffer(createUserData)) - out, err = g.DoHTTPThings(createUserRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(createUserRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("POST %s:\n%s", createUserEndpoint, string(out)) if err != nil { return err @@ -115,7 +126,10 @@ func (g *Git) CreateReadOnlyUser() error { updateUserData, _ := json.Marshal(updateUserBody) updateUserEndpoint := fmt.Sprintf("%s/api/v1/admin/users/%s", tunnelURL, g.Server.PullUsername) updateUserRequest, _ := netHttp.NewRequest("PATCH", updateUserEndpoint, bytes.NewBuffer(updateUserData)) - out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(updateUserRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("PATCH %s:\n%s", updateUserEndpoint, string(out)) return err } @@ -142,10 +156,15 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) { tunnelURL := tunnel.Endpoint() + var out []byte + // Determine if the package token already exists getTokensEndpoint := fmt.Sprintf("http://%s/api/v1/users/%s/tokens", tunnelURL, g.Server.PushUsername) getTokensRequest, _ := netHttp.NewRequest("GET", getTokensEndpoint, nil) - out, err := g.DoHTTPThings(getTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(getTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("GET %s:\n%s", getTokensEndpoint, string(out)) if err != nil { return CreateTokenResponse{}, err @@ -168,7 +187,10 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) { // Delete the existing token to be replaced deleteTokensEndpoint := fmt.Sprintf("http://%s/api/v1/users/%s/tokens/%s", tunnelURL, g.Server.PushUsername, config.ZarfArtifactTokenName) deleteTokensRequest, _ := netHttp.NewRequest("DELETE", deleteTokensEndpoint, nil) - out, err := g.DoHTTPThings(deleteTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(deleteTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("DELETE %s:\n%s", deleteTokensEndpoint, string(out)) if err != nil { return CreateTokenResponse{}, err @@ -181,7 +203,10 @@ func (g *Git) CreatePackageRegistryToken() (CreateTokenResponse, error) { } createTokensData, _ := json.Marshal(createTokensBody) createTokensRequest, _ := netHttp.NewRequest("POST", createTokensEndpoint, bytes.NewBuffer(createTokensData)) - out, err = g.DoHTTPThings(createTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + err = tunnel.Wrap(func() error { + out, err = g.DoHTTPThings(createTokensRequest, g.Server.PushUsername, g.Server.PushPassword) + return err + }) message.Debugf("POST %s:\n%s", createTokensEndpoint, string(out)) if err != nil { return CreateTokenResponse{}, err diff --git a/src/internal/packager/images/push.go b/src/internal/packager/images/push.go index 57482c0dd8..bc0e3aa4a8 100644 --- a/src/internal/packager/images/push.go +++ b/src/internal/packager/images/push.go @@ -78,6 +78,14 @@ func (i *ImageConfig) PushToZarfRegistry() error { defer tunnel.Close() } + pushImage := func(img v1.Image, name string) error { + if tunnel != nil { + return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) }) + } + + return crane.Push(img, name, pushOptions...) + } + for refInfo, img := range refInfoToImage { refTruncated := message.Truncate(refInfo.Reference, 55, true) progressBar.UpdateTitle(fmt.Sprintf("Pushing %s", refTruncated)) @@ -91,7 +99,8 @@ func (i *ImageConfig) PushToZarfRegistry() error { message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineNameCRC) - if err = crane.Push(img, offlineNameCRC, pushOptions...); err != nil { + err = pushImage(img, offlineNameCRC) + if err != nil { return err } } @@ -105,7 +114,8 @@ func (i *ImageConfig) PushToZarfRegistry() error { message.Debugf("crane.Push() %s:%s -> %s)", i.ImagesPath, refInfo.Reference, offlineName) - if err = crane.Push(img, offlineName, pushOptions...); err != nil { + err = pushImage(img, offlineName) + if err != nil { return err } } diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go index a472b18205..a740db617d 100644 --- a/src/pkg/cluster/injector.go +++ b/src/pkg/cluster/injector.go @@ -245,7 +245,15 @@ func (c *Cluster) injectorIsReady(seedImages []transform.Image, spinner *message for _, seedImage := range seedImages { seedRegistry := fmt.Sprintf("%s/v2/%s/manifests/%s", tunnel.HTTPEndpoint(), seedImage.Path, seedImage.Tag) - if resp, err := http.Get(seedRegistry); err != nil || resp.StatusCode != 200 { + + var resp *http.Response + var err error + err = tunnel.Wrap(func() error { + resp, err = http.Get(seedRegistry) + return err + }) + + if err != nil || resp.StatusCode != 200 { // Just debug log the output because failures just result in trying the next image message.Debug(resp, err) return false diff --git a/src/pkg/k8s/pods.go b/src/pkg/k8s/pods.go index 486ff075af..e4853034e0 100644 --- a/src/pkg/k8s/pods.go +++ b/src/pkg/k8s/pods.go @@ -109,63 +109,59 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, include PodFilter) []co var readyPods = []corev1.Pod{} - // Reverse sort by creation time + // Sort the pods from newest to oldest sort.Slice(pods.Items, func(i, j int) bool { return pods.Items[i].CreationTimestamp.After(pods.Items[j].CreationTimestamp.Time) }) - if len(pods.Items) > 0 { - for _, pod := range pods.Items { - k.Log("Testing pod %q", pod.Name) + for _, pod := range pods.Items { + k.Log("Testing pod %q", pod.Name) - // If an include function is provided, only keep pods that return true - if include != nil && !include(pod) { - continue - } - - // Handle container targeting - if target.Container != "" { - k.Log("Testing pod %q for container %q", pod.Name, target.Container) - var matchesInitContainer bool - - // Check the status of initContainers for a running match - for _, initContainer := range pod.Status.InitContainerStatuses { - isRunning := initContainer.State.Running != nil - if isRunning && initContainer.Name == target.Container { - // On running match in initContainer break this loop - matchesInitContainer = true - readyPods = append(readyPods, pod) - break - } - } + // If an include function is provided, only keep pods that return true + if include != nil && !include(pod) { + continue + } - // Don't check any further if there's already a match - if matchesInitContainer { - continue + // Handle container targeting + if target.Container != "" { + k.Log("Testing pod %q for container %q", pod.Name, target.Container) + var matchesInitContainer bool + + // Check the status of initContainers for a running match + for _, initContainer := range pod.Status.InitContainerStatuses { + isRunning := initContainer.State.Running != nil + if isRunning && initContainer.Name == target.Container { + // On running match in initContainer break this loop + matchesInitContainer = true + readyPods = append(readyPods, pod) + break } + } - // Check the status of regular containers for a running match - for _, container := range pod.Status.ContainerStatuses { - isRunning := container.State.Running != nil - if isRunning && container.Name == target.Container { - readyPods = append(readyPods, pod) - } - } + // Don't check any further if there's already a match + if matchesInitContainer { + continue + } - } else { - status := pod.Status.Phase - k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status) - // Regular status checking without a container - if status == corev1.PodRunning { + // Check the status of regular containers for a running match + for _, container := range pod.Status.ContainerStatuses { + isRunning := container.State.Running != nil + if isRunning && container.Name == target.Container { readyPods = append(readyPods, pod) } } - + } else { + status := pod.Status.Phase + k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status) + // Regular status checking without a container + if status == corev1.PodRunning { + readyPods = append(readyPods, pod) + } } + } - if len(readyPods) > 0 { - return readyPods - } + if len(readyPods) > 0 { + return readyPods } time.Sleep(3 * time.Second) diff --git a/src/pkg/k8s/tunnel.go b/src/pkg/k8s/tunnel.go index 6ca835d351..116db22ab7 100644 --- a/src/pkg/k8s/tunnel.go +++ b/src/pkg/k8s/tunnel.go @@ -40,6 +40,7 @@ type Tunnel struct { attempt int stopChan chan struct{} readyChan chan struct{} + errChan chan error } // NewTunnel will create a new Tunnel struct. @@ -60,6 +61,23 @@ func (k *K8s) NewTunnel(namespace, resourceType, resourceName, urlSuffix string, }, nil } +// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well. +func (tunnel *Tunnel) Wrap(function func() error) error { + var err error + funcErrChan := make(chan error) + + go func() { + funcErrChan <- function() + }() + + select { + case err = <-funcErrChan: + return err + case err = <-tunnel.ErrChan(): + return err + } +} + // Connect will establish a tunnel to the specified target. func (tunnel *Tunnel) Connect() (string, error) { url, err := tunnel.establish() @@ -90,6 +108,11 @@ func (tunnel *Tunnel) Endpoint() string { return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort) } +// ErrChan returns the tunnel's error channel +func (tunnel *Tunnel) ErrChan() chan error { + return tunnel.errChan +} + // HTTPEndpoint returns the tunnel endpoint as a HTTP URL string. func (tunnel *Tunnel) HTTPEndpoint() string { return fmt.Sprintf("http://%s", tunnel.Endpoint()) @@ -189,6 +212,9 @@ func (tunnel *Tunnel) establish() (string, error) { tunnel.localPort = localPort url := tunnel.FullURL() + // Store the error channel to listen for errors + tunnel.errChan = errChan + tunnel.kube.Log("Creating port forwarding tunnel at %s", url) return url, nil } diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index 655900a9fb..607a48c350 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -476,6 +476,9 @@ func (p *Packager) pushReposToRepository(reposPath string, repos []string) error gitClient := git.New(p.cfg.State.GitServer) svcInfo, _ := k8s.ServiceInfoFromServiceURL(gitClient.Server.Address) + var err error + var tunnel *k8s.Tunnel + // If this is a service (svcInfo is not nil), create a port-forward tunnel to that resource if svcInfo != nil { if !p.isConnectedToCluster() { @@ -485,7 +488,7 @@ func (p *Packager) pushReposToRepository(reposPath string, repos []string) error } } - tunnel, err := p.cluster.NewTunnel(svcInfo.Namespace, k8s.SvcResource, svcInfo.Name, "", 0, svcInfo.Port) + tunnel, err = p.cluster.NewTunnel(svcInfo.Namespace, k8s.SvcResource, svcInfo.Name, "", 0, svcInfo.Port) if err != nil { return err } @@ -496,6 +499,8 @@ func (p *Packager) pushReposToRepository(reposPath string, repos []string) error } defer tunnel.Close() gitClient.Server.Address = tunnel.HTTPEndpoint() + + return tunnel.Wrap(func() error { return gitClient.PushRepo(repoURL, reposPath) }) } return gitClient.PushRepo(repoURL, reposPath) diff --git a/src/test/e2e/main_test.go b/src/test/e2e/main_test.go index aa078cc401..82cb1b2d33 100644 --- a/src/test/e2e/main_test.go +++ b/src/test/e2e/main_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/defenseunicorns/zarf/src/config" + "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/test" ) @@ -32,6 +33,9 @@ func TestMain(m *testing.M) { // K3d use the intern package, which requires this to be set in go 1.19 os.Setenv("ASSUME_NO_MOVING_GC_UNSAFE_RISK_IT_WITH", "go1.19") + // Set the log level to trace for when we call Zarf functions internally + message.SetLogLevel(message.TraceLevel) + retCode, err := doAllTheThings(m) if err != nil { fmt.Println(err) //nolint:forbidigo