Skip to content

Commit

Permalink
write all to backup once primary fails (#825)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Dec 11, 2024
1 parent 8821f85 commit b966630
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ type uploader interface {
}

type Uploader struct {
primary uploader
backup uploader
info *livekit.EgressInfo
monitor *stats.HandlerMonitor
primary uploader
backup uploader
primaryFailed bool
info *livekit.EgressInfo
monitor *stats.HandlerMonitor
}

func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor, info *livekit.EgressInfo) (*Uploader, error) {
Expand Down Expand Up @@ -90,24 +91,29 @@ func (u *Uploader) Upload(
deleteAfterUpload bool,
) (string, int64, error) {

start := time.Now()
location, size, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start)
var primaryErr error
if !u.primaryFailed {
start := time.Now()
location, size, err := u.primary.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start)

if primaryErr == nil {
// success
if u.monitor != nil {
u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds()))
}
if deleteAfterUpload {
_ = os.Remove(localFilepath)
if err == nil {
if u.monitor != nil {
u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds()))
}
if deleteAfterUpload {
_ = os.Remove(localFilepath)
}
return location, size, nil
} else {
if u.monitor != nil {
u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds()))
}
u.primaryFailed = true
primaryErr = err
}
return location, size, nil
}

if u.monitor != nil {
u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds()))
}
if u.backup != nil {
location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType)
if backupErr == nil {
Expand Down

0 comments on commit b966630

Please sign in to comment.