Skip to content

Commit

Permalink
use invalid arg code for upload failures
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Jun 6, 2024
1 parent 4427655 commit 389659a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func ErrParticipantNotFound(identity string) error {
// This can have many reasons, some related to invalid parameters, other because of system failure.
// Do not provide an error code until we have code to analyze the error from the underlying upload library further.
func ErrUploadFailed(location string, err error) error {
return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err)
return psrpc.NewErrorf(psrpc.InvalidArgument, "%s upload failed: %v", location, err)
}

func ErrCPUExhausted(usage float64) error {
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/aliyun/aliyun-oss-go-sdk/oss"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -37,22 +38,22 @@ func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) {
func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) {
stat, err := os.Stat(localFilePath)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

bucket, err := client.Bucket(u.conf.Bucket)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

err = bucket.PutObjectFromFile(requestedPath, localFilePath)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, requestedPath), stat.Size(), nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/Azure/azure-storage-blob-go/azblob"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -44,12 +45,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
u.conf.AccountKey,
)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

azUrl, err := url.Parse(u.container)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
Expand All @@ -65,15 +66,15 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType

file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

// upload blocks in parallel for optimal performance
Expand All @@ -84,7 +85,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
Parallelism: 16,
})
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand Down Expand Up @@ -77,15 +78,15 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
Expand All @@ -100,11 +101,11 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp
wc.ChunkRetryDeadline = 0

if _, err = io.Copy(wc, file); err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

if err = wc.Close(); err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
Expand Down Expand Up @@ -159,11 +160,11 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
svc := s3.New(sess)
resp, err := svc.GetBucketLocation(req)
if err != nil {
return "", psrpc.NewErrorf(psrpc.Unknown, "failed to retrieve upload bucket region: %v", err)
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "failed to retrieve upload bucket region: %v", err)
}

if resp.LocationConstraint == nil {
return "", psrpc.NewErrorf(psrpc.MalformedResponse, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
}

return *resp.LocationConstraint, nil
Expand All @@ -172,20 +173,20 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) {
sess, err := session.NewSession(u.awsConfig)
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

_, err = s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
Expand All @@ -198,7 +199,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty
ContentDisposition: u.contentDisposition,
})
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

endpoint := "s3.amazonaws.com"
Expand Down
13 changes: 3 additions & 10 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
package uploader

import (
"fmt"
"os"
"path"
"time"

"github.com/pkg/errors"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
Expand Down Expand Up @@ -82,11 +79,11 @@ type remoteUploader struct {

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
location, size, uploadErr := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start)

// success
if err == nil {
if uploadErr == nil {
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds()))
if deleteAfterUpload {
_ = os.Remove(localFilepath)
Expand Down Expand Up @@ -117,7 +114,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
return backupFilepath, stat.Size(), nil
}

return "", 0, err
return "", 0, uploadErr
}

type localUploader struct{}
Expand All @@ -130,7 +127,3 @@ func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bo

return localFilepath, stat.Size(), nil
}

func wrap(name string, err error) error {
return errors.Wrap(err, fmt.Sprintf("%s upload failed", name))
}

0 comments on commit 389659a

Please sign in to comment.