Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Errkit migration 5 (pkg/function) #3174

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/function/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package function

import (
"fmt"

"github.com/kanisterio/errkit"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"sigs.k8s.io/yaml"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
Expand All @@ -29,11 +31,11 @@ import (
func Arg(args map[string]interface{}, argName string, result interface{}) error {
if val, ok := args[argName]; ok {
if err := mapstructure.WeakDecode(val, result); err != nil {
return errors.Wrapf(err, "Failed to decode arg `%s`", argName)
return errkit.Wrap(err, fmt.Sprintf("Failed to decode arg `%s`", argName))
}
return nil
}
return errors.New("Argument missing " + argName)
return errkit.New("Argument missing " + argName)
}

// OptArg returns the value of the specified argument if it exists
Expand Down Expand Up @@ -105,5 +107,5 @@ func GetYamlList(args map[string]interface{}, argName string) ([]string, error)
err := yaml.Unmarshal(valListBytes, &valList)
return valList, err
}
return nil, errors.Errorf("Invalid %s arg format", argName)
return nil, errkit.New(fmt.Sprintf("Invalid %s arg format", argName))
}
13 changes: 7 additions & 6 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package function

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -110,20 +111,20 @@ func (b *backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
ctx = field.Context(ctx, consts.PodNameKey, pod)
ctx = field.Context(ctx, consts.ContainerNameKey, container)
backupOutputs, err := backupData(ctx, cli, namespace, pod, container, backupArtifactPrefix, includePath, encryptionKey, insecureTLS, tp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to backup data")
return nil, errkit.Wrap(err, "Failed to backup data")
}
output := map[string]interface{}{
BackupDataOutputBackupID: backupOutputs.backupID,
Expand Down Expand Up @@ -194,12 +195,12 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
format.LogWithCtx(ctx, pod, container, stdout)
format.LogWithCtx(ctx, pod, container, stderr)
if err != nil {
return backupDataParsedOutput{}, errors.Wrapf(err, "Failed to create and upload backup")
return backupDataParsedOutput{}, errkit.Wrap(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
if backupID == "" {
return backupDataParsedOutput{}, errors.Errorf("Failed to parse the backup ID from logs, backup logs %s", stdout)
return backupDataParsedOutput{}, errkit.New(fmt.Sprintf("Failed to parse the backup ID from logs, backup logs %s", stdout))
}
// Get the file count and size of the backup from log
fileCount, backupSize, phySize := restic.SnapshotStatsFromBackupLog(stdout)
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -106,14 +106,14 @@ func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, a
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
var ps []string
if pods == "" {
Expand All @@ -123,7 +123,7 @@ func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, a
case tp.StatefulSet != nil:
ps = tp.StatefulSet.Pods
default:
return nil, errors.New("Failed to get pods")
return nil, errkit.New("Failed to get pods")
}
} else {
ps = strings.Fields(pods)
Expand Down Expand Up @@ -171,7 +171,7 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
go func(pod string, container string) {
ctx = field.Context(ctx, consts.PodNameKey, pod)
backupOutputs, err := backupData(ctx, cli, namespace, pod, container, fmt.Sprintf("%s/%s", backupArtifactPrefix, pod), includePath, encryptionKey, insecureTLS, tp)
errChan <- errors.Wrapf(err, "Failed to backup data for pod %s", pod)
errChan <- errkit.Wrap(err, "Failed to backup data for pod", "pod", pod)
outChan <- BackupInfo{PodName: pod, BackupID: backupOutputs.backupID, BackupTag: backupOutputs.backupTag}
}(pod, container)
}
Expand All @@ -186,11 +186,11 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
}
}
if len(errs) != 0 {
return nil, errors.New(strings.Join(errs, "\n"))
return nil, errkit.New(strings.Join(errs, "\n"))
}
manifestData, err := json.Marshal(Output)
if err != nil {
return nil, errors.Wrapf(err, "Failed to encode JSON data")
return nil, errkit.Wrap(err, "Failed to encode JSON data")
}
return map[string]interface{}{
BackupDataAllOutput: string(manifestData),
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -113,7 +113,7 @@ func backupDataStatsPodFunc(

// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
return nil, errkit.Wrap(err, "Failed while waiting for Pod to be ready", "pod", pod.Name)
}

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
Expand All @@ -131,20 +131,20 @@ func backupDataStatsPodFunc(

commandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, errors.Wrap(err, "Unable to get pod command executor")
return nil, errkit.Wrap(err, "Unable to get pod command executor")
}

var stdout, stderr bytes.Buffer
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
return nil, errkit.Wrap(err, "Failed to get backup stats")
}
// Get File Count and Size from Stats
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
if fc == "" || size == "" {
return nil, errors.New("Failed to parse snapshot stats from logs")
return nil, errkit.New("Failed to parse snapshot stats from logs")
}
return map[string]interface{}{
BackupDataStatsOutputMode: mode,
Expand Down Expand Up @@ -206,14 +206,14 @@ func (b *BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams,
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
return backupDataStats(
ctx,
Expand Down
30 changes: 15 additions & 15 deletions pkg/function/backup_data_using_kopia_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -131,22 +131,22 @@ func (b *backupDataUsingKopiaServerFunc) Exec(ctx context.Context, tp param.Temp

userPassphrase, cert, err := userCredentialsAndServerTLS(&tp)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch User Credentials/Certificate Data from Template Params")
return nil, errkit.Wrap(err, "Failed to fetch User Credentials/Certificate Data from Template Params")
}

fingerprint, err := kankopia.ExtractFingerprintFromCertificateJSON(cert)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch Kopia API Server Certificate Secret Data from Certificate")
return nil, errkit.Wrap(err, "Failed to fetch Kopia API Server Certificate Secret Data from Certificate")
}

hostname, userAccessPassphrase, err := hostNameAndUserPassPhraseFromRepoServer(userPassphrase, userHostname)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch Hostname/User Passphrase from Secret")
return nil, errkit.Wrap(err, "Failed to fetch Hostname/User Passphrase from Secret")
}

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}

snapInfo, err := backupDataUsingKopiaServer(
Expand All @@ -164,7 +164,7 @@ func (b *backupDataUsingKopiaServerFunc) Exec(ctx context.Context, tp param.Temp
tags,
)
if err != nil {
return nil, errors.Wrap(err, "Failed to backup data using Kopia Repository Server")
return nil, errkit.Wrap(err, "Failed to backup data using Kopia Repository Server")
}

var logSize, phySize int64
Expand Down Expand Up @@ -226,7 +226,7 @@ func backupDataUsingKopiaServer(
format.Log(pod, container, stdout)
format.Log(pod, container, stderr)
if err != nil {
return nil, errors.Wrap(err, "Failed to connect to Kopia Repository Server")
return nil, errkit.Wrap(err, "Failed to connect to Kopia Repository Server")
}

cmd = kopiacmd.SnapshotCreate(
Expand All @@ -242,7 +242,7 @@ func backupDataUsingKopiaServer(
Parallelism: utils.GetEnvAsIntOrDefault(kankopia.DataStoreParallelUploadName, kankopia.DefaultDataStoreParallelUpload),
})
if err != nil {
return nil, errors.Wrap(err, "Failed to construct snapshot create command")
return nil, errkit.Wrap(err, "Failed to construct snapshot create command")
}
stdout, stderr, err = kube.Exec(ctx, cli, namespace, pod, container, cmd, nil)
format.Log(pod, container, stdout)
Expand All @@ -253,7 +253,7 @@ func backupDataUsingKopiaServer(
if strings.Contains(err.Error(), kerrors.ErrCodeOutOfMemoryStr) {
message = message + ": " + kerrors.ErrOutOfMemoryStr
}
return nil, errors.Wrap(err, message)
return nil, errkit.Wrap(err, message)
}
// Parse logs and return snapshot IDs and stats
return kopiacmd.ParseSnapshotCreateOutput(stdout, stderr)
Expand All @@ -262,14 +262,14 @@ func backupDataUsingKopiaServer(
func hostNameAndUserPassPhraseFromRepoServer(userCreds, hostname string) (string, string, error) {
var userAccessMap map[string]string
if err := json.Unmarshal([]byte(userCreds), &userAccessMap); err != nil {
return "", "", errors.Wrap(err, "Failed to unmarshal User Credentials Data")
return "", "", errkit.Wrap(err, "Failed to unmarshal User Credentials Data")
}

// Check if hostname provided exists in the User Access Map
if hostname != "" {
err := checkHostnameExistsInUserAccessMap(userAccessMap, hostname)
if err != nil {
return "", "", errors.Wrap(err, "Failed to find hostname in the User Access Map")
return "", "", errkit.Wrap(err, "Failed to find hostname in the User Access Map")
}
}

Expand All @@ -286,27 +286,27 @@ func hostNameAndUserPassPhraseFromRepoServer(userCreds, hostname string) (string

decodedUserPassphrase, err := base64.StdEncoding.DecodeString(userPassphrase)
if err != nil {
return "", "", errors.Wrap(err, "Failed to Decode User Passphrase")
return "", "", errkit.Wrap(err, "Failed to Decode User Passphrase")
}
return hostname, string(decodedUserPassphrase), nil
}

func userCredentialsAndServerTLS(tp *param.TemplateParams) (string, string, error) {
userCredJSON, err := json.Marshal(tp.RepositoryServer.Credentials.ServerUserAccess.Data)
if err != nil {
return "", "", errors.Wrap(err, "Error marshalling User Credentials Data")
return "", "", errkit.Wrap(err, "Error marshalling User Credentials Data")
}
certJSON, err := json.Marshal(tp.RepositoryServer.Credentials.ServerTLS.Data)
if err != nil {
return "", "", errors.Wrap(err, "Error marshalling Certificate Data")
return "", "", errkit.Wrap(err, "Error marshalling Certificate Data")
}
return string(userCredJSON), string(certJSON), nil
}

func checkHostnameExistsInUserAccessMap(userAccessMap map[string]string, hostname string) error {
// check if hostname that is provided by the user exists in the user access map
if _, ok := userAccessMap[hostname]; !ok {
return errors.New("hostname provided in the repository server CR does not exist in the user access map")
return errkit.New("hostname provided in the repository server CR does not exist in the user access map")
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/function/checkRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -62,7 +62,7 @@ func CheckRepository(
) (map[string]interface{}, error) {
namespace, err := kube.GetControllerNamespace()
if err != nil {
return nil, errors.Wrapf(err, "Failed to get controller namespace")
return nil, errkit.Wrap(err, "Failed to get controller namespace")
}
options := &kube.PodOptions{
Namespace: namespace,
Expand Down Expand Up @@ -94,7 +94,7 @@ func CheckRepositoryPodFunc(

// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
return nil, errkit.Wrap(err, "Failed while waiting for Pod to be ready", "pod", pod.Name)
}

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (c *CheckRepositoryFunc) Exec(ctx context.Context, tp param.TemplateParams,

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
return CheckRepository(
ctx,
Expand Down
Loading