Skip to content

Commit

Permalink
core: Improve retry policy for uploads
Browse files Browse the repository at this point in the history
Have 2 tiers of retries:
 - one shorter cycle, when trying to upload to primary or backup, where
   we retry a couple times up to 1m
 - a longer cycle wrapping those 2, to sustain potentially long running
   crisis. We wait longer between each retry and keep trying for up to 1h

The first loop solves for transient errors when saving to primary or backup
storage, while the second loop solves for longer running incidents, for which
it's probably better to keep trying for a long time instead of dropping the
process and lose the recording saving.
  • Loading branch information
victorges committed Jun 21, 2024
1 parent b8a95ef commit 01d1ce6
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,25 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) {
return n, nil
}

func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff {
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = 30 * time.Second
backOff.MaxInterval = 2 * time.Minute
backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries
backOff.InitialInterval = initial
backOff.MaxInterval = max
backOff.MaxElapsedTime = totalMax
backOff.Reset()
return backOff
}

func NoRetries() backoff.BackOff {
return &backoff.StopBackOff{}
}

func UploadRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 4)
return newExponentialBackOffExecutor(2*time.Minute, 5*time.Minute, 1*time.Hour)
}

func SingleRequestRetryBackoff() backoff.BackOff {
return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second)
}

const segmentWriteTimeout = 5 * time.Minute
Expand Down Expand Up @@ -114,19 +122,31 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return out, bytesWritten, nil
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
}
err = backoff.Retry(func() error {
var primaryErr error
out, bytesWritten, primaryErr = uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return nil
}

backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return nil, 0, primaryErr
}
backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return err
}
glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)

glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
out, bytesWritten, err = uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
if err == nil {
return nil
}
return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err)
}, retryPolicy)
return out, bytesWritten, err
}

func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
Expand Down Expand Up @@ -159,9 +179,9 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
}
session := driver.NewSession("")

var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
retryPolicy = SingleRequestRetryBackoff()
}
err = backoff.Retry(func() error {
// To count how many bytes we are trying to read then write (upload) to s3 storage
Expand Down

0 comments on commit 01d1ce6

Please sign in to comment.