From f05b61e112c90f4fb43919ecf691ec5d3748ce93 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 19 Jun 2024 20:46:15 +0100 Subject: [PATCH] core: Implement backup storage upload --- catalyst-uploader.go | 2 +- core/uploader.go | 43 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 557f6ae..14fe67f 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -85,7 +85,7 @@ func run() int { return 1 } - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, storageBackupURLs) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 diff --git a/core/uploader.go b/core/uploader.go index d78f629..2ac28ed 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -51,7 +51,7 @@ type StorageBackupURLs []struct { Backup string `json:"backup"` } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageBackupURLs StorageBackupURLs) (*drivers.SaveDataOutput, error) { output := outputURI.String() // While we wait for storj to implement an easier method for global object deletion we are hacking something @@ -69,12 +69,12 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to read file") } - out, bytesWritten, err := uploadFile(outputURI, fileContents, fields, segmentWriteTimeout, true) + out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, fields, segmentWriteTimeout, true, storageBackupURLs) if err != nil { return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } - if err = extractThumb(outputURI, output, fileContents); err != nil { + if err = extractThumb(outputURI, output, fileContents, storageBackupURLs); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return out, nil @@ -105,7 +105,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout // Only write the latest version of the data that's been piped in if enough time has elapsed since the last write if lastWrite.Add(waitBetweenWrites).Before(time.Now()) { - if _, _, err := uploadFile(outputURI, fileContents, fields, writeTimeout, false); err != nil { + if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageBackupURLs); err != nil { // Just log this error, since it'll effectively be retried after the next interval glog.Errorf("Failed to write: %v", err) } else { @@ -119,7 +119,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } // We have to do this final write, otherwise there might be final data that's arrived since the last periodic write - if _, _, err := uploadFile(outputURI, fileContents, fields, writeTimeout, false); err != nil { + if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageBackupURLs); err != nil { // Don't ignore this error, since there won't be any further attempts to write return nil, fmt.Errorf("failed to write final save: %w", err) } @@ -127,6 +127,33 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, nil } +func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageBackupURLs StorageBackupURLs) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { + out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) + if primaryErr == nil { + return out, bytesWritten, nil + } + + backupURI, err := buildBackupURI(outputURI, storageBackupURLs) + if err != nil { + glog.Errorf("failed to build backup URL: %v", err) + return nil, 0, primaryErr + } + + glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) + return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) +} + +func buildBackupURI(outputURI *url.URL, storageBackupURLs StorageBackupURLs) (*url.URL, error) { + outputURIStr := outputURI.String() + for _, strj := range storageBackupURLs { + if strings.HasPrefix(outputURIStr, strj.Primary) { + backupStr := strings.Replace(outputURIStr, strj.Primary, strj.Backup, 1) + return url.Parse(backupStr) + } + } + return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted()) +} + func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { driver, err := drivers.ParseOSURL(outputURI.String(), true) if err != nil { @@ -155,7 +182,7 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro return out, bytesWritten, err } -func extractThumb(outputURI *url.URL, filename string, segment []byte) error { +func extractThumb(outputURI *url.URL, filename string, segment []byte, storageBackupURLs StorageBackupURLs) error { tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*") if err != nil { return fmt.Errorf("temp file creation failed: %w", err) @@ -201,10 +228,10 @@ func extractThumb(outputURI *url.URL, filename string, segment []byte) error { } thumbURL := outputURI.JoinPath("../latest.jpg") - _, _, err = uploadFile(thumbURL, thumbData, &drivers.FileProperties{ + _, _, err = uploadFileWithBackup(thumbURL, thumbData, &drivers.FileProperties{ CacheControl: "max-age=5", Metadata: expiryField, - }, 10*time.Second, true) + }, 10*time.Second, true, storageBackupURLs) if err != nil { return fmt.Errorf("saving thumbnail failed: %w", err) }