From 2aad6d65e7217514a78e1ccf9368fde66e36ed5f Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 21 Jun 2024 21:03:55 +0100 Subject: [PATCH] core: Improve retry policy for uploads --- core/uploader.go | 56 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/core/uploader.go b/core/uploader.go index 3971edd..a7f8cae 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -27,17 +27,25 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) { return n, nil } -func newExponentialBackOffExecutor() *backoff.ExponentialBackOff { +func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff { backOff := backoff.NewExponentialBackOff() - backOff.InitialInterval = 30 * time.Second - backOff.MaxInterval = 2 * time.Minute - backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries - + backOff.InitialInterval = initial + backOff.MaxInterval = max + backOff.MaxElapsedTime = totalMax + backOff.Reset() return backOff } +func NoRetries() backoff.BackOff { + return &backoff.StopBackOff{} +} + func UploadRetryBackoff() backoff.BackOff { - return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 4) + return newExponentialBackOffExecutor(2*time.Minute, 5*time.Minute, 1*time.Hour) +} + +func SingleRequestRetryBackoff() backoff.BackOff { + return newExponentialBackOffExecutor(10*time.Second, 20*time.Second, 1*time.Minute) } const segmentWriteTimeout = 5 * time.Minute @@ -114,19 +122,31 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { - out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) - if primaryErr == nil { - return out, bytesWritten, nil + retryPolicy := NoRetries() + if withRetries { + retryPolicy = UploadRetryBackoff() } + err = backoff.Retry(func() error { + var primaryErr error + out, bytesWritten, primaryErr = uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) + if primaryErr == nil { + return nil + } - backupURI, err := buildBackupURI(outputURI, storageFallbackURLs) - if err != nil { - glog.Errorf("failed to build backup URL: %v", err) - return nil, 0, primaryErr - } + backupURI, err := buildBackupURI(outputURI, storageFallbackURLs) + if err != nil { + glog.Errorf("failed to build backup URL: %v", err) + return err + } + glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) - glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) - return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) + out, bytesWritten, err = uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) + if err == nil { + return nil + } + return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err) + }, retryPolicy) + return out, bytesWritten, err } func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) { @@ -159,9 +179,9 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro } session := driver.NewSession("") - var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default + retryPolicy := NoRetries() if withRetries { - retryPolicy = UploadRetryBackoff() + retryPolicy = SingleRequestRetryBackoff() } err = backoff.Retry(func() error { // To count how many bytes we are trying to read then write (upload) to s3 storage