From 389659a67864b8456243546f379bd0f99c0c2536 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 6 Jun 2024 12:18:17 -0700 Subject: [PATCH] use invalid arg code for upload failures --- pkg/errors/errors.go | 2 +- pkg/pipeline/sink/uploader/alioss.go | 9 +++++---- pkg/pipeline/sink/uploader/azure.go | 11 ++++++----- pkg/pipeline/sink/uploader/gcp.go | 9 +++++---- pkg/pipeline/sink/uploader/s3.go | 13 +++++++------ pkg/pipeline/sink/uploader/uploader.go | 13 +++---------- 6 files changed, 27 insertions(+), 30 deletions(-) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 11b65df8..e31846f2 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -143,7 +143,7 @@ func ErrParticipantNotFound(identity string) error { // This can have many reasons, some related to invalid parameters, other because of system failure. // Do not provide an error code until we have code to analyze the error from the underlying upload library further. func ErrUploadFailed(location string, err error) error { - return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err) + return psrpc.NewErrorf(psrpc.InvalidArgument, "%s upload failed: %v", location, err) } func ErrCPUExhausted(usage float64) error { diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 985ee744..e2f75a67 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -20,6 +20,7 @@ import ( "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) @@ -37,22 +38,22 @@ func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) { func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) { stat, err := os.Stat(localFilePath) if err != nil { - return "", 0, wrap("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret) if err != nil { - return "", 0, wrap("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } bucket, err := client.Bucket(u.conf.Bucket) if err != nil { - return "", 0, wrap("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } err = bucket.PutObjectFromFile(requestedPath, localFilePath) if err != nil { - return "", 0, wrap("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, requestedPath), stat.Size(), nil diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 04a47f73..130abd3a 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -22,6 +22,7 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) @@ -44,12 +45,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType u.conf.AccountKey, ) if err != nil { - return "", 0, wrap("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } azUrl, err := url.Parse(u.container) if err != nil { - return "", 0, wrap("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{ @@ -65,7 +66,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType file, err := os.Open(localFilepath) if err != nil { - return "", 0, wrap("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } defer func() { _ = file.Close() @@ -73,7 +74,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType stat, err := file.Stat() if err != nil { - return "", 0, wrap("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } // upload blocks in parallel for optimal performance @@ -84,7 +85,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType Parallelism: 16, }) if err != nil { - return "", 0, wrap("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index a4e11259..e7490230 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -27,6 +27,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) @@ -77,7 +78,7 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { file, err := os.Open(localFilepath) if err != nil { - return "", 0, wrap("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } defer func() { _ = file.Close() @@ -85,7 +86,7 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp stat, err := file.Stat() if err != nil { - return "", 0, wrap("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( @@ -100,11 +101,11 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp wc.ChunkRetryDeadline = 0 if _, err = io.Copy(wc, file); err != nil { - return "", 0, wrap("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } if err = wc.Close(); err != nil { - return "", 0, wrap("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 01ead404..2da6b080 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -30,6 +30,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" "github.com/livekit/psrpc" @@ -159,11 +160,11 @@ func (u *S3Uploader) getBucketLocation() (string, error) { svc := s3.New(sess) resp, err := svc.GetBucketLocation(req) if err != nil { - return "", psrpc.NewErrorf(psrpc.Unknown, "failed to retrieve upload bucket region: %v", err) + return "", psrpc.NewErrorf(psrpc.InvalidArgument, "failed to retrieve upload bucket region: %v", err) } if resp.LocationConstraint == nil { - return "", psrpc.NewErrorf(psrpc.MalformedResponse, "invalid upload bucket region returned by provider. Try specifying the region manually in the request") + return "", psrpc.NewErrorf(psrpc.InvalidArgument, "invalid upload bucket region returned by provider. Try specifying the region manually in the request") } return *resp.LocationConstraint, nil @@ -172,12 +173,12 @@ func (u *S3Uploader) getBucketLocation() (string, error) { func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { sess, err := session.NewSession(u.awsConfig) if err != nil { - return "", 0, wrap("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } file, err := os.Open(localFilepath) if err != nil { - return "", 0, wrap("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } defer func() { _ = file.Close() @@ -185,7 +186,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty stat, err := file.Stat() if err != nil { - return "", 0, wrap("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } _, err = s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{ @@ -198,7 +199,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty ContentDisposition: u.contentDisposition, }) if err != nil { - return "", 0, wrap("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } endpoint := "s3.amazonaws.com" diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index ca521fc3..7510ee94 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -15,13 +15,10 @@ package uploader import ( - "fmt" "os" "path" "time" - "github.com/pkg/errors" - "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" @@ -82,11 +79,11 @@ type remoteUploader struct { func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { start := time.Now() - location, size, err := u.upload(localFilepath, storageFilepath, outputType) + location, size, uploadErr := u.upload(localFilepath, storageFilepath, outputType) elapsed := time.Since(start) // success - if err == nil { + if uploadErr == nil { u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds())) if deleteAfterUpload { _ = os.Remove(localFilepath) @@ -117,7 +114,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp return backupFilepath, stat.Size(), nil } - return "", 0, err + return "", 0, uploadErr } type localUploader struct{} @@ -130,7 +127,3 @@ func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bo return localFilepath, stat.Size(), nil } - -func wrap(name string, err error) error { - return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) -}