Skip to content

Commit

Permalink
Migrate pkg/blockstorage to errkit
Browse files Browse the repository at this point in the history
  • Loading branch information
e-sumin committed Oct 4, 2024
1 parent 6266802 commit 737d1ba
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 159 deletions.
87 changes: 44 additions & 43 deletions pkg/blockstorage/awsebs/awsebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package awsebs

import (
"context"
"fmt"
"net/http"
"time"

Expand All @@ -26,7 +27,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/jpillora/backoff"
"github.com/pkg/errors"
"github.com/kanisterio/errkit"

awsconfig "github.com/kanisterio/kanister/pkg/aws"
"github.com/kanisterio/kanister/pkg/blockstorage"
Expand Down Expand Up @@ -71,19 +72,19 @@ func NewProvider(ctx context.Context, config map[string]string) (blockstorage.Pr
}
ec2Cli, err := newEC2Client(region, awsConfig)
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client")
return nil, errkit.Wrap(err, "Could not get EC2 client")
}
return &EbsStorage{Ec2Cli: ec2Cli, Role: config[awsconfig.ConfigRole], config: awsConfig}, nil
}

// newEC2Client returns ec2 client struct.
func newEC2Client(awsRegion string, config *aws.Config) (*EC2, error) {
if config == nil {
return nil, errors.New("Invalid empty AWS config")
return nil, errkit.New("Invalid empty AWS config")
}
s, err := session.NewSession(config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create session for EBS")
return nil, errkit.Wrap(err, "Failed to create session for EBS")
}
conf := config.WithMaxRetries(maxRetries).WithRegion(awsRegion).WithCredentials(config.Credentials)
return &EC2{EC2: ec2.New(s, conf)}, nil
Expand Down Expand Up @@ -124,17 +125,17 @@ func (s *EbsStorage) CheckVolumeCreate(ctx context.Context) (bool, error) {

ec2Cli, err := newEC2Client(*s.config.Region, s.config)
if err != nil {
return false, errors.Wrap(err, "Could not get EC2 client")
return false, errkit.Wrap(err, "Could not get EC2 client")
}
dai := &ec2.DescribeAvailabilityZonesInput{}
az, err := ec2Cli.DescribeAvailabilityZones(dai)
if err != nil {
return false, errors.New("Fail to get available zone for EC2 client")
return false, errkit.New("Fail to get available zone for EC2 client")
}
if az != nil {
zoneName = az.AvailabilityZones[1].ZoneName
} else {
return false, errors.New("No available zone for EC2 client")
return false, errkit.New("No available zone for EC2 client")
}

cvi := &ec2.CreateVolumeInput{
Expand All @@ -144,7 +145,7 @@ func (s *EbsStorage) CheckVolumeCreate(ctx context.Context) (bool, error) {
}
_, err = s.Ec2Cli.CreateVolume(cvi)
if !isDryRunErr(err) {
return false, errors.Wrap(err, "Could not create volume with EC2 client")
return false, errkit.Wrap(err, "Could not create volume with EC2 client")
}
return true, nil
}
Expand All @@ -159,14 +160,14 @@ func (s *EbsStorage) VolumeGet(ctx context.Context, id string, zone string) (*bl
return nil, err
}
if len(dvo.Volumes) != len(volIDs) {
return nil, errors.New("Object not found")
return nil, errkit.New("Object not found")
}
vols := dvo.Volumes
if len(vols) == 0 {
return nil, errors.New("Volume with volume_id: " + id + " not found")
return nil, errkit.New("Volume with volume_id: " + id + " not found")
}
if len(vols) > 1 {
return nil, errors.Errorf("Found an unexpected number of volumes: volume_id=%s result_count=%d", id, len(vols))
return nil, errkit.New(fmt.Sprintf("Found an unexpected number of volumes: volume_id=%s result_count=%d", id, len(vols)))
}
vol := vols[0]
mv := s.volumeParse(ctx, vol)
Expand Down Expand Up @@ -268,15 +269,15 @@ func (s *EbsStorage) SnapshotsList(ctx context.Context, tags map[string]string)
// i.e., copying unencrypted to encrypted snapshot is allowed but not vice versa.
func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) {
if to.Region == "" {
return nil, errors.New("Destination snapshot AvailabilityZone must be specified")
return nil, errkit.New("Destination snapshot AvailabilityZone must be specified")
}
if to.ID != "" {
return nil, errors.Errorf("Snapshot %v destination ID must be empty", to)
return nil, errkit.New(fmt.Sprintf("Snapshot %v destination ID must be empty", to))
}
// Copy operation must be initiated from the destination region.
ec2Cli, err := newEC2Client(to.Region, s.Ec2Cli.Config.Copy())
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client")
return nil, errkit.Wrap(err, "Could not get EC2 client")
}
// Include a presigned URL when the regions are different. Include it
// independent of whether or not the snapshot is encrypted.
Expand All @@ -290,7 +291,7 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
rq, _ := ec2Cli.CopySnapshotRequest(&si)
su, err2 := rq.Presign(120 * time.Minute)
if err2 != nil {
return nil, errors.Wrap(err2, "Could not presign URL for snapshot copy request")
return nil, errkit.Wrap(err2, "Could not presign URL for snapshot copy request")
}
presignedURL = &su
}
Expand All @@ -316,14 +317,14 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
}
cso, err := ec2Cli.CopySnapshotWithContext(ctx, &csi)
if err != nil {
return nil, errors.Wrapf(err, "Failed to copy snapshot %v", csi)
return nil, errkit.Wrap(err, "Failed to copy snapshot", "snapshot", csi)
}
snapID := aws.StringValue(cso.SnapshotId)
if err = setResourceTags(ctx, ec2Cli, snapID, ktags.GetTags(tags)); err != nil {
return nil, err
}
if err = waitOnSnapshotID(ctx, ec2Cli, snapID); err != nil {
return nil, errors.Wrapf(err, "Snapshot %s did not complete", snapID)
return nil, errkit.Wrap(err, "Snapshot did not complete", "snapshot", snapID)
}
snaps, err := getSnapshots(ctx, ec2Cli, []*string{aws.String(snapID)})
if err != nil {
Expand All @@ -341,7 +342,7 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna

// SnapshotCopyWithArgs is part of blockstorage.Provider
func (s *EbsStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
return nil, errkit.New("Copy Snapshot with Args not implemented")
}

// SnapshotCreate is part of blockstorage.Provider
Expand All @@ -358,7 +359,7 @@ func (s *EbsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Vol
csi.SetDryRun(s.Ec2Cli.DryRun)
snap, err := s.Ec2Cli.CreateSnapshotWithContext(ctx, csi)
if err != nil && !isDryRunErr(err) {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
return nil, errkit.Wrap(err, "Failed to create snapshot", "volume_id", *csi.VolumeId)
}

region, err := availabilityZoneToRegion(ctx, s.Ec2Cli, volume.Az)
Expand All @@ -381,7 +382,7 @@ func (s *EbsStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *
return nil
}
if err := waitOnSnapshotID(ctx, s.Ec2Cli, snap.ID); err != nil {
return errors.Wrapf(err, "Waiting on snapshot %v", snap)
return errkit.Wrap(err, "Waiting on snapshot", "snapshot", snap)
}
return nil
}
Expand All @@ -399,7 +400,7 @@ func (s *EbsStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.
return nil
}
if err != nil && !isDryRunErr(err) {
return errors.Wrap(err, "Failed to delete snapshot")
return errkit.Wrap(err, "Failed to delete snapshot")
}
return nil
}
Expand Down Expand Up @@ -431,7 +432,7 @@ func (s *EbsStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volu
return nil
}
if err != nil && !isDryRunErr(err) {
return errors.Wrapf(err, "Failed to delete volume volID: %s", volume.ID)
return errkit.Wrap(err, "Failed to delete volume", "volID", volume.ID)
}
return nil
}
Expand All @@ -444,26 +445,26 @@ func (s *EbsStorage) SetTags(ctx context.Context, resource interface{}, tags map
case *blockstorage.Snapshot:
return setResourceTags(ctx, s.Ec2Cli, res.ID, tags)
default:
return errors.Wrapf(nil, "Unknown resource type: %v", res)
return errkit.Wrap(nil, "Unknown resource type", "resourceType", res)
}
}

// setResourceTags sets tags on the specified resource
func setResourceTags(ctx context.Context, ec2Cli *EC2, resourceID string, tags map[string]string) error {
cti := &ec2.CreateTagsInput{Resources: []*string{&resourceID}, Tags: mapToEC2Tags(tags)}
if _, err := ec2Cli.CreateTags(cti); err != nil {
return errors.Wrapf(err, "Failed to set tags, resource_id:%s", resourceID)
return errkit.Wrap(err, "Failed to set tags", "resource_id", resourceID)
}
return nil
}

// VolumeCreateFromSnapshot is part of blockstorage.Provider
func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
if snapshot.Volume == nil {
return nil, errors.New("Snapshot volume information not available")
return nil, errkit.New("Snapshot volume information not available")
}
if snapshot.Volume.VolumeType == "" || snapshot.Volume.Az == "" || snapshot.Volume.Tags == nil {
return nil, errors.Errorf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags)
return nil, errkit.New(fmt.Sprintf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags))
}
kubeCli, err := kube.NewClient()
if err != nil {
Expand All @@ -474,7 +475,7 @@ func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot bloc
return nil, err
}
if len(zones) != 1 {
return nil, errors.Errorf("Length of zone slice should be 1, got %d", len(zones))
return nil, errkit.New(fmt.Sprintf("Length of zone slice should be 1, got %d", len(zones)))
}
cvi := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zones[0]),
Expand All @@ -495,7 +496,7 @@ func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot bloc
volID, err := createVolume(ctx, s.Ec2Cli, cvi, ktags.GetTags(tags))
if err != nil {
if isVolNotFoundErr(err) {
return nil, errors.Wrap(err, "This may indicate insufficient permissions for KMS keys.")
return nil, errkit.Wrap(err, "This may indicate insufficient permissions for KMS keys.")
}
return nil, err
}
Expand Down Expand Up @@ -530,13 +531,13 @@ func getSnapshots(ctx context.Context, ec2Cli *EC2, snapIDs []*string) ([]*ec2.S
dsi := &ec2.DescribeSnapshotsInput{SnapshotIds: snapIDs}
dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi)
if err != nil {
return nil, errors.Wrapf(err, blockstorage.SnapshotDoesNotExistError+", snapshot_ids: %p", snapIDs)
return nil, errkit.Wrap(err, blockstorage.SnapshotDoesNotExistError+", snapshot_ids: %p", snapIDs)
}
// TODO: handle paging and continuation
if len(dso.Snapshots) != len(snapIDs) {
log.Error().Print("Did not find all requested snapshots", field.M{"snapshots_requested": snapIDs, "snapshots_found": dso.Snapshots})
// TODO: Move mapping to HTTP error to the caller
return nil, errors.New(blockstorage.SnapshotDoesNotExistError)
return nil, errkit.New(blockstorage.SnapshotDoesNotExistError)
}
return dso.Snapshots, nil
}
Expand All @@ -549,11 +550,11 @@ func availabilityZoneToRegion(ctx context.Context, awsCli *EC2, az string) (ar s

azo, err := awsCli.DescribeAvailabilityZonesWithContext(ctx, azi)
if err != nil {
return "", errors.Wrapf(err, "Could not determine region for availability zone (AZ) %s", az)
return "", errkit.Wrap(err, "Could not determine region for availability zone (AZ)", "az", az)
}

if len(azo.AvailabilityZones) == 0 {
return "", errors.New("Region unavailable for availability zone" + az)
return "", errkit.New("Region unavailable for availability zone" + az)
}

return aws.StringValue(azo.AvailabilityZones[0].RegionName), nil
Expand Down Expand Up @@ -585,11 +586,11 @@ func waitOnVolume(ctx context.Context, ec2Cli *EC2, vol *ec2.Volume) error {
return err
}
if len(dvo.Volumes) != 1 {
return errors.New("Object not found")
return errkit.New("Object not found")
}
s := dvo.Volumes[0]
if *s.State == ec2.VolumeStateError {
return errors.New("Creating EBS volume failed")
return errkit.New("Creating EBS volume failed")
}
if *s.State == ec2.VolumeStateAvailable {
log.Print("Volume creation complete", field.M{"VolumeID": *vol.VolumeId})
Expand All @@ -612,14 +613,14 @@ func waitOnSnapshotID(ctx context.Context, ec2Cli *EC2, snapID string) error {
return poll.WaitWithBackoff(ctx, snapWaitBackoff, func(ctx context.Context) (bool, error) {
dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi)
if err != nil {
return false, errors.Wrapf(err, "Failed to describe snapshot, snapshot_id: %s", snapID)
return false, errkit.Wrap(err, "Failed to describe snapshot", "snapshot_id", snapID)
}
if len(dso.Snapshots) != 1 {
return false, errors.New(blockstorage.SnapshotDoesNotExistError)
return false, errkit.New(blockstorage.SnapshotDoesNotExistError)
}
s := dso.Snapshots[0]
if *s.State == ec2.SnapshotStateError {
return false, errors.New("Snapshot EBS volume failed")
return false, errkit.New("Snapshot EBS volume failed")
}
if *s.State == ec2.SnapshotStateCompleted {
log.Print("Snapshot completed", field.M{"SnapshotID": snapID})
Expand All @@ -644,14 +645,14 @@ func GetRegionFromEC2Metadata() (string, error) {
ec2MetaData := ec2metadata.New(session.Must(session.NewSession()), &conf)

awsRegion, err := ec2MetaData.Region()
return awsRegion, errors.Wrap(err, "Failed to get AWS Region")
return awsRegion, errkit.Wrap(err, "Failed to get AWS Region")
}

// FromRegion is part of zone.Mapper
func (s *EbsStorage) FromRegion(ctx context.Context, region string) ([]string, error) {
ec2Cli, err := newEC2Client(region, s.config)
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client while fetching zones FromRegion (%s)", region)
return nil, errkit.Wrap(err, "Could not get EC2 client while fetching zones FromRegion", "region", region)
}
trueBool := true
filterKey := "region-name"
Expand All @@ -662,7 +663,7 @@ func (s *EbsStorage) FromRegion(ctx context.Context, region string) ([]string, e
},
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to get availability zones for region %s", region)
return nil, errkit.Wrap(err, "Failed to get availability zones for region", "region", region)
}
zoneList := []string{}
for _, zone := range zones.AvailabilityZones {
Expand All @@ -675,7 +676,7 @@ func (s *EbsStorage) GetRegions(ctx context.Context) ([]string, error) {
trueBool := true
result, err := s.Ec2Cli.DescribeRegions(&ec2.DescribeRegionsInput{AllRegions: &trueBool})
if err != nil {
return nil, errors.Wrap(err, "Failed to describe regions")
return nil, errkit.Wrap(err, "Failed to describe regions")
}
regions := []string{}

Expand All @@ -689,10 +690,10 @@ func (s *EbsStorage) GetRegions(ctx context.Context) ([]string, error) {
func (s *EbsStorage) SnapshotRestoreTargets(ctx context.Context, snapshot *blockstorage.Snapshot) (global bool, regionsAndZones map[string][]string, err error) {
// A few checks from VolumeCreateFromSnapshot
if snapshot.Volume == nil {
return false, nil, errors.New("Snapshot volume information not available")
return false, nil, errkit.New("Snapshot volume information not available")
}
if snapshot.Volume.VolumeType == "" || snapshot.Volume.Az == "" || snapshot.Volume.Tags == nil {
return false, nil, errors.Errorf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags)
return false, nil, errkit.New(fmt.Sprintf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags))
}
// EBS snapshots can only be restored in their region
zl, err := s.FromRegion(ctx, snapshot.Region)
Expand Down
Loading

0 comments on commit 737d1ba

Please sign in to comment.