Skip to content

Commit

Permalink
uploader: Change storage backup config to a comma-map
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Jun 20, 2024
1 parent 895357c commit 4126cad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
32 changes: 26 additions & 6 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -37,7 +38,7 @@ func run() int {
describe := fs.Bool("j", false, "Describe supported storage services in JSON format and exit")
verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
storageBackupURLs := jsonFlag[core.StorageBackupURLs](fs, "storage-backup-urls", `JSON array of {"primary":X,"backup":Y} objects with base storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
storageBackupURLs := CommaMapFlag(fs, "storage-backup-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)

_ = fs.String("config", "", "config file (optional)")

Expand Down Expand Up @@ -94,7 +95,7 @@ func run() int {
return 1
}

out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, storageBackupURLs)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageBackupURLs)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand All @@ -117,10 +118,29 @@ func run() int {
return 0
}

func jsonFlag[T any](fs *flag.FlagSet, name string, usage string) T {
var value T
// handles -foo=key1=value1,key2=value2
func CommaMapFlag(fs *flag.FlagSet, name string, usage string) *map[string]string {
var dest map[string]string
fs.Func(name, usage, func(s string) error {
return json.Unmarshal([]byte(s), &value)
var err error
dest, err = parseCommaMap(s)
return err
})
return value
return &dest
}

func parseCommaMap(s string) (map[string]string, error) {
output := map[string]string{}
if s == "" {
return output, nil
}
for _, pair := range strings.Split(s, ",") {
kv := strings.Split(pair, "=")
if len(kv) != 2 {
return map[string]string{}, fmt.Errorf("failed to parse keypairs, -option=k1=v1,k2=v2 format required, got %s", s)
}
k, v := kv[0], kv[1]
output[k] = v
}
return output, nil
}
19 changes: 7 additions & 12 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,7 @@ var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

type StorageBackupURLs []struct {
Primary string `json:"primary"`
Backup string `json:"backup"`
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageBackupURLs StorageBackupURLs) (*drivers.SaveDataOutput, error) {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageBackupURLs map[string]string) (*drivers.SaveDataOutput, error) {
if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
Expand Down Expand Up @@ -118,7 +113,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
return nil, nil
}

func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageBackupURLs StorageBackupURLs) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageBackupURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return out, bytesWritten, nil
Expand All @@ -134,11 +129,11 @@ func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drive
return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
}

func buildBackupURI(outputURI *url.URL, storageBackupURLs StorageBackupURLs) (*url.URL, error) {
func buildBackupURI(outputURI *url.URL, storageBackupURLs map[string]string) (*url.URL, error) {
outputURIStr := outputURI.String()
for _, strj := range storageBackupURLs {
if strings.HasPrefix(outputURIStr, strj.Primary) {
backupStr := strings.Replace(outputURIStr, strj.Primary, strj.Backup, 1)
for primary, backup := range storageBackupURLs {
if strings.HasPrefix(outputURIStr, primary) {
backupStr := strings.Replace(outputURIStr, primary, backup, 1)
return url.Parse(backupStr)
}
}
Expand Down Expand Up @@ -185,7 +180,7 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, segment []byte, storageBackupURLs StorageBackupURLs) error {
func extractThumb(outputURI *url.URL, segment []byte, storageBackupURLs map[string]string) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
Expand Down

0 comments on commit 4126cad

Please sign in to comment.