Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor functions with pod runner #2308

Merged
merged 15 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package function

import (
"bytes"
"context"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -71,33 +71,46 @@ func backupDataStats(ctx context.Context, cli kubernetes.Interface, tp param.Tem
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := backupDataStatsPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, backupID, mode)
return pr.Run(ctx, podFunc)
podFunc := backupDataStatsPodFunc(tp, encryptionKey, backupArtifactPrefix, backupID, mode)
return pr.RunEx(ctx, podFunc)
}

func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func backupDataStatsPodFunc(tp param.TemplateParams, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

cmd, err := restic.StatsCommandByID(tp.Profile, backupArtifactPrefix, backupID, mode, encryptionKey)
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

commandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, errors.Wrap(err, "Unable to get pod command executor")
}

var stdout, stderr bytes.Buffer
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
Comment on lines +111 to +114
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very common pattern in functions - execute, log and then analyze output.
we can extract this functionality to utils and then reuse everywhere to reduce amount of code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also not sure whether we need to wait until end of execution and log at the end, probably we can log on the fly, since we passing io.Writer to Exec function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Could you please create a GitHub issue with this suggestion? It could be good for first-time contributors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
}
// Get File Count and Size from Stats
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout)
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
if fc == "" || size == "" {
return nil, errors.New("Failed to parse snapshot stats from logs")
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/function/checkRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"strings"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -55,22 +54,28 @@ func CheckRepository(ctx context.Context, cli kubernetes.Interface, tp param.Tem
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := CheckRepositoryPodFunc(cli, tp, namespace, encryptionKey, targetPaths)
return pr.Run(ctx, podFunc)
podFunc := CheckRepositoryPodFunc(cli, tp, encryptionKey, targetPaths)
return pr.RunEx(ctx, podFunc)
}

func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, encryptionKey, targetPath string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
switch {
case err == nil:
break
Expand Down
17 changes: 6 additions & 11 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package function

import (
"bytes"
"context"
"fmt"

"github.com/pkg/errors"
"go.uber.org/zap/buffer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -80,22 +80,18 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey)
podFunc := copyVolumeDataPodFunc(cli, tp, mountPoint, targetPath, encryptionKey)
return pr.RunEx(ctx, podFunc)
}

func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName())
}
pw1, err := pc.GetFileWriter()
if err != nil {
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}

remover, err := WriteCredsToPod(ctx, pw1, tp.Profile)
remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}
Expand All @@ -105,7 +101,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na

pod := pc.Pod()
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
if err := restic.GetOrCreateRepository(cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
// Copy data to object store
Expand All @@ -118,8 +114,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
if err != nil {
return nil, err
}
var stdout buffer.Buffer
var stderr buffer.Buffer
var stdout, stderr bytes.Buffer
err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
Expand Down
63 changes: 42 additions & 21 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package function

import (
"bytes"
"context"
"fmt"
"strings"
Expand Down Expand Up @@ -67,6 +68,10 @@ func (*deleteDataFunc) Name() string {
}

func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string, podOverride crv1alpha1.JSONMap) (map[string]interface{}, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
}

options := &kube.PodOptions{
Namespace: namespace,
GenerateName: jobPrefix,
Expand All @@ -75,37 +80,48 @@ func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.Template
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
return pr.Run(ctx, podFunc)
podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
return pr.RunEx(ctx, podFunc)
}

//nolint:gocognit
func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

// Get command executor
podCommandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)

for i, deleteTag := range deleteTags {
cmd, err := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey)
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout)
deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
Expand All @@ -117,14 +133,16 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data")
}
if reclaimSpace {
spaceFreedStr, err := pruneData(cli, tp, pod, namespace, encryptionKey, targetPaths[i])
spaceFreedStr, err := pruneData(tp, pod, podCommandExecutor, encryptionKey, targetPaths[i])
if err != nil {
return nil, errors.Wrapf(err, "Error executing prune command")
}
Expand All @@ -138,15 +156,18 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
}
}

func pruneData(cli kubernetes.Interface, tp param.TemplateParams, pod *v1.Pod, namespace, encryptionKey, targetPath string) (string, error) {
func pruneData(tp param.TemplateParams, pod *v1.Pod, podCommandExecutor kube.PodCommandExecutor, encryptionKey, targetPath string) (string, error) {
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
cmd, err := restic.PruneCommand(tp.Profile, targetPath, encryptionKey)
if err != nil {
return "", err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
spaceFreed := restic.SpaceFreedFromPruneLog(stdout)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(context.Background(), cmd, nil, &stdout, &stderr)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr.String())

spaceFreed := restic.SpaceFreedFromPruneLog(stdout.String())
return spaceFreed, errors.Wrapf(err, "Failed to prune data after forget")
}

Expand Down
40 changes: 24 additions & 16 deletions pkg/function/delete_data_using_kopia_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package function

import (
"bytes"
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -142,31 +142,30 @@ func deleteDataFromServer(
}
pr := kube.NewPodRunner(cli, options)
podFunc := deleteDataFromServerPodFunc(
cli,
hostname,
namespace,
serverAddress,
fingerprint,
snapID,
username,
userPassphrase,
)
return pr.Run(ctx, podFunc)
return pr.RunEx(ctx, podFunc)
}

func deleteDataFromServerPodFunc(
cli kubernetes.Interface,
hostname,
namespace,
serverAddress,
fingerprint,
snapID,
username,
userPassphrase string,
) func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
return func(ctx context.Context, pod *corev1.Pod) (map[string]any, error) {
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrap(err, "Failed while waiting for Pod: "+pod.Name+" to be ready")
) func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]any, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}

contentCacheMB, metadataCacheMB := kopiacmd.GetCacheSizeSettingsForSnapshot()
Expand All @@ -184,9 +183,16 @@ func deleteDataFromServerPodFunc(
ContentCacheMB: contentCacheMB,
MetadataCacheMB: metadataCacheMB,
})
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)

commandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, errors.Wrap(err, "Unable to get pod command executor")
}

var stdout, stderr bytes.Buffer
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrap(err, "Failed to connect to Kopia Repository server")
}
Expand All @@ -200,9 +206,11 @@ func deleteDataFromServerPodFunc(
},
SnapID: snapID,
})
stdout, stderr, err = kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
stdout.Reset()
stderr.Reset()
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
return nil, errors.Wrap(err, "Failed to delete backup from Kopia API server")
}
}
Loading