diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 4569c381..9f37ca70 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -37,10 +37,11 @@ type uploader interface { } type Uploader struct { - primary uploader - backup uploader - info *livekit.EgressInfo - monitor *stats.HandlerMonitor + primary uploader + backup uploader + primaryFailed bool + info *livekit.EgressInfo + monitor *stats.HandlerMonitor } func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor, info *livekit.EgressInfo) (*Uploader, error) { @@ -90,24 +91,29 @@ func (u *Uploader) Upload( deleteAfterUpload bool, ) (string, int64, error) { - start := time.Now() - location, size, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) - elapsed := time.Since(start) + var primaryErr error + if !u.primaryFailed { + start := time.Now() + location, size, err := u.primary.upload(localFilepath, storageFilepath, outputType) + elapsed := time.Since(start) - if primaryErr == nil { - // success - if u.monitor != nil { - u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds())) - } - if deleteAfterUpload { - _ = os.Remove(localFilepath) + if err == nil { + if u.monitor != nil { + u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds())) + } + if deleteAfterUpload { + _ = os.Remove(localFilepath) + } + return location, size, nil + } else { + if u.monitor != nil { + u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) + } + u.primaryFailed = true + primaryErr = err } - return location, size, nil } - if u.monitor != nil { - u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) - } if u.backup != nil { location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) if backupErr == nil {