Skip to content

Commit

Permalink
core: Implement backup storage upload
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Jun 19, 2024
1 parent 9771e2b commit f05b61e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
2 changes: 1 addition & 1 deletion catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func run() int {
return 1
}

out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, storageBackupURLs)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand Down
43 changes: 35 additions & 8 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type StorageBackupURLs []struct {
Backup string `json:"backup"`
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageBackupURLs StorageBackupURLs) (*drivers.SaveDataOutput, error) {
output := outputURI.String()

// While we wait for storj to implement an easier method for global object deletion we are hacking something
Expand All @@ -69,12 +69,12 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
return nil, fmt.Errorf("failed to read file")
}

out, bytesWritten, err := uploadFile(outputURI, fileContents, fields, segmentWriteTimeout, true)
out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, fields, segmentWriteTimeout, true, storageBackupURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(outputURI, output, fileContents); err != nil {
if err = extractThumb(outputURI, output, fileContents, storageBackupURLs); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
Expand Down Expand Up @@ -105,7 +105,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout

// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
if _, _, err := uploadFile(outputURI, fileContents, fields, writeTimeout, false); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageBackupURLs); err != nil {
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
Expand All @@ -119,14 +119,41 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, _, err := uploadFile(outputURI, fileContents, fields, writeTimeout, false); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageBackupURLs); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return nil, fmt.Errorf("failed to write final save: %w", err)
}
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil
}

func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageBackupURLs StorageBackupURLs) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return out, bytesWritten, nil
}

backupURI, err := buildBackupURI(outputURI, storageBackupURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return nil, 0, primaryErr
}

glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
}

func buildBackupURI(outputURI *url.URL, storageBackupURLs StorageBackupURLs) (*url.URL, error) {
outputURIStr := outputURI.String()
for _, strj := range storageBackupURLs {
if strings.HasPrefix(outputURIStr, strj.Primary) {
backupStr := strings.Replace(outputURIStr, strj.Primary, strj.Backup, 1)
return url.Parse(backupStr)
}
}
return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
}

func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
driver, err := drivers.ParseOSURL(outputURI.String(), true)
if err != nil {
Expand Down Expand Up @@ -155,7 +182,7 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, filename string, segment []byte) error {
func extractThumb(outputURI *url.URL, filename string, segment []byte, storageBackupURLs StorageBackupURLs) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
Expand Down Expand Up @@ -201,10 +228,10 @@ func extractThumb(outputURI *url.URL, filename string, segment []byte) error {
}

thumbURL := outputURI.JoinPath("../latest.jpg")
_, _, err = uploadFile(thumbURL, thumbData, &drivers.FileProperties{
_, _, err = uploadFileWithBackup(thumbURL, thumbData, &drivers.FileProperties{
CacheControl: "max-age=5",
Metadata: expiryField,
}, 10*time.Second, true)
}, 10*time.Second, true, storageBackupURLs)
if err != nil {
return fmt.Errorf("saving thumbnail failed: %w", err)
}
Expand Down

0 comments on commit f05b61e

Please sign in to comment.