Skip to content

Commit

Permalink
uploader: Add support for backup storage upload (#60)
Browse files Browse the repository at this point in the history
* go.mod: Upgrade go version to 1.22

This was required to be able to use the ff lib for some reason.

* go.mod: Add github.com/peterbourgon/ff

* main: Allow getting configs from env var

Copied all the stuff we usually have for flag parsing
using peterbourgon/ff and an explicit FlagSet

* main: Create StorageBackupURLs config

* core: Create uploadFile helper

* core: Add retries to thumbnail upload

By migrating to uploadFile it gets support
automatically too.

* core: Implement backup storage upload

* core: Move storj workaround to uploadFile

Primary and backup have different storage providers
so we need to move that lower layer.

* Fix tests

* core: Simplify adding the Storj metadata to fields

* uploader: Change storage backup config to a comma-map

* storageBackupURLs -> storageFallbackURLs

* core: Make sure last manifest upload has retries

* .: Add test for commamap flag

Copied from catalyst-api too

* core: Add a couple tests for the new logic

* reset backoff object

---------

Co-authored-by: Max Holland <[email protected]>
  • Loading branch information
victorges and mjh1 authored Jun 25, 2024
1 parent c8e9215 commit c84d101
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 2,291 deletions.
77 changes: 70 additions & 7 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/golang/glog"
"github.com/livepeer/catalyst-uploader/core"
"github.com/livepeer/go-tools/drivers"
"github.com/peterbourgon/ff"
)

const WaitBetweenWrites = 5 * time.Second
Expand All @@ -27,27 +30,60 @@ func run() int {
glog.Error(err)
return 1
}
vFlag := flag.Lookup("v")
fs := flag.NewFlagSet("catalyst-uploader", flag.ExitOnError)

// cmd line args
describe := flag.Bool("j", false, "Describe supported storage services in JSON format and exit")
timeout := flag.Duration("t", 30*time.Second, "Upload timeout")
flag.Parse()
version := fs.Bool("version", false, "print application version")
describe := fs.Bool("j", false, "Describe supported storage services in JSON format and exit")
verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)

_ = fs.String("config", "", "config file (optional)")

err = ff.Parse(fs, os.Args[1:],
ff.WithConfigFileFlag("config"),
ff.WithConfigFileParser(ff.PlainParser),
ff.WithEnvVarPrefix("CATALYST_UPLOADER"),
)
if err != nil {
glog.Fatalf("error parsing cli: %s", err)
}

err = flag.CommandLine.Parse(nil)
if err != nil {
glog.Fatal(err)
}

if *version {
fmt.Printf("catalyst-uploader version: %s", Version)
return 0
}

// list enabled handlers and exit
if *describe {
_, _ = os.Stdout.Write(drivers.DescribeDriversJson())
return 0
}

if flag.NArg() == 0 {
glog.Error("Destination URI is not specified. See -h for usage.")
if fs.NArg() == 0 {
glog.Error("Destination URI is not specified. See -j for usage.")
return 1
}

if *verbosity != "" {
err = vFlag.Value.Set(*verbosity)
if err != nil {
glog.Fatal(err)
}
}

// replace stdout to prevent any lib from writing debug output there
stdout := os.Stdout
os.Stdout, _ = os.Open(os.DevNull)

output := flag.Arg(0)
output := fs.Arg(0)
if output == "" {
glog.Error("Object store URI was empty")
return 1
Expand All @@ -59,7 +95,7 @@ func run() int {
return 1
}

out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand All @@ -81,3 +117,30 @@ func run() int {

return 0
}

// handles -foo=key1=value1,key2=value2
func CommaMapFlag(fs *flag.FlagSet, name string, usage string) *map[string]string {
var dest map[string]string
fs.Func(name, usage, func(s string) error {
var err error
dest, err = parseCommaMap(s)
return err
})
return &dest
}

func parseCommaMap(s string) (map[string]string, error) {
output := map[string]string{}
if s == "" {
return output, nil
}
for _, pair := range strings.Split(s, ",") {
kv := strings.Split(pair, "=")
if len(kv) != 2 {
return map[string]string{}, fmt.Errorf("failed to parse keypairs, -option=k1=v1,k2=v2 format required, got %s", s)
}
k, v := kv[0], kv[1]
output[k] = v
}
return output, nil
}
23 changes: 23 additions & 0 deletions catalyst-uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -166,3 +167,25 @@ func TestFormatsE2E(t *testing.T) {
require.Equal(t, h.UriSchemes(), driverDescr.Drivers[i].UriSchemes)
}
}

// TestCommaMap verifies CommaMapFlag parsing for single-pair, multi-pair,
// explicitly-empty, and malformed inputs.
func TestCommaMap(t *testing.T) {
	fs := flag.NewFlagSet("cli-test", flag.PanicOnError)
	single := CommaMapFlag(fs, "single", "")
	multi := CommaMapFlag(fs, "multi", "")
	setEmpty := CommaMapFlag(fs, "empty", "")
	err := fs.Parse([]string{
		"-single=one=uno",
		"-multi=one=uno,two=dos,three=tres",
		"-empty=",
	})
	require.NoError(t, err)
	// require.Equal takes (t, expected, actual): pass the literal expectation
	// first so failure messages report expected/actual the right way around.
	require.Equal(t, map[string]string{"one": "uno"}, *single)
	require.Equal(t, map[string]string{"one": "uno", "two": "dos", "three": "tres"}, *multi)
	require.Equal(t, map[string]string{}, *setEmpty)

	// A value without "=" must fail parsing and leave an empty map behind.
	fs2 := flag.NewFlagSet("cli-test", flag.ContinueOnError)
	wrong := CommaMapFlag(fs2, "wrong", "")
	err = fs2.Parse([]string{"-wrong=format"})
	require.Error(t, err)
	require.Equal(t, map[string]string{}, *wrong)
}
130 changes: 86 additions & 44 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
backOff.InitialInterval = 30 * time.Second
backOff.MaxInterval = 2 * time.Minute
backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries

backOff.Reset()
return backOff
}

Expand All @@ -46,57 +46,28 @@ var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) {
output := outputURI.String()
storageDriver, err := drivers.ParseOSURL(output, true)
if err != nil {
return nil, err
}
session := storageDriver.NewSession("")
if err != nil {
return nil, err
}

// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
fields := &drivers.FileProperties{}
if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") {
fields = &drivers.FileProperties{Metadata: expiryField}
}

byteCounter := &ByteCounter{}
if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
if err != nil {
return nil, fmt.Errorf("failed to read file")
}

var out *drivers.SaveDataOutput
err = backoff.Retry(func() error {
// To count how many bytes we are trying to read then write (upload) to s3 storage
teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)
byteCounter.Count = 0

out, err = session.SaveData(context.Background(), "", teeReader, fields, segmentWriteTimeout)
if err != nil {
glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), byteCounter.Count, err)
}
return err
}, UploadRetryBackoff())
out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), byteCounter.Count, err)
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(session, output, fileContents); err != nil {
if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
fields.CacheControl = "max-age=1"
fields := &drivers.FileProperties{CacheControl: "max-age=1"}
var fileContents []byte
var lastWrite = time.Now()

Expand All @@ -120,7 +91,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout

// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
Expand All @@ -134,22 +105,89 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, true, storageFallbackURLs); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return nil, fmt.Errorf("failed to write final save: %w", err)
}
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil
}

func extractThumb(session drivers.OSSession, filename string, segment []byte) error {
// uploadFileWithBackup writes fileContents to outputURI and, if that upload
// fails, retries the write against a backup location derived from
// storageFallbackURLs (primary-prefix → backup-prefix substitution). When no
// backup mapping matches, the primary error is returned as-is.
func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, int64, error) {
	primaryOut, primaryBytes, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
	if primaryErr == nil {
		return primaryOut, primaryBytes, nil
	}

	backupURI, backupErr := buildBackupURI(outputURI, storageFallbackURLs)
	if backupErr != nil {
		// No usable backup destination: surface the original failure.
		glog.Errorf("failed to build backup URL: %v", backupErr)
		return nil, 0, primaryErr
	}

	glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
	return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
}

func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
outputURIStr := outputURI.String()
for primary, backup := range storageFallbackURLs {
if strings.HasPrefix(outputURIStr, primary) {
backupStr := strings.Replace(outputURIStr, primary, backup, 1)
return url.Parse(backupStr)
}
}
return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
}

// uploadFile writes fileContents to outputURI via the matching storage driver.
// When withRetries is set, failed attempts are retried per UploadRetryBackoff;
// otherwise a single attempt is made. bytesWritten reports how many bytes the
// last attempt managed to read from the payload before the result came back.
func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
	outputStr := outputURI.String()

	// While we wait for storj to implement an easier method for global object
	// deletion we are hacking something here to allow us to have recording
	// objects deleted after 7 days. Copy the fields so the caller's struct is
	// never mutated.
	if strings.Contains(outputStr, "gateway.storjshare.io/catalyst-recordings-com") {
		patched := drivers.FileProperties{}
		if fields != nil {
			patched = *fields
		}
		patched.Metadata = expiryField
		fields = &patched
	}

	osDriver, parseErr := drivers.ParseOSURL(outputStr, true)
	if parseErr != nil {
		return nil, 0, parseErr
	}
	session := osDriver.NewSession("")

	retryPolicy := backoff.BackOff(&backoff.StopBackOff{}) // single attempt unless retries were requested
	if withRetries {
		retryPolicy = UploadRetryBackoff()
	}

	attempt := func() error {
		// Fresh counter per attempt so bytesWritten reflects only the latest try.
		counter := &ByteCounter{}
		payload := io.TeeReader(bytes.NewReader(fileContents), counter)

		out, err = session.SaveData(context.Background(), "", payload, fields, writeTimeout)
		bytesWritten = counter.Count

		if err != nil {
			glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), bytesWritten, err)
		}
		return err
	}
	err = backoff.Retry(attempt, retryPolicy)

	return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}
defer os.RemoveAll(tmpDir)
outFile := filepath.Join(tmpDir, "out.jpg")
inFile := filepath.Join(tmpDir, filepath.Base(filename))
inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
if err = os.WriteFile(inFile, segment, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}
Expand Down Expand Up @@ -182,10 +220,14 @@ func extractThumb(session drivers.OSSession, filename string, segment []byte) er
return fmt.Errorf("opening file failed: %w", err)
}
defer f.Close()
_, err = session.SaveData(context.Background(), "../latest.jpg", f, &drivers.FileProperties{
CacheControl: "max-age=5",
Metadata: expiryField,
}, 10*time.Second)
thumbData, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}

thumbURL := outputURI.JoinPath("../latest.jpg")
fields := &drivers.FileProperties{CacheControl: "max-age=5"}
_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs)
if err != nil {
return fmt.Errorf("saving thumbnail failed: %w", err)
}
Expand Down
Loading

0 comments on commit c84d101

Please sign in to comment.