diff --git a/pkg/function/backup_data_stats.go b/pkg/function/backup_data_stats.go
index 017b37cc7e..3f23fe4f84 100644
--- a/pkg/function/backup_data_stats.go
+++ b/pkg/function/backup_data_stats.go
@@ -15,10 +15,10 @@ package function
 
 import (
+	"bytes"
 	"context"
 
 	"github.com/pkg/errors"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 
 	kanister "github.com/kanisterio/kanister/pkg"
@@ -71,33 +71,52 @@ func backupDataStats(ctx context.Context, cli kubernetes.Interface, tp param.Tem
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := backupDataStatsPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, backupID, mode)
-	return pr.Run(ctx, podFunc)
+	podFunc := backupDataStatsPodFunc(tp, encryptionKey, backupArtifactPrefix, backupID, mode)
+	return pr.RunEx(ctx, podFunc)
 }
 
-func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
+func backupDataStatsPodFunc(
+	tp param.TemplateParams,
+	encryptionKey,
+	backupArtifactPrefix,
+	backupID,
+	mode string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
 		// Wait for pod to reach running state
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
+		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
-		pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
+
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
 		if err != nil {
 			return nil, err
 		}
-		defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
+
+		// Parent context could already be dead, so removing file within new context
+		defer remover.Remove(context.Background()) //nolint:errcheck
+
 		cmd, err := restic.StatsCommandByID(tp.Profile, backupArtifactPrefix, backupID, mode, encryptionKey)
 		if err != nil {
 			return nil, err
 		}
-		stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		commandExecutor, err := pc.GetCommandExecutor()
+		if err != nil {
+			return nil, errors.Wrap(err, "Unable to get pod command executor")
+		}
+
+		var stdout, stderr bytes.Buffer
+		err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to get backup stats")
 		}
 		// Get File Count and Size from Stats
-		mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout)
+		mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
 		if fc == "" || size == "" {
 			return nil, errors.New("Failed to parse snapshot stats from logs")
 		}
diff --git a/pkg/function/checkRepository.go b/pkg/function/checkRepository.go
index 18c4a3133e..88056b56e4 100644
--- a/pkg/function/checkRepository.go
+++ b/pkg/function/checkRepository.go
@@ -5,7 +5,6 @@ import (
 	"strings"
 
 	"github.com/pkg/errors"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 
 	kanister "github.com/kanisterio/kanister/pkg"
@@ -55,22 +54,41 @@ func CheckRepository(ctx context.Context, cli kubernetes.Interface, tp param.Tem
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := CheckRepositoryPodFunc(cli, tp, namespace, encryptionKey, targetPaths)
-	return pr.Run(ctx, podFunc)
+	podFunc := CheckRepositoryPodFunc(cli, tp, encryptionKey, targetPaths)
+	return pr.RunEx(ctx, podFunc)
 }
 
-func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
+func CheckRepositoryPodFunc(
+	cli kubernetes.Interface,
+	tp param.TemplateParams,
+	encryptionKey,
+	targetPath string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
 		// Wait for pod to reach running state
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
+		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
-		pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
+
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
 		if err != nil {
 			return nil, err
 		}
-		defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
-		err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name)
+
+		// Parent context could already be dead, so removing file within new context
+		defer remover.Remove(context.Background()) //nolint:errcheck
+
+		err = restic.CheckIfRepoIsReachable(
+			tp.Profile,
+			targetPath,
+			encryptionKey,
+			cli,
+			pod.Namespace,
+			pod.Name,
+			pod.Spec.Containers[0].Name,
+		)
 		switch {
 		case err == nil:
 			break
diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go
index 900853b226..f96ebd2913 100644
--- a/pkg/function/copy_volume_data.go
+++ b/pkg/function/copy_volume_data.go
@@ -15,11 +15,11 @@ package function
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 
 	"github.com/pkg/errors"
-	"go.uber.org/zap/buffer"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/client-go/kubernetes"
@@ -80,22 +80,24 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey)
+	podFunc := copyVolumeDataPodFunc(cli, tp, mountPoint, targetPath, encryptionKey)
 	return pr.RunEx(ctx, podFunc)
 }
 
-func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+func copyVolumeDataPodFunc(
+	cli kubernetes.Interface,
+	tp param.TemplateParams,
+	mountPoint,
+	targetPath,
+	encryptionKey string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
 	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
 		// Wait for pod to reach running state
 		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName())
 		}
 
-		pw1, err := pc.GetFileWriter()
-		if err != nil {
-			return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
-		}
-		remover, err := WriteCredsToPod(ctx, pw1, tp.Profile)
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
 		}
@@ -105,7 +107,15 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
 		pod := pc.Pod()
 
 		// Get restic repository
-		if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
+		if err := restic.GetOrCreateRepository(
+			cli,
+			pod.Namespace,
+			pod.Name,
+			pod.Spec.Containers[0].Name,
+			targetPath,
+			encryptionKey,
+			tp.Profile,
+		); err != nil {
 			return nil, err
 		}
 		// Copy data to object store
@@ -118,8 +128,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
 		if err != nil {
 			return nil, err
 		}
-		var stdout buffer.Buffer
-		var stderr buffer.Buffer
+		var stdout, stderr bytes.Buffer
 		err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
 		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
 		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go
index 276ffc2d3b..4de0951e87 100644
--- a/pkg/function/delete_data.go
+++ b/pkg/function/delete_data.go
@@ -15,6 +15,7 @@ package function
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"strings"
 
@@ -66,7 +67,23 @@ func (*deleteDataFunc) Name() string {
 	return DeleteDataFuncName
 }
 
-func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string, podOverride crv1alpha1.JSONMap) (map[string]interface{}, error) {
+func deleteData(
+	ctx context.Context,
+	cli kubernetes.Interface,
+	tp param.TemplateParams,
+	reclaimSpace bool,
+	namespace,
+	encryptionKey string,
+	targetPaths,
+	deleteTags,
+	deleteIdentifiers []string,
+	jobPrefix string,
+	podOverride crv1alpha1.JSONMap,
+) (map[string]interface{}, error) {
+	if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
+		return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
+	}
+
 	options := &kube.PodOptions{
 		Namespace:    namespace,
 		GenerateName: jobPrefix,
@@ -75,37 +92,55 @@ func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.Template
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
-	return pr.Run(ctx, podFunc)
+	podFunc := deleteDataPodFunc(tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
+	return pr.RunEx(ctx, podFunc)
 }
 
 //nolint:gocognit
-func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
+func deleteDataPodFunc(
+	tp param.TemplateParams,
+	reclaimSpace bool,
+	encryptionKey string,
+	targetPaths,
+	deleteTags,
+	deleteIdentifiers []string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
 		// Wait for pod to reach running state
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
+		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
-		if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
-			return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
+
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
+		if err != nil {
+			return nil, err
 		}
-		pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
+
+		// Parent context could already be dead, so removing file within new context
+		defer remover.Remove(context.Background()) //nolint:errcheck
+
+		// Get command executor
+		podCommandExecutor, err := pc.GetCommandExecutor()
 		if err != nil {
 			return nil, err
 		}
-		defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
+
 		for i, deleteTag := range deleteTags {
 			cmd, err := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey)
 			if err != nil {
 				return nil, err
 			}
-			stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
-			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+			var stdout, stderr bytes.Buffer
+			err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 			if err != nil {
 				return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
 			}
-			deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout)
+			deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout.String())
 			if err != nil {
 				return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
 			}
@@ -117,14 +152,16 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
 			if err != nil {
 				return nil, err
 			}
-			stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
-			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+			var stdout, stderr bytes.Buffer
+			err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+			format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 			if err != nil {
 				return nil, errors.Wrapf(err, "Failed to forget data")
 			}
 			if reclaimSpace {
-				spaceFreedStr, err := pruneData(cli, tp, pod, namespace, encryptionKey, targetPaths[i])
+				spaceFreedStr, err := pruneData(tp, pod, podCommandExecutor, encryptionKey, targetPaths[i])
 				if err != nil {
 					return nil, errors.Wrapf(err, "Error executing prune command")
 				}
@@ -138,15 +175,24 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
 	}
 }
 
-func pruneData(cli kubernetes.Interface, tp param.TemplateParams, pod *v1.Pod, namespace, encryptionKey, targetPath string) (string, error) {
+func pruneData(
+	tp param.TemplateParams,
+	pod *v1.Pod,
+	podCommandExecutor kube.PodCommandExecutor,
+	encryptionKey,
+	targetPath string,
+) (string, error) {
 	cmd, err := restic.PruneCommand(tp.Profile, targetPath, encryptionKey)
 	if err != nil {
 		return "", err
 	}
-	stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-	format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
-	format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
-	spaceFreed := restic.SpaceFreedFromPruneLog(stdout)
+
+	var stdout, stderr bytes.Buffer
+	err = podCommandExecutor.Exec(context.Background(), cmd, nil, &stdout, &stderr)
+	format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+	format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr.String())
+
+	spaceFreed := restic.SpaceFreedFromPruneLog(stdout.String())
 	return spaceFreed, errors.Wrapf(err, "Failed to prune data after forget")
 }
diff --git a/pkg/function/delete_data_using_kopia_server.go b/pkg/function/delete_data_using_kopia_server.go
index 442cca63b8..f23c068815 100644
--- a/pkg/function/delete_data_using_kopia_server.go
+++ b/pkg/function/delete_data_using_kopia_server.go
@@ -15,10 +15,10 @@ package function
 
 import (
+	"bytes"
 	"context"
 
 	"github.com/pkg/errors"
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 
 	kanister "github.com/kanisterio/kanister/pkg"
@@ -142,31 +142,30 @@ func deleteDataFromServer(
 	}
 	pr := kube.NewPodRunner(cli, options)
 	podFunc := deleteDataFromServerPodFunc(
-		cli,
 		hostname,
-		namespace,
 		serverAddress,
 		fingerprint,
 		snapID,
 		username,
 		userPassphrase,
 	)
-	return pr.Run(ctx, podFunc)
+	return pr.RunEx(ctx, podFunc)
 }
 
 func deleteDataFromServerPodFunc(
-	cli kubernetes.Interface,
 	hostname,
-	namespace,
 	serverAddress,
 	fingerprint,
 	snapID,
 	username,
 	userPassphrase string,
-) func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
-	return func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
-			return nil, errors.Wrap(err, "Failed while waiting for Pod: "+pod.Name+" to be ready")
+) func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
+		pod := pc.Pod()
+
+		// Wait for pod to reach running state
+		if err := pc.WaitForPodReady(ctx); err != nil {
+			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
 
 		contentCacheMB, metadataCacheMB := kopiacmd.GetCacheSizeSettingsForSnapshot()
@@ -184,9 +183,16 @@ func deleteDataFromServerPodFunc(
 			ContentCacheMB:  contentCacheMB,
 			MetadataCacheMB: metadataCacheMB,
 		})
-		stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		commandExecutor, err := pc.GetCommandExecutor()
+		if err != nil {
+			return nil, errors.Wrap(err, "Unable to get pod command executor")
+		}
+
+		var stdout, stderr bytes.Buffer
+		err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 		if err != nil {
 			return nil, errors.Wrap(err, "Failed to connect to Kopia Repository server")
 		}
@@ -200,9 +206,11 @@ func deleteDataFromServerPodFunc(
 			},
 			SnapID: snapID,
 		})
-		stdout, stderr, err = kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
+		stdout.Reset()
+		stderr.Reset()
+		err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 		return nil, errors.Wrap(err, "Failed to delete backup from Kopia API server")
 	}
 }
diff --git a/pkg/function/describe_backups.go b/pkg/function/describe_backups.go
index e1d2c45284..fb9f512a13 100644
--- a/pkg/function/describe_backups.go
+++ b/pkg/function/describe_backups.go
@@ -15,11 +15,11 @@ package function
 
 import (
+	"bytes"
 	"context"
 	"strings"
 
 	"github.com/pkg/errors"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 
 	kanister "github.com/kanisterio/kanister/pkg"
@@ -73,22 +73,40 @@ func describeBackups(ctx context.Context, cli kubernetes.Interface, tp param.Tem
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := describeBackupsPodFunc(cli, tp, namespace, encryptionKey, targetPaths)
-	return pr.Run(ctx, podFunc)
+	podFunc := describeBackupsPodFunc(cli, tp, encryptionKey, targetPaths)
+	return pr.RunEx(ctx, podFunc)
 }
 
-func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
+func describeBackupsPodFunc(
+	cli kubernetes.Interface,
+	tp param.TemplateParams,
+	encryptionKey,
+	targetPath string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
 		// Wait for pod to reach running state
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
+		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
-		pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
+
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
 		if err != nil {
 			return nil, err
 		}
-		defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
-		err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name)
+
+		// Parent context could already be dead, so removing file within new context
+		defer remover.Remove(context.Background()) //nolint:errcheck
+
+		err = restic.CheckIfRepoIsReachable(
+			tp.Profile,
+			targetPath,
+			encryptionKey,
+			cli, pod.Namespace,
+			pod.Name,
+			pod.Spec.Containers[0].Name,
+		)
 		switch {
 		case err == nil:
 			break
@@ -114,18 +132,27 @@ func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n
 		default:
 			return nil, err
 		}
+
 		cmd, err := restic.StatsCommandByID(tp.Profile, targetPath, "" /* get all snapshot stats */, RawDataStatsMode, encryptionKey)
 		if err != nil {
 			return nil, err
 		}
-		stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		ex, err := pc.GetCommandExecutor()
+		if err != nil {
+			return nil, err
+		}
+
+		var stdout, stderr bytes.Buffer
+		err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to get backup stats")
 		}
+
 		// Get File Count and Size from Stats
-		_, fc, size := restic.SnapshotStatsFromStatsLog(stdout)
+		_, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
 		if fc == "" || size == "" {
 			return nil, errors.New("Failed to parse snapshot stats from logs")
 		}
diff --git a/pkg/function/prepare_data.go b/pkg/function/prepare_data.go
index 6fe46a71d7..37c5544b20 100644
--- a/pkg/function/prepare_data.go
+++ b/pkg/function/prepare_data.go
@@ -17,9 +17,11 @@ package function
 
 import (
 	"context"
 	"fmt"
+	"io"
 
+	"github.com/kanisterio/kanister/pkg/consts"
+	"github.com/kanisterio/kanister/pkg/field"
 	"github.com/pkg/errors"
-	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -95,20 +97,32 @@ func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, servi
 	}
 	pr := kube.NewPodRunner(cli, options)
 	podFunc := prepareDataPodFunc(cli)
-	return pr.Run(ctx, podFunc)
+	return pr.RunEx(ctx, podFunc)
 }
 
-func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-		// Wait for pod completion
-		if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
-			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
+func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
+		// Wait for pod to reach running state
+		if err := pc.WaitForPodReady(ctx); err != nil {
+			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
+
+		ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath)
 		// Fetch logs from the pod
-		logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
+		r, err := pc.StreamPodLogs(ctx)
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
 		}
+		defer r.Close()
+
+		bytes, err := io.ReadAll(r)
+		if err != nil {
+			return nil, errors.Wrapf(err, "Failed to read logs from the pod")
+		}
+		logs := string(bytes)
+
 		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, logs)
 		out, err := parseLogAndCreateOutput(logs)
 		return out, errors.Wrap(err, "Failed to parse phase output")
diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go
index c8d82a4887..ad54550c1f 100644
--- a/pkg/function/restore_data.go
+++ b/pkg/function/restore_data.go
@@ -15,10 +15,10 @@ package function
 
 import (
+	"bytes"
 	"context"
 
 	"github.com/pkg/errors"
-	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 
@@ -125,21 +125,34 @@ func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.Templat
 		PodOverride: podOverride,
 	}
 	pr := kube.NewPodRunner(cli, options)
-	podFunc := restoreDataPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID)
-	return pr.Run(ctx, podFunc)
+	podFunc := restoreDataPodFunc(tp, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID)
+	return pr.RunEx(ctx, podFunc)
 }
 
-func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
-	return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
+func restoreDataPodFunc(
+	tp param.TemplateParams,
+	encryptionKey,
+	backupArtifactPrefix,
+	restorePath,
+	backupTag,
+	backupID string,
+) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
+		pod := pc.Pod()
+
 		// Wait for pod to reach running state
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
+		if err := pc.WaitForPodReady(ctx); err != nil {
 			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
-		pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
+
+		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
 		if err != nil {
-			return nil, err
+			return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
 		}
-		defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
+
+		// Parent context could already be dead, so removing file within new context
+		defer remover.Remove(context.Background()) //nolint:errcheck
+
 		var cmd []string
 		// Generate restore command based on the identifier passed
 		if backupTag != "" {
@@ -150,13 +163,19 @@ func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, names
 		if err != nil {
 			return nil, err
 		}
-		stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		ex, err := pc.GetCommandExecutor()
+		if err != nil {
+			return nil, err
+		}
+		var stdout, stderr bytes.Buffer
+		err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to restore backup")
 		}
-		out, err := parseLogAndCreateOutput(stdout)
+		out, err := parseLogAndCreateOutput(stdout.String())
 		return out, errors.Wrap(err, "Failed to parse phase output")
 	}
 }
diff --git a/pkg/function/restore_data_using_kopia_server.go b/pkg/function/restore_data_using_kopia_server.go
index d5117582e8..0d327fd5a0 100644
--- a/pkg/function/restore_data_using_kopia_server.go
+++ b/pkg/function/restore_data_using_kopia_server.go
@@ -15,11 +15,11 @@ package function
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 
 	"github.com/pkg/errors"
-	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 
@@ -185,9 +185,7 @@ func restoreDataFromServer(
 
 	pr := kube.NewPodRunner(cli, options)
 	podFunc := restoreDataFromServerPodFunc(
-		cli,
 		hostname,
-		namespace,
 		restorePath,
 		serverAddress,
 		fingerprint,
@@ -196,13 +194,11 @@ func restoreDataFromServer(
 		userPassphrase,
 		sparseRestore,
 	)
-	return pr.Run(ctx, podFunc)
+	return pr.RunEx(ctx, podFunc)
 }
 
 func restoreDataFromServerPodFunc(
-	cli kubernetes.Interface,
 	hostname,
-	namespace,
 	restorePath,
 	serverAddress,
 	fingerprint,
@@ -210,10 +206,13 @@ func restoreDataFromServerPodFunc(
 	username,
 	userPassphrase string,
 	sparseRestore bool,
-) func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
-	return func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
-		if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
-			return nil, errors.Wrap(err, "Failed while waiting for Pod: "+pod.Name+" to be ready")
+) func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
+	return func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
+		pod := pc.Pod()
+
+		// Wait for pod to reach running state
+		if err := pc.WaitForPodReady(ctx); err != nil {
+			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
 		}
 
 		contentCacheMB, metadataCacheMB := kopiacmd.GetCacheSizeSettingsForSnapshot()
@@ -232,11 +231,18 @@ func restoreDataFromServerPodFunc(
 			ContentCacheMB:  contentCacheMB,
 			MetadataCacheMB: metadataCacheMB,
 		})
-		stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		commandExecutor, err := pc.GetCommandExecutor()
 		if err != nil {
-			return nil, errors.Wrap(err, "Failed to connect to Kopia API server")
+			return nil, errors.Wrap(err, "Unable to get pod command executor")
+		}
+
+		var stdout, stderr bytes.Buffer
+		err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
+		if err != nil {
+			return nil, errors.Wrapf(err, "Failed to connect to Kopia Repository server")
 		}
 
 		cmd = kopiacmd.SnapshotRestore(
@@ -251,9 +257,13 @@ func restoreDataFromServerPodFunc(
 			SparseRestore:          sparseRestore,
 			IgnorePermissionErrors: true,
 		})
-		stdout, stderr, err = kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
-		format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
+
+		stdout.Reset()
+		stderr.Reset()
+		err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
+		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
+
 		return nil, errors.Wrap(err, "Failed to restore backup from Kopia API server")
 	}
 }
diff --git a/pkg/function/utils.go b/pkg/function/utils.go
index ecb18ee88c..be0196ab2b 100644
--- a/pkg/function/utils.go
+++ b/pkg/function/utils.go
@@ -102,16 +102,22 @@ func (nr nopRemover) Path() string {
 	return ""
 }
 
-// WriteCredsToPod creates a file with Google credentials if the given profile points to a GCS location
-func WriteCredsToPod(ctx context.Context, writer kube.PodFileWriter, profile *param.Profile) (kube.PodFileRemover, error) {
+// MaybeWriteProfileCredentials creates a file with Google credentials if the given profile points to a GCS location, otherwise does nothing
+func MaybeWriteProfileCredentials(ctx context.Context, pc kube.PodController, profile *param.Profile) (kube.PodFileRemover, error) {
 	if profile.Location.Type == crv1alpha1.LocationTypeGCS {
-		remover, err := writer.Write(ctx, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
+		pfw, err := pc.GetFileWriter()
 		if err != nil {
-			return nil, errors.Wrapf(err, "Unable to write Google credentials to the pod.")
+			return nil, errors.Wrap(err, "Unable to write Google credentials")
+		}
+
+		remover, err := pfw.Write(ctx, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
+		if err != nil {
+			return nil, errors.Wrap(err, "Unable to write Google credentials")
 		}
 		return remover, nil
 	}
+
 	return nopRemover{}, nil
}
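
Usage sketch (reviewer note, not part of the patch): the changes above move the restic/Kopia pod functions off kube.Exec against a raw *v1.Pod and onto the kube.PodController interface driven by PodRunner.RunEx. Below is a minimal sketch of the resulting call pattern, assuming the PodController, PodRunner, and MaybeWriteProfileCredentials signatures exactly as they appear in this diff; examplePodFunc, runExample, the placeholder restic command, and the import paths for the kanister helper packages are illustrative assumptions, not part of the patch.

package function

import (
	"bytes"
	"context"

	"github.com/pkg/errors"
	"k8s.io/client-go/kubernetes"

	"github.com/kanisterio/kanister/pkg/format"
	"github.com/kanisterio/kanister/pkg/kube"
	"github.com/kanisterio/kanister/pkg/param"
)

// examplePodFunc is a hypothetical pod function showing the PodController pattern used in this diff.
func examplePodFunc(tp param.TemplateParams) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
	return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
		pod := pc.Pod()

		// The controller owns the clientset, namespace, and pod name, so readiness
		// no longer needs kube.WaitForPodReady(ctx, cli, ns, name).
		if err := pc.WaitForPodReady(ctx); err != nil {
			return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
		}

		// Writes GCS credentials into the pod only when the profile needs them.
		remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
		if err != nil {
			return nil, err
		}
		// Parent context could already be dead, so removing file within new context
		defer remover.Remove(context.Background()) //nolint:errcheck

		// The command executor replaces kube.Exec; output is captured in bytes.Buffer.
		ex, err := pc.GetCommandExecutor()
		if err != nil {
			return nil, err
		}
		cmd := []string{"restic", "version"} // placeholder command
		var stdout, stderr bytes.Buffer
		err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
		format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
		return nil, err
	}
}

// runExample wires the pod function into a PodRunner, mirroring the call sites above.
func runExample(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, jobPrefix string) (map[string]interface{}, error) {
	options := &kube.PodOptions{
		Namespace:    namespace,
		GenerateName: jobPrefix,
		// Image, Command, PodOverride, etc. elided in this sketch.
	}
	pr := kube.NewPodRunner(cli, options)
	return pr.RunEx(ctx, examplePodFunc(tp))
}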