Skip to content

Commit

Permalink
Migrate pkg/function to errkit
Browse files Browse the repository at this point in the history
  • Loading branch information
e-sumin committed Oct 4, 2024
1 parent 33200ed commit 7262bdb
Show file tree
Hide file tree
Showing 32 changed files with 276 additions and 272 deletions.
10 changes: 6 additions & 4 deletions pkg/function/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package function

import (
"fmt"

"github.com/kanisterio/errkit"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"sigs.k8s.io/yaml"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
Expand All @@ -29,11 +31,11 @@ import (
func Arg(args map[string]interface{}, argName string, result interface{}) error {
if val, ok := args[argName]; ok {
if err := mapstructure.WeakDecode(val, result); err != nil {
return errors.Wrapf(err, "Failed to decode arg `%s`", argName)
return errkit.Wrap(err, fmt.Sprintf("Failed to decode arg `%s`", argName))
}
return nil
}
return errors.New("Argument missing " + argName)
return errkit.New("Argument missing " + argName)
}

// OptArg returns the value of the specified argument if it exists
Expand Down Expand Up @@ -105,5 +107,5 @@ func GetYamlList(args map[string]interface{}, argName string) ([]string, error)
err := yaml.Unmarshal(valListBytes, &valList)
return valList, err
}
return nil, errors.Errorf("Invalid %s arg format", argName)
return nil, errkit.New(fmt.Sprintf("Invalid %s arg format", argName))
}
13 changes: 7 additions & 6 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package function

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -110,20 +111,20 @@ func (b *backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
ctx = field.Context(ctx, consts.PodNameKey, pod)
ctx = field.Context(ctx, consts.ContainerNameKey, container)
backupOutputs, err := backupData(ctx, cli, namespace, pod, container, backupArtifactPrefix, includePath, encryptionKey, insecureTLS, tp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to backup data")
return nil, errkit.Wrap(err, "Failed to backup data")
}
output := map[string]interface{}{
BackupDataOutputBackupID: backupOutputs.backupID,
Expand Down Expand Up @@ -194,12 +195,12 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
format.LogWithCtx(ctx, pod, container, stdout)
format.LogWithCtx(ctx, pod, container, stderr)
if err != nil {
return backupDataParsedOutput{}, errors.Wrapf(err, "Failed to create and upload backup")
return backupDataParsedOutput{}, errkit.Wrap(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
if backupID == "" {
return backupDataParsedOutput{}, errors.Errorf("Failed to parse the backup ID from logs, backup logs %s", stdout)
return backupDataParsedOutput{}, errkit.New(fmt.Sprintf("Failed to parse the backup ID from logs, backup logs %s", stdout))
}
// Get the file count and size of the backup from log
fileCount, backupSize, phySize := restic.SnapshotStatsFromBackupLog(stdout)
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -106,14 +106,14 @@ func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, a
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
var ps []string
if pods == "" {
Expand All @@ -123,7 +123,7 @@ func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, a
case tp.StatefulSet != nil:
ps = tp.StatefulSet.Pods
default:
return nil, errors.New("Failed to get pods")
return nil, errkit.New("Failed to get pods")
}
} else {
ps = strings.Fields(pods)
Expand Down Expand Up @@ -171,7 +171,7 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
go func(pod string, container string) {
ctx = field.Context(ctx, consts.PodNameKey, pod)
backupOutputs, err := backupData(ctx, cli, namespace, pod, container, fmt.Sprintf("%s/%s", backupArtifactPrefix, pod), includePath, encryptionKey, insecureTLS, tp)
errChan <- errors.Wrapf(err, "Failed to backup data for pod %s", pod)
errChan <- errkit.Wrap(err, "Failed to backup data for pod", "pod", pod)
outChan <- BackupInfo{PodName: pod, BackupID: backupOutputs.backupID, BackupTag: backupOutputs.backupTag}
}(pod, container)
}
Expand All @@ -186,11 +186,11 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
}
}
if len(errs) != 0 {
return nil, errors.New(strings.Join(errs, "\n"))
return nil, errkit.New(strings.Join(errs, "\n"))
}
manifestData, err := json.Marshal(Output)
if err != nil {
return nil, errors.Wrapf(err, "Failed to encode JSON data")
return nil, errkit.Wrap(err, "Failed to encode JSON data")
}
return map[string]interface{}{
BackupDataAllOutput: string(manifestData),
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -113,7 +113,7 @@ func backupDataStatsPodFunc(

// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
return nil, errkit.Wrap(err, "Failed while waiting for Pod to be ready", "pod", pod.Name)
}

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
Expand All @@ -131,20 +131,20 @@ func backupDataStatsPodFunc(

commandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, errors.Wrap(err, "Unable to get pod command executor")
return nil, errkit.Wrap(err, "Unable to get pod command executor")
}

var stdout, stderr bytes.Buffer
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
return nil, errkit.Wrap(err, "Failed to get backup stats")
}
// Get File Count and Size from Stats
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
if fc == "" || size == "" {
return nil, errors.New("Failed to parse snapshot stats from logs")
return nil, errkit.New("Failed to parse snapshot stats from logs")
}
return map[string]interface{}{
BackupDataStatsOutputMode: mode,
Expand Down Expand Up @@ -206,14 +206,14 @@ func (b *BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams,
}

if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
return nil, errkit.Wrap(err, "Failed to validate Profile")
}

backupArtifactPrefix = ResolveArtifactPrefix(backupArtifactPrefix, tp.Profile)

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
return backupDataStats(
ctx,
Expand Down
30 changes: 15 additions & 15 deletions pkg/function/backup_data_using_kopia_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -131,22 +131,22 @@ func (b *backupDataUsingKopiaServerFunc) Exec(ctx context.Context, tp param.Temp

userPassphrase, cert, err := userCredentialsAndServerTLS(&tp)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch User Credentials/Certificate Data from Template Params")
return nil, errkit.Wrap(err, "Failed to fetch User Credentials/Certificate Data from Template Params")
}

fingerprint, err := kankopia.ExtractFingerprintFromCertificateJSON(cert)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch Kopia API Server Certificate Secret Data from Certificate")
return nil, errkit.Wrap(err, "Failed to fetch Kopia API Server Certificate Secret Data from Certificate")
}

hostname, userAccessPassphrase, err := hostNameAndUserPassPhraseFromRepoServer(userPassphrase, userHostname)
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch Hostname/User Passphrase from Secret")
return nil, errkit.Wrap(err, "Failed to fetch Hostname/User Passphrase from Secret")
}

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}

snapInfo, err := backupDataUsingKopiaServer(
Expand All @@ -164,7 +164,7 @@ func (b *backupDataUsingKopiaServerFunc) Exec(ctx context.Context, tp param.Temp
tags,
)
if err != nil {
return nil, errors.Wrap(err, "Failed to backup data using Kopia Repository Server")
return nil, errkit.Wrap(err, "Failed to backup data using Kopia Repository Server")
}

var logSize, phySize int64
Expand Down Expand Up @@ -226,7 +226,7 @@ func backupDataUsingKopiaServer(
format.Log(pod, container, stdout)
format.Log(pod, container, stderr)
if err != nil {
return nil, errors.Wrap(err, "Failed to connect to Kopia Repository Server")
return nil, errkit.Wrap(err, "Failed to connect to Kopia Repository Server")
}

cmd = kopiacmd.SnapshotCreate(
Expand All @@ -242,7 +242,7 @@ func backupDataUsingKopiaServer(
Parallelism: utils.GetEnvAsIntOrDefault(kankopia.DataStoreParallelUploadName, kankopia.DefaultDataStoreParallelUpload),
})
if err != nil {
return nil, errors.Wrap(err, "Failed to construct snapshot create command")
return nil, errkit.Wrap(err, "Failed to construct snapshot create command")
}
stdout, stderr, err = kube.Exec(ctx, cli, namespace, pod, container, cmd, nil)
format.Log(pod, container, stdout)
Expand All @@ -253,7 +253,7 @@ func backupDataUsingKopiaServer(
if strings.Contains(err.Error(), kerrors.ErrCodeOutOfMemoryStr) {
message = message + ": " + kerrors.ErrOutOfMemoryStr
}
return nil, errors.Wrap(err, message)
return nil, errkit.Wrap(err, message)
}
// Parse logs and return snapshot IDs and stats
return kopiacmd.ParseSnapshotCreateOutput(stdout, stderr)
Expand All @@ -262,14 +262,14 @@ func backupDataUsingKopiaServer(
func hostNameAndUserPassPhraseFromRepoServer(userCreds, hostname string) (string, string, error) {
var userAccessMap map[string]string
if err := json.Unmarshal([]byte(userCreds), &userAccessMap); err != nil {
return "", "", errors.Wrap(err, "Failed to unmarshal User Credentials Data")
return "", "", errkit.Wrap(err, "Failed to unmarshal User Credentials Data")
}

// Check if hostname provided exists in the User Access Map
if hostname != "" {
err := checkHostnameExistsInUserAccessMap(userAccessMap, hostname)
if err != nil {
return "", "", errors.Wrap(err, "Failed to find hostname in the User Access Map")
return "", "", errkit.Wrap(err, "Failed to find hostname in the User Access Map")
}
}

Expand All @@ -286,27 +286,27 @@ func hostNameAndUserPassPhraseFromRepoServer(userCreds, hostname string) (string

decodedUserPassphrase, err := base64.StdEncoding.DecodeString(userPassphrase)
if err != nil {
return "", "", errors.Wrap(err, "Failed to Decode User Passphrase")
return "", "", errkit.Wrap(err, "Failed to Decode User Passphrase")
}
return hostname, string(decodedUserPassphrase), nil
}

func userCredentialsAndServerTLS(tp *param.TemplateParams) (string, string, error) {
userCredJSON, err := json.Marshal(tp.RepositoryServer.Credentials.ServerUserAccess.Data)
if err != nil {
return "", "", errors.Wrap(err, "Error marshalling User Credentials Data")
return "", "", errkit.Wrap(err, "Error marshalling User Credentials Data")
}
certJSON, err := json.Marshal(tp.RepositoryServer.Credentials.ServerTLS.Data)
if err != nil {
return "", "", errors.Wrap(err, "Error marshalling Certificate Data")
return "", "", errkit.Wrap(err, "Error marshalling Certificate Data")
}
return string(userCredJSON), string(certJSON), nil
}

func checkHostnameExistsInUserAccessMap(userAccessMap map[string]string, hostname string) error {
// check if hostname that is provided by the user exists in the user access map
if _, ok := userAccessMap[hostname]; !ok {
return errors.New("hostname provided in the repository server CR does not exist in the user access map")
return errkit.New("hostname provided in the repository server CR does not exist in the user access map")
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/function/checkRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/kanisterio/errkit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -62,7 +62,7 @@ func CheckRepository(
) (map[string]interface{}, error) {
namespace, err := kube.GetControllerNamespace()
if err != nil {
return nil, errors.Wrapf(err, "Failed to get controller namespace")
return nil, errkit.Wrap(err, "Failed to get controller namespace")
}
options := &kube.PodOptions{
Namespace: namespace,
Expand Down Expand Up @@ -94,7 +94,7 @@ func CheckRepositoryPodFunc(

// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
return nil, errkit.Wrap(err, "Failed while waiting for Pod to be ready", "pod", pod.Name)
}

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (c *CheckRepositoryFunc) Exec(ctx context.Context, tp param.TemplateParams,

cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errkit.Wrap(err, "Failed to create Kubernetes client")
}
return CheckRepository(
ctx,
Expand Down
Loading

0 comments on commit 7262bdb

Please sign in to comment.