Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use invalid arg code for upload failures #692

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func ErrParticipantNotFound(identity string) error {
// This can have many reasons, some related to invalid parameters, other because of system failure.
// Do not provide an error code until we have code to analyze the error from the underlying upload library further.
func ErrUploadFailed(location string, err error) error {
return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err)
return psrpc.NewErrorf(psrpc.InvalidArgument, "%s upload failed: %v", location, err)
}

func ErrCPUExhausted(usage float64) error {
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/aliyun/aliyun-oss-go-sdk/oss"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -37,22 +38,22 @@ func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) {
func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) {
stat, err := os.Stat(localFilePath)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

bucket, err := client.Bucket(u.conf.Bucket)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

err = bucket.PutObjectFromFile(requestedPath, localFilePath)
if err != nil {
return "", 0, wrap("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, requestedPath), stat.Size(), nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/Azure/azure-storage-blob-go/azblob"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -44,12 +45,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
u.conf.AccountKey,
)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

azUrl, err := url.Parse(u.container)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
Expand All @@ -65,15 +66,15 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType

file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

// upload blocks in parallel for optimal performance
Expand All @@ -84,7 +85,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
Parallelism: 16,
})
if err != nil {
return "", 0, wrap("Azure", err)
return "", 0, errors.ErrUploadFailed("Azure", err)
}

return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand Down Expand Up @@ -77,15 +78,15 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
Expand All @@ -100,11 +101,11 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp
wc.ChunkRetryDeadline = 0

if _, err = io.Copy(wc, file); err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

if err = wc.Close(); err != nil {
return "", 0, wrap("GCP", err)
return "", 0, errors.ErrUploadFailed("GCP", err)
}

return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
Expand Down Expand Up @@ -159,11 +160,11 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
svc := s3.New(sess)
resp, err := svc.GetBucketLocation(req)
if err != nil {
return "", psrpc.NewErrorf(psrpc.Unknown, "failed to retrieve upload bucket region: %v", err)
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "failed to retrieve upload bucket region: %v", err)
}

if resp.LocationConstraint == nil {
return "", psrpc.NewErrorf(psrpc.MalformedResponse, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
}

return *resp.LocationConstraint, nil
Expand All @@ -172,20 +173,20 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) {
sess, err := session.NewSession(u.awsConfig)
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}
defer func() {
_ = file.Close()
}()

stat, err := file.Stat()
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

_, err = s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
Expand All @@ -198,7 +199,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty
ContentDisposition: u.contentDisposition,
})
if err != nil {
return "", 0, wrap("S3", err)
return "", 0, errors.ErrUploadFailed("S3", err)
}

endpoint := "s3.amazonaws.com"
Expand Down
13 changes: 3 additions & 10 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
package uploader

import (
"fmt"
"os"
"path"
"time"

"github.com/pkg/errors"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
Expand Down Expand Up @@ -82,11 +79,11 @@ type remoteUploader struct {

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
location, size, uploadErr := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start)

// success
if err == nil {
if uploadErr == nil {
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds()))
if deleteAfterUpload {
_ = os.Remove(localFilepath)
Expand Down Expand Up @@ -117,7 +114,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
return backupFilepath, stat.Size(), nil
}

return "", 0, err
return "", 0, uploadErr
}

type localUploader struct{}
Expand All @@ -130,7 +127,3 @@ func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bo

return localFilepath, stat.Size(), nil
}

func wrap(name string, err error) error {
return errors.Wrap(err, fmt.Sprintf("%s upload failed", name))
}
Loading