From ec03331cdd377c377593983a4bbcb3c2493b682b Mon Sep 17 00:00:00 2001 From: Max Holland Date: Thu, 25 Jan 2024 16:11:09 +0000 Subject: [PATCH 1/3] Add URI to logs and errors for better debugging --- core/uploader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/uploader.go b/core/uploader.go index c8ca924..fac96b4 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -64,16 +64,16 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t err = backoff.Retry(func() error { _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) if err != nil { - glog.Errorf("failed upload attempt: %v", err) + glog.Errorf("failed upload attempt for %s: %v", outputURI, err) } return err }, UploadRetryBackoff()) if err != nil { - return fmt.Errorf("failed to upload video: %w", err) + return fmt.Errorf("failed to upload video %s: %w", outputURI, err) } if err = extractThumb(session, outputURI, fileContents); err != nil { - glog.Errorf("extracting thumbnail failed: %v", err) + glog.Errorf("extracting thumbnail failed for %s: %v", outputURI, err) } return nil } From 97d557c4d3b5fddb81a568eab5b7246f86f7412b Mon Sep 17 00:00:00 2001 From: Max Holland Date: Fri, 26 Jan 2024 15:09:22 +0000 Subject: [PATCH 2/3] redact uri --- catalyst-uploader.go | 17 ++++++++++++----- core/uploader.go | 22 ++++++++++++---------- core/uploader_test.go | 5 ++++- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 68a195e..70745ef 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "flag" + "net/url" "os" "time" @@ -44,20 +45,26 @@ func run() int { stdout := os.Stdout os.Stdout, _ = os.Open(os.DevNull) - uri := flag.Arg(0) - if uri == "" { - glog.Fatalf("Could not parse object store URI: %s", uri) + output := flag.Arg(0) + if output == "" { + glog.Fatal("Object store URI was emtpy") + return 1 + } + + uri, err := url.Parse(output) + if err != nil { + glog.Fatalf("Failed to parse URI: %s", err) return 1 } err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) if err != nil { - glog.Fatalf("Uploader failed for %s: %s", uri, err) + glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 } // success, write uploaded file details to stdout - err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri}) + err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) if err != nil { glog.Fatal(err) return 1 diff --git a/core/uploader.go b/core/uploader.go index fac96b4..9c94e2f 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "net/url" "os" "os/exec" "path/filepath" @@ -32,8 +33,9 @@ func UploadRetryBackoff() backoff.BackOff { const segmentWriteTimeout = 5 * time.Minute -func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout time.Duration) error { - storageDriver, err := drivers.ParseOSURL(outputURI, true) +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) error { + output := outputURI.String() + storageDriver, err := drivers.ParseOSURL(output, true) if err != nil { return err } @@ -45,7 +47,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // While we wait for storj to implement an easier method for global object deletion we are hacking something // here to allow us to have recording objects deleted after 7 days. fields := &drivers.FileProperties{} - if strings.Contains(outputURI, "gateway.storjshare.io/catalyst-recordings-com") { + if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") { fields = &drivers.FileProperties{ Metadata: map[string]string{ "Object-Expires": "+720h", // Objects will be deleted after 30 days @@ -53,7 +55,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t } } - if strings.HasSuffix(outputURI, ".ts") || strings.HasSuffix(outputURI, ".mp4") { + if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") { // For segments we just write them in one go here and return early. // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) fileContents, err := io.ReadAll(input) @@ -64,16 +66,16 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t err = backoff.Retry(func() error { _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) if err != nil { - glog.Errorf("failed upload attempt for %s: %v", outputURI, err) + glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err) } return err }, UploadRetryBackoff()) if err != nil { - return fmt.Errorf("failed to upload video %s: %w", outputURI, err) + return fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) } - if err = extractThumb(session, outputURI, fileContents); err != nil { - glog.Errorf("extracting thumbnail failed for %s: %v", outputURI, err) + if err = extractThumb(session, output, fileContents); err != nil { + glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return nil } @@ -107,7 +109,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // Just log this error, since it'll effectively be retried after the next interval glog.Errorf("Failed to write: %v", err) } else { - glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI, len(b)) + glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI.Redacted(), len(b)) } lastWrite = time.Now() } @@ -121,7 +123,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // Don't ignore this error, since there won't be any further attempts to write return fmt.Errorf("failed to write final save: %w", err) } - glog.Infof("Completed writing %s to storage", outputURI) + glog.Infof("Completed writing %s to storage", outputURI.Redacted()) return nil } diff --git a/core/uploader_test.go b/core/uploader_test.go index 2dcd993..dc5007e 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -2,6 +2,7 @@ package core import ( "io" + "net/url" "os" "strings" "testing" @@ -32,7 +33,9 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { // Kick off the upload in a goroutine so that we can check the file is incrementally written go func() { - err := Upload(slowReader, outputFile.Name(), 100*time.Millisecond, time.Second) + u, err := url.Parse(outputFile.Name()) + require.NoError(t, err) + err = Upload(slowReader, u, 100*time.Millisecond, time.Second) require.NoError(t, err, "") }() From b97d64261181fc4b31a591aa930828187f55c47b Mon Sep 17 00:00:00 2001 From: Max Holland Date: Tue, 30 Jan 2024 11:16:14 +0000 Subject: [PATCH 3/3] Log out useful headers to pass on to storage providers for debugging --- catalyst-uploader.go | 20 ++++++++++++++------ core/uploader.go | 21 +++++++++++---------- core/uploader_test.go | 2 +- go.mod | 2 +- go.sum | 2 ++ 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 70745ef..f8e6601 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "flag" + "net/http" "net/url" "os" "time" @@ -47,7 +48,7 @@ func run() int { output := flag.Arg(0) if output == "" { - glog.Fatal("Object store URI was emtpy") + glog.Fatal("Object store URI was empty") return 1 } @@ -57,17 +58,24 @@ func run() int { return 1 } - err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) if err != nil { glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 } + var respHeaders http.Header + if out != nil { + respHeaders = out.UploaderResponseHeaders + } + glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag")) // success, write uploaded file details to stdout - err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) - if err != nil { - glog.Fatal(err) - return 1 + if glog.V(5) { + err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) + if err != nil { + glog.Fatal(err) + return 1 + } } return 0 diff --git a/core/uploader.go b/core/uploader.go index 9c94e2f..069c5fa 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -33,15 +33,15 @@ func UploadRetryBackoff() backoff.BackOff { const segmentWriteTimeout = 5 * time.Minute -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) error { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) { output := outputURI.String() storageDriver, err := drivers.ParseOSURL(output, true) if err != nil { - return err + return nil, err } session := storageDriver.NewSession("") if err != nil { - return err + return nil, err } // While we wait for storj to implement an easier method for global object deletion we are hacking something @@ -60,24 +60,25 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) fileContents, err := io.ReadAll(input) if err != nil { - return fmt.Errorf("failed to read file") + return nil, fmt.Errorf("failed to read file") } + var out *drivers.SaveDataOutput err = backoff.Retry(func() error { - _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) + out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) if err != nil { glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err) } return err }, UploadRetryBackoff()) if err != nil { - return fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) + return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) } if err = extractThumb(session, output, fileContents); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } - return nil + return out, nil } // For the manifest files we want a very short cache ttl as the files are updating every few seconds @@ -115,16 +116,16 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } } if err := scanner.Err(); err != nil { - return err + return nil, err } // We have to do this final write, otherwise there might be final data that's arrived since the last periodic write if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil { // Don't ignore this error, since there won't be any further attempts to write - return fmt.Errorf("failed to write final save: %w", err) + return nil, fmt.Errorf("failed to write final save: %w", err) } glog.Infof("Completed writing %s to storage", outputURI.Redacted()) - return nil + return nil, nil } func extractThumb(session drivers.OSSession, filename string, segment []byte) error { diff --git a/core/uploader_test.go b/core/uploader_test.go index dc5007e..120ce07 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -35,7 +35,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - err = Upload(slowReader, u, 100*time.Millisecond, time.Second) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second) require.NoError(t, err, "") }() diff --git a/go.mod b/go.mod index c023463..0164087 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/golang/glog v1.1.0 github.com/google/uuid v1.3.0 - github.com/livepeer/go-tools v0.3.4 + github.com/livepeer/go-tools v0.3.6 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index 25168f0..ee70a60 100644 --- a/go.sum +++ b/go.sum @@ -1435,6 +1435,8 @@ github.com/livepeer/go-tools v0.3.3 h1:Je2P+ovDIIlFWtlns5v+MmHtdIytsAJS6+XyeZ8sF github.com/livepeer/go-tools v0.3.3/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/livepeer/go-tools v0.3.4 h1:Otl8VkGA5FdNQMTTN/yh0V72vhSbSQevUTL67AXz6kU= github.com/livepeer/go-tools v0.3.4/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= +github.com/livepeer/go-tools v0.3.6 h1:LhRnoVVGFCtfBh6WyKdwJ2bPD/h5gaRvsAszmCqKt1Q= +github.com/livepeer/go-tools v0.3.6/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8= github.com/lucas-clemente/quic-go v0.28.1/go.mod h1:oGz5DKK41cJt5+773+BSO9BXDsREY4HLf7+0odGAPO0= github.com/lucas-clemente/quic-go v0.29.1/go.mod h1:CTcNfLYJS2UuRNB+zcNlgvkjBhxX6Hm3WUxxAQx2mgE=