Skip to content

Commit

Permalink
core: Improve retry policy for uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Jun 21, 2024
1 parent 34902f5 commit 2aad6d6
Showing 1 changed file with 38 additions and 18 deletions.
56 changes: 38 additions & 18 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,25 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) {
return n, nil
}

func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff {
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = 30 * time.Second
backOff.MaxInterval = 2 * time.Minute
backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries

backOff.InitialInterval = initial
backOff.MaxInterval = max
backOff.MaxElapsedTime = totalMax
backOff.Reset()
return backOff
}

func NoRetries() backoff.BackOff {
return &backoff.StopBackOff{}
}

func UploadRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 4)
return newExponentialBackOffExecutor(2*time.Minute, 5*time.Minute, 1*time.Hour)
}

func SingleRequestRetryBackoff() backoff.BackOff {
return newExponentialBackOffExecutor(10*time.Second, 20*time.Second, 1*time.Minute)
}

const segmentWriteTimeout = 5 * time.Minute
Expand Down Expand Up @@ -114,19 +122,31 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return out, bytesWritten, nil
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
}
err = backoff.Retry(func() error {
var primaryErr error
out, bytesWritten, primaryErr = uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return nil
}

backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return nil, 0, primaryErr
}
backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return err
}
glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)

glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
out, bytesWritten, err = uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
if err == nil {
return nil
}
return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err)
}, retryPolicy)
return out, bytesWritten, err
}

func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
Expand Down Expand Up @@ -159,9 +179,9 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
}
session := driver.NewSession("")

var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
retryPolicy = SingleRequestRetryBackoff()
}
err = backoff.Retry(func() error {
// To count how many bytes we are trying to read then write (upload) to s3 storage
Expand Down

0 comments on commit 2aad6d6

Please sign in to comment.