From b3c07afe59abc351f4ef9ae3eb62762e01074ff5 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:15:35 +0200 Subject: [PATCH 01/13] Simplify `DeleteData` - validate arguments before pod creation. --- pkg/function/delete_data.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 276ffc2d3b..9289972e83 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -67,6 +67,10 @@ func (*deleteDataFunc) Name() string { } func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string, podOverride crv1alpha1.JSONMap) (map[string]interface{}, error) { + if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) { + return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg) + } + options := &kube.PodOptions{ Namespace: namespace, GenerateName: jobPrefix, @@ -86,9 +90,6 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) { - return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg) - } pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) if err != nil { return nil, err From 6d822949fd50b23af5357713bcc0a836315aa87c Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:06:56 +0200 Subject: [PATCH 02/13] Minor refactoring of `WriteCredsToPod` Rename to `MaybeWriteProfileCredentials` --- pkg/function/copy_volume_data.go | 11 +++-------- pkg/function/utils.go | 14 ++++++++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go index 900853b226..792420c099 100644 --- a/pkg/function/copy_volume_data.go +++ b/pkg/function/copy_volume_data.go @@ -15,11 +15,11 @@ package function import ( + "bytes" "context" "fmt" "github.com/pkg/errors" - "go.uber.org/zap/buffer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes" @@ -90,12 +90,8 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName()) } - pw1, err := pc.GetFileWriter() - if err != nil { - return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName()) - } - remover, err := WriteCredsToPod(ctx, pw1, tp.Profile) + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) if err != nil { return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName()) } @@ -118,8 +114,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na if err != nil { return nil, err } - var stdout buffer.Buffer - var stderr buffer.Buffer + var stdout, stderr bytes.Buffer err = ex.Exec(ctx, cmd, nil, &stdout, &stderr) format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) diff --git a/pkg/function/utils.go b/pkg/function/utils.go index ecb18ee88c..be0196ab2b 100644 --- a/pkg/function/utils.go +++ b/pkg/function/utils.go @@ -102,16 +102,22 @@ func (nr nopRemover) Path() string { return "" } -// WriteCredsToPod creates a file with Google credentials if the given profile points to a GCS location -func WriteCredsToPod(ctx context.Context, writer kube.PodFileWriter, profile *param.Profile) (kube.PodFileRemover, error) { +// MaybeWriteProfileCredentials creates a file with Google credentials if the given profile points to a GCS location, otherwise does nothing +func MaybeWriteProfileCredentials(ctx context.Context, pc kube.PodController, profile *param.Profile) (kube.PodFileRemover, error) { if profile.Location.Type == crv1alpha1.LocationTypeGCS { - remover, err := writer.Write(ctx, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret)) + pfw, err := pc.GetFileWriter() if err != nil { - return nil, errors.Wrapf(err, "Unable to write Google credentials to the pod.") + return nil, errors.Wrap(err, "Unable to write Google credentials") + } + + remover, err := pfw.Write(ctx, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret)) + if err != nil { + return nil, errors.Wrap(err, "Unable to write Google credentials") } return remover, nil } + return nopRemover{}, nil } From 8fe453c5547c48a8b152673e3ef9c82db46920be Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:08:46 +0200 Subject: [PATCH 03/13] In `CopyVolumeData` function, get rid of redundant `namespace` argument. It could be taken from `pod` --- pkg/function/copy_volume_data.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go index 792420c099..a159244d66 100644 --- a/pkg/function/copy_volume_data.go +++ b/pkg/function/copy_volume_data.go @@ -80,11 +80,11 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey) + podFunc := copyVolumeDataPodFunc(cli, tp, mountPoint, targetPath, encryptionKey) return pr.RunEx(ctx, podFunc) } -func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { // Wait for pod to reach running state if err := pc.WaitForPodReady(ctx); err != nil { @@ -101,7 +101,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na pod := pc.Pod() // Get restic repository - if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil { + if err := restic.GetOrCreateRepository(cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil { return nil, err } // Copy data to object store From b93fe9ff81dd5a1e8c05b7897c149c3e12c2c697 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:10:44 +0200 Subject: [PATCH 04/13] Refactor `BackupDataStats` to use `PodRunner.RunEx` --- pkg/function/backup_data_stats.go | 37 +++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/function/backup_data_stats.go b/pkg/function/backup_data_stats.go index 017b37cc7e..7aed9c2fc0 100644 --- a/pkg/function/backup_data_stats.go +++ b/pkg/function/backup_data_stats.go @@ -15,10 +15,10 @@ package function import ( + "bytes" "context" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" @@ -71,33 +71,46 @@ func backupDataStats(ctx context.Context, cli kubernetes.Interface, tp param.Tem PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := backupDataStatsPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, backupID, mode) - return pr.Run(ctx, podFunc) + podFunc := backupDataStatsPodFunc(tp, encryptionKey, backupArtifactPrefix, backupID, mode) + return pr.RunEx(ctx, podFunc) } -func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { +func backupDataStatsPodFunc(tp param.TemplateParams, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + // Wait for pod to reach running state - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) + + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) if err != nil { return nil, err } - defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + + // Parent context could already be dead, so removing file within new context + defer remover.Remove(context.Background()) //nolint:errcheck + cmd, err := restic.StatsCommandByID(tp.Profile, backupArtifactPrefix, backupID, mode, encryptionKey) if err != nil { return nil, err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr) + + commandExecutor, err := pc.GetCommandExecutor() + if err != nil { + return nil, errors.Wrap(err, "Unable to get pod command executor") + } + + var stdout, stderr bytes.Buffer + err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to get backup stats") } // Get File Count and Size from Stats - mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout) + mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String()) if fc == "" || size == "" { return nil, errors.New("Failed to parse snapshot stats from logs") } From fb0dc556a02f21bb84736e944502afb68a8486ef Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:12:14 +0200 Subject: [PATCH 05/13] Refactor `CheckRepository` to use `PodRunner.RunEx` --- pkg/function/checkRepository.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/function/checkRepository.go b/pkg/function/checkRepository.go index 18c4a3133e..2f05d960f8 100644 --- a/pkg/function/checkRepository.go +++ b/pkg/function/checkRepository.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" @@ -55,22 +54,28 @@ func CheckRepository(ctx context.Context, cli kubernetes.Interface, tp param.Tem PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := CheckRepositoryPodFunc(cli, tp, namespace, encryptionKey, targetPaths) - return pr.Run(ctx, podFunc) + podFunc := CheckRepositoryPodFunc(cli, tp, encryptionKey, targetPaths) + return pr.RunEx(ctx, podFunc) } -func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { +func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, encryptionKey, targetPath string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + // Wait for pod to reach running state - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) + + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) if err != nil { return nil, err } - defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) - err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name) + + // Parent context could already be dead, so removing file within new context + defer remover.Remove(context.Background()) //nolint:errcheck + + err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) switch { case err == nil: break From 5a4ea719b95b81f45db1abfb4ac08d60c2334ed1 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:16:14 +0200 Subject: [PATCH 06/13] Refactor `DeleteData` to use `PodRunner.RunEx` --- pkg/function/delete_data.go | 58 +++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 9289972e83..40190bebdb 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -15,6 +15,7 @@ package function import ( + "bytes" "context" "fmt" "strings" @@ -79,34 +80,48 @@ func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.Template PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers) - return pr.Run(ctx, podFunc) + podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers) + return pr.RunEx(ctx, podFunc) } //nolint:gocognit -func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { +func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + // Wait for pod to reach running state if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) + + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) + if err != nil { + return nil, err + } + + // Parent context could already be dead, so removing file within new context + defer remover.Remove(context.Background()) //nolint:errcheck + + // Get command executor + podCommandExecutor, err := pc.GetCommandExecutor() if err != nil { return nil, err } - defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + for i, deleteTag := range deleteTags { cmd, err := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey) if err != nil { return nil, err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr) + + var stdout, stderr bytes.Buffer + err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag) } - deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout) + deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag) } @@ -118,14 +133,16 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai if err != nil { return nil, err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr) + + var stdout, stderr bytes.Buffer + err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to forget data") } if reclaimSpace { - spaceFreedStr, err := pruneData(cli, tp, pod, namespace, encryptionKey, targetPaths[i]) + spaceFreedStr, err := pruneData(tp, pod, podCommandExecutor, encryptionKey, targetPaths[i]) if err != nil { return nil, errors.Wrapf(err, "Error executing prune command") } @@ -139,15 +156,18 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai } } -func pruneData(cli kubernetes.Interface, tp param.TemplateParams, pod *v1.Pod, namespace, encryptionKey, targetPath string) (string, error) { +func pruneData(tp param.TemplateParams, pod *v1.Pod, podCommandExecutor kube.PodCommandExecutor, encryptionKey, targetPath string) (string, error) { cmd, err := restic.PruneCommand(tp.Profile, targetPath, encryptionKey) if err != nil { return "", err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) - spaceFreed := restic.SpaceFreedFromPruneLog(stdout) + + var stdout, stderr bytes.Buffer + err = podCommandExecutor.Exec(context.Background(), cmd, nil, &stdout, &stderr) + format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr.String()) + + spaceFreed := restic.SpaceFreedFromPruneLog(stdout.String()) return spaceFreed, errors.Wrapf(err, "Failed to prune data after forget") } From dbae5865bd8514a5600ba1c19e5474cb9e07d9fe Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:17:30 +0200 Subject: [PATCH 07/13] Refactor `DeleteDataUsingKopiaServer` to use `PodRunner.RunEx` --- .../delete_data_using_kopia_server.go | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/pkg/function/delete_data_using_kopia_server.go b/pkg/function/delete_data_using_kopia_server.go index 442cca63b8..f23c068815 100644 --- a/pkg/function/delete_data_using_kopia_server.go +++ b/pkg/function/delete_data_using_kopia_server.go @@ -15,10 +15,10 @@ package function import ( + "bytes" "context" "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" @@ -142,31 +142,30 @@ func deleteDataFromServer( } pr := kube.NewPodRunner(cli, options) podFunc := deleteDataFromServerPodFunc( - cli, hostname, - namespace, serverAddress, fingerprint, snapID, username, userPassphrase, ) - return pr.Run(ctx, podFunc) + return pr.RunEx(ctx, podFunc) } func deleteDataFromServerPodFunc( - cli kubernetes.Interface, hostname, - namespace, serverAddress, fingerprint, snapID, username, userPassphrase string, -) func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) { - return func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) { - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { - return nil, errors.Wrap(err, "Failed while waiting for Pod: "+pod.Name+" to be ready") +) func(ctx context.Context, pc kube.PodController) (map[string]any, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]any, error) { + pod := pc.Pod() + + // Wait for pod to reach running state + if err := pc.WaitForPodReady(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } contentCacheMB, metadataCacheMB := kopiacmd.GetCacheSizeSettingsForSnapshot() @@ -184,9 +183,16 @@ func deleteDataFromServerPodFunc( ContentCacheMB: contentCacheMB, MetadataCacheMB: metadataCacheMB, }) - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) + + commandExecutor, err := pc.GetCommandExecutor() + if err != nil { + return nil, errors.Wrap(err, "Unable to get pod command executor") + } + + var stdout, stderr bytes.Buffer + err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrap(err, "Failed to connect to Kopia Repository server") } @@ -200,9 +206,11 @@ func deleteDataFromServerPodFunc( }, SnapID: snapID, }) - stdout, stderr, err = kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) + stdout.Reset() + stderr.Reset() + err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) return nil, errors.Wrap(err, "Failed to delete backup from Kopia API server") } } From e5d96d3f97c51ee116dc272e62692d8f25ee3d82 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:24:40 +0200 Subject: [PATCH 08/13] Refactor `DescribeBackups` to use `PodRunner.RunEx` --- pkg/function/describe_backups.go | 41 ++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/pkg/function/describe_backups.go b/pkg/function/describe_backups.go index e1d2c45284..510fdf7f51 100644 --- a/pkg/function/describe_backups.go +++ b/pkg/function/describe_backups.go @@ -15,11 +15,11 @@ package function import ( + "bytes" "context" "strings" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" @@ -73,22 +73,28 @@ func describeBackups(ctx context.Context, cli kubernetes.Interface, tp param.Tem PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := describeBackupsPodFunc(cli, tp, namespace, encryptionKey, targetPaths) - return pr.Run(ctx, podFunc) + podFunc := describeBackupsPodFunc(cli, tp, encryptionKey, targetPaths) + return pr.RunEx(ctx, podFunc) } -func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { +func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, encryptionKey, targetPath string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + // Wait for pod to reach running state - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) + + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) if err != nil { return nil, err } - defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) - err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name) + + // Parent context could already be dead, so removing file within new context + defer remover.Remove(context.Background()) //nolint:errcheck + + err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) switch { case err == nil: break @@ -114,18 +120,27 @@ func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n default: return nil, err } + cmd, err := restic.StatsCommandByID(tp.Profile, targetPath, "" /* get all snapshot stats */, RawDataStatsMode, encryptionKey) if err != nil { return nil, err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr) + + ex, err := pc.GetCommandExecutor() + if err != nil { + return nil, err + } + + var stdout, stderr bytes.Buffer + err = ex.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to get backup stats") } + // Get File Count and Size from Stats - _, fc, size := restic.SnapshotStatsFromStatsLog(stdout) + _, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String()) if fc == "" || size == "" { return nil, errors.New("Failed to parse snapshot stats from logs") } From a5cbbe5ab354436ec0804a518ac24059b4d0213a Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:26:02 +0200 Subject: [PATCH 09/13] Refactor `PrepareData` to use `PodRunner.RunEx` --- pkg/function/prepare_data.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/function/prepare_data.go b/pkg/function/prepare_data.go index 6fe46a71d7..37c5544b20 100644 --- a/pkg/function/prepare_data.go +++ b/pkg/function/prepare_data.go @@ -17,9 +17,11 @@ package function import ( "context" "fmt" + "io" + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/field" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -95,20 +97,32 @@ func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, servi } pr := kube.NewPodRunner(cli, options) podFunc := prepareDataPodFunc(cli) - return pr.Run(ctx, podFunc) + return pr.RunEx(ctx, podFunc) } -func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - // Wait for pod completion - if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil { - return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) +func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + + // Wait for pod to reach running state + if err := pc.WaitForPodReady(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } + + ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath) // Fetch logs from the pod - logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + r, err := pc.StreamPodLogs(ctx) if err != nil { return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") } + defer r.Close() + + bytes, err := io.ReadAll(r) + if err != nil { + return nil, errors.Wrapf(err, "Failed to read logs from the pod") + } + logs := string(bytes) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, logs) out, err := parseLogAndCreateOutput(logs) return out, errors.Wrap(err, "Failed to parse phase output") From f8a26a9b23051ba7d722286b5c8524f8d1314b92 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:26:55 +0200 Subject: [PATCH 10/13] Refactor `RestoreData` to use `PodRunner.RunEx` --- pkg/function/restore_data.go | 38 ++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go index c8d82a4887..f104052a40 100644 --- a/pkg/function/restore_data.go +++ b/pkg/function/restore_data.go @@ -15,10 +15,10 @@ package function import ( + "bytes" "context" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -125,21 +125,27 @@ func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.Templat PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := restoreDataPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID) - return pr.Run(ctx, podFunc) + podFunc := restoreDataPodFunc(tp, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID) + return pr.RunEx(ctx, podFunc) } -func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { - return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) { +func restoreDataPodFunc(tp param.TemplateParams, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + pod := pc.Pod() + // Wait for pod to reach running state - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } - pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile) + + remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName()) } - defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + + // Parent context could already be dead, so removing file within new context + defer remover.Remove(context.Background()) //nolint:errcheck + var cmd []string // Generate restore command based on the identifier passed if backupTag != "" { @@ -150,13 +156,19 @@ func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, names if err != nil { return nil, err } - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout) - format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr) + + ex, err := pc.GetCommandExecutor() + if err != nil { + return nil, err + } + var stdout, stderr bytes.Buffer + err = ex.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) if err != nil { return nil, errors.Wrapf(err, "Failed to restore backup") } - out, err := parseLogAndCreateOutput(stdout) + out, err := parseLogAndCreateOutput(stdout.String()) return out, errors.Wrap(err, "Failed to parse phase output") } } From 4fa4fa999929cdd9f9d59bc248be15c753e35bbd Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Wed, 6 Sep 2023 13:27:30 +0200 Subject: [PATCH 11/13] Refactor `RestoreDataUsingKopiaServer` to use `PodRunner.RunEx` --- .../restore_data_using_kopia_server.go | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/pkg/function/restore_data_using_kopia_server.go b/pkg/function/restore_data_using_kopia_server.go index d5117582e8..0d327fd5a0 100644 --- a/pkg/function/restore_data_using_kopia_server.go +++ b/pkg/function/restore_data_using_kopia_server.go @@ -15,11 +15,11 @@ package function import ( + "bytes" "context" "fmt" "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -185,9 +185,7 @@ func restoreDataFromServer( pr := kube.NewPodRunner(cli, options) podFunc := restoreDataFromServerPodFunc( - cli, hostname, - namespace, restorePath, serverAddress, fingerprint, @@ -196,13 +194,11 @@ func restoreDataFromServer( userPassphrase, sparseRestore, ) - return pr.Run(ctx, podFunc) + return pr.RunEx(ctx, podFunc) } func restoreDataFromServerPodFunc( - cli kubernetes.Interface, hostname, - namespace, restorePath, serverAddress, fingerprint, @@ -210,10 +206,13 @@ func restoreDataFromServerPodFunc( username, userPassphrase string, sparseRestore bool, -) func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) { - return func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) { - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { - return nil, errors.Wrap(err, "Failed while waiting for Pod: "+pod.Name+" to be ready") +) func(ctx context.Context, pc kube.PodController) (map[string]any, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]any, error) { + pod := pc.Pod() + + // Wait for pod to reach running state + if err := pc.WaitForPodReady(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) } contentCacheMB, metadataCacheMB := kopiacmd.GetCacheSizeSettingsForSnapshot() @@ -232,11 +231,18 @@ func restoreDataFromServerPodFunc( ContentCacheMB: contentCacheMB, MetadataCacheMB: metadataCacheMB, }) - stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) + + commandExecutor, err := pc.GetCommandExecutor() if err != nil { - return nil, errors.Wrap(err, "Failed to connect to Kopia API server") + return nil, errors.Wrap(err, "Unable to get pod command executor") + } + + var stdout, stderr bytes.Buffer + err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) + if err != nil { + return nil, errors.Wrapf(err, "Failed to connect to Kopia Repository server") } cmd = kopiacmd.SnapshotRestore( @@ -251,9 +257,13 @@ func restoreDataFromServerPodFunc( SparseRestore: sparseRestore, IgnorePermissionErrors: true, }) - stdout, stderr, err = kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) - format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) + + stdout.Reset() + stderr.Reset() + err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String()) + format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String()) + return nil, errors.Wrap(err, "Failed to restore backup from Kopia API server") } } From 57ecd20744676c4a8e1e7980f13746b3b71d2e24 Mon Sep 17 00:00:00 2001 From: Eugene Sumin <95425330+e-sumin@users.noreply.github.com> Date: Thu, 14 Sep 2023 11:32:00 +0200 Subject: [PATCH 12/13] Apply suggestions from code review - improve formatting / readability Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com> --- pkg/function/backup_data_stats.go | 8 +++++++- pkg/function/checkRepository.go | 17 ++++++++++++++-- pkg/function/copy_volume_data.go | 18 +++++++++++++++-- pkg/function/delete_data.go | 32 ++++++++++++++++++++++++++++--- pkg/function/describe_backups.go | 16 ++++++++++++++-- pkg/function/restore_data.go | 9 ++++++++- 6 files changed, 89 insertions(+), 11 deletions(-) diff --git a/pkg/function/backup_data_stats.go b/pkg/function/backup_data_stats.go index 7aed9c2fc0..3f23fe4f84 100644 --- a/pkg/function/backup_data_stats.go +++ b/pkg/function/backup_data_stats.go @@ -75,7 +75,13 @@ func backupDataStats(ctx context.Context, cli kubernetes.Interface, tp param.Tem return pr.RunEx(ctx, podFunc) } -func backupDataStatsPodFunc(tp param.TemplateParams, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func backupDataStatsPodFunc( + tp param.TemplateParams, + encryptionKey, + backupArtifactPrefix, + backupID, + mode string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { pod := pc.Pod() diff --git a/pkg/function/checkRepository.go b/pkg/function/checkRepository.go index 2f05d960f8..88056b56e4 100644 --- a/pkg/function/checkRepository.go +++ b/pkg/function/checkRepository.go @@ -58,7 +58,12 @@ func CheckRepository(ctx context.Context, cli kubernetes.Interface, tp param.Tem return pr.RunEx(ctx, podFunc) } -func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, encryptionKey, targetPath string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func CheckRepositoryPodFunc( + cli kubernetes.Interface, + tp param.TemplateParams, + encryptionKey, + targetPath string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { pod := pc.Pod() @@ -75,7 +80,15 @@ func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, e // Parent context could already be dead, so removing file within new context defer remover.Remove(context.Background()) //nolint:errcheck - err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + err = restic.CheckIfRepoIsReachable( + tp.Profile, + targetPath, + encryptionKey, + cli, + pod.Namespace, + pod.Name, + pod.Spec.Containers[0].Name, + ) switch { case err == nil: break diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go index a159244d66..f96ebd2913 100644 --- a/pkg/function/copy_volume_data.go +++ b/pkg/function/copy_volume_data.go @@ -84,7 +84,13 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp return pr.RunEx(ctx, podFunc) } -func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func copyVolumeDataPodFunc( + cli kubernetes.Interface, + tp param.TemplateParams, + mountPoint, + targetPath, + encryptionKey string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { // Wait for pod to reach running state if err := pc.WaitForPodReady(ctx); err != nil { @@ -101,7 +107,15 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, mo pod := pc.Pod() // Get restic repository - if err := restic.GetOrCreateRepository(cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil { + if err := restic.GetOrCreateRepository( + cli, + pod.Namespace, + pod.Name, + pod.Spec.Containers[0].Name, + targetPath, + encryptionKey, + tp.Profile, + ); err != nil { return nil, err } // Copy data to object store diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 40190bebdb..32935f0f33 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -67,7 +67,19 @@ func (*deleteDataFunc) Name() string { return DeleteDataFuncName } -func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string, podOverride crv1alpha1.JSONMap) (map[string]interface{}, error) { +func deleteData( + ctx context.Context, + cli kubernetes.Interface, + tp param.TemplateParams, + reclaimSpace bool, + namespace, + encryptionKey string, + targetPaths, + deleteTags, + deleteIdentifiers []string, + jobPrefix string, + podOverride crv1alpha1.JSONMap, +) (map[string]interface{}, error) { if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) { return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg) } @@ -85,7 +97,15 @@ func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.Template } //nolint:gocognit -func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func deleteDataPodFunc( + cli kubernetes.Interface, + tp param.TemplateParams, + reclaimSpace bool, + encryptionKey string, + targetPaths, + deleteTags, + deleteIdentifiers []string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { pod := pc.Pod() @@ -156,7 +176,13 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai } } -func pruneData(tp param.TemplateParams, pod *v1.Pod, podCommandExecutor kube.PodCommandExecutor, encryptionKey, targetPath string) (string, error) { +func pruneData( + tp param.TemplateParams, + pod *v1.Pod, + podCommandExecutor kube.PodCommandExecutor, + encryptionKey, + targetPath string, +) (string, error) { cmd, err := restic.PruneCommand(tp.Profile, targetPath, encryptionKey) if err != nil { return "", err diff --git a/pkg/function/describe_backups.go b/pkg/function/describe_backups.go index 510fdf7f51..fb9f512a13 100644 --- a/pkg/function/describe_backups.go +++ b/pkg/function/describe_backups.go @@ -77,7 +77,12 @@ func describeBackups(ctx context.Context, cli kubernetes.Interface, tp param.Tem return pr.RunEx(ctx, podFunc) } -func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, encryptionKey, targetPath string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func describeBackupsPodFunc( + cli kubernetes.Interface, + tp param.TemplateParams, + encryptionKey, + targetPath string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { pod := pc.Pod() @@ -94,7 +99,14 @@ func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, e // Parent context could already be dead, so removing file within new context defer remover.Remove(context.Background()) //nolint:errcheck - err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name) + err = restic.CheckIfRepoIsReachable( + tp.Profile, + targetPath, + encryptionKey, + cli, pod.Namespace, + pod.Name, + pod.Spec.Containers[0].Name, + ) switch { case err == nil: break diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go index f104052a40..ad54550c1f 100644 --- a/pkg/function/restore_data.go +++ b/pkg/function/restore_data.go @@ -129,7 +129,14 @@ func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.Templat return pr.RunEx(ctx, podFunc) } -func restoreDataPodFunc(tp param.TemplateParams, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { +func restoreDataPodFunc( + tp param.TemplateParams, + encryptionKey, + backupArtifactPrefix, + restorePath, + backupTag, + backupID string, +) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { pod := pc.Pod() From db19f7fedbb2a76273dc76cd23941773c1213960 Mon Sep 17 00:00:00 2001 From: Eugen Sumin Date: Tue, 19 Sep 2023 11:39:23 +0200 Subject: [PATCH 13/13] Fix bug with forgotten old way pod readiness check --- pkg/function/delete_data.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 32935f0f33..4de0951e87 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -92,13 +92,12 @@ func deleteData( PodOverride: podOverride, } pr := kube.NewPodRunner(cli, options) - podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers) + podFunc := deleteDataPodFunc(tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers) return pr.RunEx(ctx, podFunc) } //nolint:gocognit func deleteDataPodFunc( - cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, encryptionKey string, @@ -110,7 +109,7 @@ func deleteDataPodFunc( pod := pc.Pod() // Wait for pod to reach running state - if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + if err := pc.WaitForPodReady(ctx); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) }