diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 68a195e..f8e6601 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -3,6 +3,8 @@ package main import ( "encoding/json" "flag" + "net/http" + "net/url" "os" "time" @@ -44,24 +46,37 @@ func run() int { stdout := os.Stdout os.Stdout, _ = os.Open(os.DevNull) - uri := flag.Arg(0) - if uri == "" { - glog.Fatalf("Could not parse object store URI: %s", uri) + output := flag.Arg(0) + if output == "" { + glog.Fatal("Object store URI was empty") return 1 } - err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) + uri, err := url.Parse(output) if err != nil { - glog.Fatalf("Uploader failed for %s: %s", uri, err) + glog.Fatalf("Failed to parse URI: %s", err) return 1 } - // success, write uploaded file details to stdout - err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri}) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) if err != nil { - glog.Fatal(err) + glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 } + var respHeaders http.Header + if out != nil { + respHeaders = out.UploaderResponseHeaders + } + glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag")) + // success, write uploaded file details to stdout + if glog.V(5) { + err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) + if err != nil { + glog.Fatal(err) + return 1 + } + } + return 0 } diff --git a/core/uploader.go b/core/uploader.go index c8ca924..069c5fa 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "net/url" "os" "os/exec" "path/filepath" @@ -32,20 +33,21 @@ func UploadRetryBackoff() backoff.BackOff { const segmentWriteTimeout = 5 * time.Minute -func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout time.Duration) error { - storageDriver, err := drivers.ParseOSURL(outputURI, true) +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) { + output := outputURI.String() + storageDriver, err := drivers.ParseOSURL(output, true) if err != nil { - return err + return nil, err } session := storageDriver.NewSession("") if err != nil { - return err + return nil, err } // While we wait for storj to implement an easier method for global object deletion we are hacking something // here to allow us to have recording objects deleted after 7 days. fields := &drivers.FileProperties{} - if strings.Contains(outputURI, "gateway.storjshare.io/catalyst-recordings-com") { + if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") { fields = &drivers.FileProperties{ Metadata: map[string]string{ "Object-Expires": "+720h", // Objects will be deleted after 30 days @@ -53,29 +55,30 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t } } - if strings.HasSuffix(outputURI, ".ts") || strings.HasSuffix(outputURI, ".mp4") { + if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") { // For segments we just write them in one go here and return early. // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) fileContents, err := io.ReadAll(input) if err != nil { - return fmt.Errorf("failed to read file") + return nil, fmt.Errorf("failed to read file") } + var out *drivers.SaveDataOutput err = backoff.Retry(func() error { - _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) + out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) if err != nil { - glog.Errorf("failed upload attempt: %v", err) + glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err) } return err }, UploadRetryBackoff()) if err != nil { - return fmt.Errorf("failed to upload video: %w", err) + return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) } - if err = extractThumb(session, outputURI, fileContents); err != nil { - glog.Errorf("extracting thumbnail failed: %v", err) + if err = extractThumb(session, output, fileContents); err != nil { + glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } - return nil + return out, nil } // For the manifest files we want a very short cache ttl as the files are updating every few seconds @@ -107,22 +110,22 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // Just log this error, since it'll effectively be retried after the next interval glog.Errorf("Failed to write: %v", err) } else { - glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI, len(b)) + glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI.Redacted(), len(b)) } lastWrite = time.Now() } } if err := scanner.Err(); err != nil { - return err + return nil, err } // We have to do this final write, otherwise there might be final data that's arrived since the last periodic write if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil { // Don't ignore this error, since there won't be any further attempts to write - return fmt.Errorf("failed to write final save: %w", err) + return nil, fmt.Errorf("failed to write final save: %w", err) } - glog.Infof("Completed writing %s to storage", outputURI) - return nil + glog.Infof("Completed writing %s to storage", outputURI.Redacted()) + return nil, nil } func extractThumb(session drivers.OSSession, filename string, segment []byte) error { diff --git a/core/uploader_test.go b/core/uploader_test.go index 2dcd993..120ce07 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -2,6 +2,7 @@ package core import ( "io" + "net/url" "os" "strings" "testing" @@ -32,7 +33,9 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { // Kick off the upload in a goroutine so that we can check the file is incrementally written go func() { - err := Upload(slowReader, outputFile.Name(), 100*time.Millisecond, time.Second) + u, err := url.Parse(outputFile.Name()) + require.NoError(t, err) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second) require.NoError(t, err, "") }() diff --git a/go.mod b/go.mod index c023463..0164087 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/golang/glog v1.1.0 github.com/google/uuid v1.3.0 - github.com/livepeer/go-tools v0.3.4 + github.com/livepeer/go-tools v0.3.6 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index 25168f0..ee70a60 100644 --- a/go.sum +++ b/go.sum @@ -1435,6 +1435,8 @@ github.com/livepeer/go-tools v0.3.3 h1:Je2P+ovDIIlFWtlns5v+MmHtdIytsAJS6+XyeZ8sF github.com/livepeer/go-tools v0.3.3/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/livepeer/go-tools v0.3.4 h1:Otl8VkGA5FdNQMTTN/yh0V72vhSbSQevUTL67AXz6kU= github.com/livepeer/go-tools v0.3.4/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= +github.com/livepeer/go-tools v0.3.6 h1:LhRnoVVGFCtfBh6WyKdwJ2bPD/h5gaRvsAszmCqKt1Q= +github.com/livepeer/go-tools v0.3.6/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8= github.com/lucas-clemente/quic-go v0.28.1/go.mod h1:oGz5DKK41cJt5+773+BSO9BXDsREY4HLf7+0odGAPO0= github.com/lucas-clemente/quic-go v0.29.1/go.mod h1:CTcNfLYJS2UuRNB+zcNlgvkjBhxX6Hm3WUxxAQx2mgE=