Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uploader: Add support for backup storage upload #60

Merged
merged 16 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 70 additions & 7 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/golang/glog"
"github.com/livepeer/catalyst-uploader/core"
"github.com/livepeer/go-tools/drivers"
"github.com/peterbourgon/ff"
)

const WaitBetweenWrites = 5 * time.Second
Expand All @@ -27,27 +30,60 @@ func run() int {
glog.Error(err)
return 1
}
vFlag := flag.Lookup("v")
fs := flag.NewFlagSet("catalyst-uploader", flag.ExitOnError)

// cmd line args
describe := flag.Bool("j", false, "Describe supported storage services in JSON format and exit")
timeout := flag.Duration("t", 30*time.Second, "Upload timeout")
flag.Parse()
version := fs.Bool("version", false, "print application version")
describe := fs.Bool("j", false, "Describe supported storage services in JSON format and exit")
verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
mjh1 marked this conversation as resolved.
Show resolved Hide resolved

_ = fs.String("config", "", "config file (optional)")

err = ff.Parse(fs, os.Args[1:],
victorges marked this conversation as resolved.
Show resolved Hide resolved
ff.WithConfigFileFlag("config"),
ff.WithConfigFileParser(ff.PlainParser),
ff.WithEnvVarPrefix("CATALYST_UPLOADER"),
)
if err != nil {
glog.Fatalf("error parsing cli: %s", err)
}

err = flag.CommandLine.Parse(nil)
if err != nil {
glog.Fatal(err)
}

if *version {
fmt.Printf("catalyst-uploader version: %s", Version)
return 0
}

// list enabled handlers and exit
if *describe {
_, _ = os.Stdout.Write(drivers.DescribeDriversJson())
return 0
}

if flag.NArg() == 0 {
glog.Error("Destination URI is not specified. See -h for usage.")
if fs.NArg() == 0 {
glog.Error("Destination URI is not specified. See -j for usage.")
return 1
}

if *verbosity != "" {
err = vFlag.Value.Set(*verbosity)
if err != nil {
glog.Fatal(err)
}
}

// replace stdout to prevent any lib from writing debug output there
stdout := os.Stdout
os.Stdout, _ = os.Open(os.DevNull)

output := flag.Arg(0)
output := fs.Arg(0)
if output == "" {
glog.Error("Object store URI was empty")
return 1
Expand All @@ -59,7 +95,7 @@ func run() int {
return 1
}

out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand All @@ -81,3 +117,30 @@ func run() int {

return 0
}

// CommaMapFlag registers a flag named name on fs that parses values of the
// form -name=key1=value1,key2=value2 into a string map. The returned pointer
// is populated once fs is parsed. The map is initialized up front so it is
// never nil, even when the flag is absent — callers can range over or
// serialize *dest without a nil check.
func CommaMapFlag(fs *flag.FlagSet, name string, usage string) *map[string]string {
	// Non-nil from the start: an unset flag yields an empty map, not nil.
	dest := map[string]string{}
	fs.Func(name, usage, func(s string) error {
		var err error
		dest, err = parseCommaMap(s)
		return err
	})
	return &dest
}

// parseCommaMap parses a comma-separated list of key=value pairs, e.g.
// "k1=v1,k2=v2", into a map. An empty string yields an empty, non-nil map.
// Each pair is split on the FIRST '=' only, so values may themselves contain
// '=' (common for URLs with query strings, which this flag carries). A pair
// with no '=' at all is an error.
func parseCommaMap(s string) (map[string]string, error) {
	output := map[string]string{}
	if s == "" {
		return output, nil
	}
	for _, pair := range strings.Split(s, ",") {
		k, v, found := strings.Cut(pair, "=")
		if !found {
			return map[string]string{}, fmt.Errorf("failed to parse keypairs, -option=k1=v1,k2=v2 format required, got %s", s)
		}
		output[k] = v
	}
	return output, nil
}
23 changes: 23 additions & 0 deletions catalyst-uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -166,3 +167,25 @@ func TestFormatsE2E(t *testing.T) {
require.Equal(t, h.UriSchemes(), driverDescr.Drivers[i].UriSchemes)
}
}

// TestCommaMap checks CommaMapFlag end-to-end through flag parsing: a single
// pair, multiple pairs, an explicitly empty value, and a malformed value.
func TestCommaMap(t *testing.T) {
	flagSet := flag.NewFlagSet("cli-test", flag.PanicOnError)
	oneEntry := CommaMapFlag(flagSet, "single", "")
	threeEntries := CommaMapFlag(flagSet, "multi", "")
	noEntries := CommaMapFlag(flagSet, "empty", "")
	args := []string{
		"-single=one=uno",
		"-multi=one=uno,two=dos,three=tres",
		"-empty=",
	}
	require.NoError(t, flagSet.Parse(args))
	require.Equal(t, map[string]string{"one": "uno"}, *oneEntry)
	require.Equal(t, map[string]string{"one": "uno", "two": "dos", "three": "tres"}, *threeEntries)
	require.Equal(t, map[string]string{}, *noEntries)

	// A value without '=' must fail parsing and leave an empty map behind.
	badSet := flag.NewFlagSet("cli-test", flag.ContinueOnError)
	badValue := CommaMapFlag(badSet, "wrong", "")
	require.Error(t, badSet.Parse([]string{"-wrong=format"}))
	require.Equal(t, map[string]string{}, *badValue)
}
130 changes: 86 additions & 44 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
backOff.InitialInterval = 30 * time.Second
backOff.MaxInterval = 2 * time.Minute
backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries

backOff.Reset()
return backOff
}

Expand All @@ -46,57 +46,28 @@ var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) {
output := outputURI.String()
storageDriver, err := drivers.ParseOSURL(output, true)
if err != nil {
return nil, err
}
session := storageDriver.NewSession("")
if err != nil {
return nil, err
}

// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
fields := &drivers.FileProperties{}
if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") {
fields = &drivers.FileProperties{Metadata: expiryField}
}

byteCounter := &ByteCounter{}
if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
if err != nil {
return nil, fmt.Errorf("failed to read file")
}

var out *drivers.SaveDataOutput
err = backoff.Retry(func() error {
// To count how many bytes we are trying to read then write (upload) to s3 storage
teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)
byteCounter.Count = 0

out, err = session.SaveData(context.Background(), "", teeReader, fields, segmentWriteTimeout)
if err != nil {
glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), byteCounter.Count, err)
}
return err
}, UploadRetryBackoff())
out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), byteCounter.Count, err)
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(session, output, fileContents); err != nil {
if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
fields.CacheControl = "max-age=1"
fields := &drivers.FileProperties{CacheControl: "max-age=1"}
var fileContents []byte
var lastWrite = time.Now()

Expand All @@ -120,7 +91,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout

// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
Expand All @@ -134,22 +105,89 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, true, storageFallbackURLs); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return nil, fmt.Errorf("failed to write final save: %w", err)
}
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil
}

func extractThumb(session drivers.OSSession, filename string, segment []byte) error {
// uploadFileWithBackup tries to upload fileContents to outputURI and, if the
// primary upload fails and storageFallbackURLs contains a matching
// primary→backup prefix mapping, retries against the backup location. When no
// backup mapping matches, the primary error is returned unchanged.
func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
	primaryOut, primaryBytes, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
	if primaryErr == nil {
		return primaryOut, primaryBytes, nil
	}

	fallbackURI, fallbackErr := buildBackupURI(outputURI, storageFallbackURLs)
	if fallbackErr != nil {
		// No usable backup: surface the original failure, not the lookup one.
		glog.Errorf("failed to build backup URL: %v", fallbackErr)
		return nil, 0, primaryErr
	}

	glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", fallbackURI.Redacted(), primaryErr)
	return uploadFile(fallbackURI, fileContents, fields, writeTimeout, withRetries)
}

// buildBackupURI maps outputURI to its backup location using the
// primary→backup prefix pairs in storageFallbackURLs. The matching primary
// prefix is replaced (once) by its backup counterpart. Because Go map
// iteration order is random, overlapping prefixes would otherwise pick a
// backup nondeterministically; we select the longest (most specific) matching
// prefix, breaking length ties lexicographically, so the result is stable
// across runs. Returns an error when no prefix matches.
func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
	outputURIStr := outputURI.String()

	var bestPrimary, bestBackup string
	found := false
	for primary, backup := range storageFallbackURLs {
		if !strings.HasPrefix(outputURIStr, primary) {
			continue
		}
		better := !found ||
			len(primary) > len(bestPrimary) ||
			(len(primary) == len(bestPrimary) && primary < bestPrimary)
		if better {
			bestPrimary, bestBackup = primary, backup
			found = true
		}
	}
	if found {
		backupStr := strings.Replace(outputURIStr, bestPrimary, bestBackup, 1)
		return url.Parse(backupStr)
	}
	return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
}

func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
outputStr := outputURI.String()
// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
if strings.Contains(outputStr, "gateway.storjshare.io/catalyst-recordings-com") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to check only for gateway.storjshare.io here in case we end up using Storj for the backup storage too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok, if we do use storj for backup we can set the expiry on the access keys instead, we just needed this hack because we needed it before storj implemented that.

var storjFields drivers.FileProperties
if fields != nil {
storjFields = *fields
}
storjFields.Metadata = expiryField
fields = &storjFields
mjh1 marked this conversation as resolved.
Show resolved Hide resolved
}

driver, err := drivers.ParseOSURL(outputStr, true)
if err != nil {
return nil, 0, err
}
session := driver.NewSession("")

var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default
if withRetries {
retryPolicy = UploadRetryBackoff()
}
err = backoff.Retry(func() error {
// To count how many bytes we are trying to read then write (upload) to s3 storage
byteCounter := &ByteCounter{}
teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)

out, err = session.SaveData(context.Background(), "", teeReader, fields, writeTimeout)
bytesWritten = byteCounter.Count

if err != nil {
glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), bytesWritten, err)
}
return err
}, retryPolicy)

return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}
defer os.RemoveAll(tmpDir)
outFile := filepath.Join(tmpDir, "out.jpg")
inFile := filepath.Join(tmpDir, filepath.Base(filename))
inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
if err = os.WriteFile(inFile, segment, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}
Expand Down Expand Up @@ -182,10 +220,14 @@ func extractThumb(session drivers.OSSession, filename string, segment []byte) er
return fmt.Errorf("opening file failed: %w", err)
}
defer f.Close()
_, err = session.SaveData(context.Background(), "../latest.jpg", f, &drivers.FileProperties{
CacheControl: "max-age=5",
Metadata: expiryField,
}, 10*time.Second)
thumbData, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}

thumbURL := outputURI.JoinPath("../latest.jpg")
fields := &drivers.FileProperties{CacheControl: "max-age=5"}
_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs)
if err != nil {
return fmt.Errorf("saving thumbnail failed: %w", err)
}
Expand Down
Loading
Loading