diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 629771f..9baf390 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "flag" + "net/url" "os" "time" @@ -44,21 +45,27 @@ func run() int { stdout := os.Stdout os.Stdout, _ = os.Open(os.DevNull) - uri := flag.Arg(0) - if uri == "" { - glog.Fatalf("Could not parse object store URI: %s", uri) + output := flag.Arg(0) + if output == "" { + glog.Fatal("Object store URI was emtpy") + return 1 + } + + uri, err := url.Parse(output) + if err != nil { + glog.Fatalf("Failed to parse URI: %s", err) return 1 } err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout) if err != nil { - glog.Fatalf("Uploader failed for %s: %s", uri, err) + glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 } // success, write uploaded file details to stdout if glog.V(5) { - err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri}) + err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) if err != nil { glog.Fatal(err) return 1 diff --git a/core/uploader.go b/core/uploader.go index fac96b4..9c94e2f 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "net/url" "os" "os/exec" "path/filepath" @@ -32,8 +33,9 @@ func UploadRetryBackoff() backoff.BackOff { const segmentWriteTimeout = 5 * time.Minute -func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout time.Duration) error { - storageDriver, err := drivers.ParseOSURL(outputURI, true) +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) error { + output := outputURI.String() + storageDriver, err := drivers.ParseOSURL(output, true) if err != nil { return err } @@ -45,7 +47,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // While we wait for storj to implement an easier method for global object deletion we are hacking something // here to allow us to have recording objects deleted after 7 days. fields := &drivers.FileProperties{} - if strings.Contains(outputURI, "gateway.storjshare.io/catalyst-recordings-com") { + if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") { fields = &drivers.FileProperties{ Metadata: map[string]string{ "Object-Expires": "+720h", // Objects will be deleted after 30 days @@ -53,7 +55,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t } } - if strings.HasSuffix(outputURI, ".ts") || strings.HasSuffix(outputURI, ".mp4") { + if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") { // For segments we just write them in one go here and return early. // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) fileContents, err := io.ReadAll(input) @@ -64,16 +66,16 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t err = backoff.Retry(func() error { _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) if err != nil { - glog.Errorf("failed upload attempt for %s: %v", outputURI, err) + glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err) } return err }, UploadRetryBackoff()) if err != nil { - return fmt.Errorf("failed to upload video %s: %w", outputURI, err) + return fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) } - if err = extractThumb(session, outputURI, fileContents); err != nil { - glog.Errorf("extracting thumbnail failed for %s: %v", outputURI, err) + if err = extractThumb(session, output, fileContents); err != nil { + glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return nil } @@ -107,7 +109,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // Just log this error, since it'll effectively be retried after the next interval glog.Errorf("Failed to write: %v", err) } else { - glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI, len(b)) + glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI.Redacted(), len(b)) } lastWrite = time.Now() } @@ -121,7 +123,7 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t // Don't ignore this error, since there won't be any further attempts to write return fmt.Errorf("failed to write final save: %w", err) } - glog.Infof("Completed writing %s to storage", outputURI) + glog.Infof("Completed writing %s to storage", outputURI.Redacted()) return nil } diff --git a/core/uploader_test.go b/core/uploader_test.go index 2dcd993..dc5007e 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -2,6 +2,7 @@ package core import ( "io" + "net/url" "os" "strings" "testing" @@ -32,7 +33,9 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { // Kick off the upload in a goroutine so that we can check the file is incrementally written go func() { - err := Upload(slowReader, outputFile.Name(), 100*time.Millisecond, time.Second) + u, err := url.Parse(outputFile.Name()) + require.NoError(t, err) + err = Upload(slowReader, u, 100*time.Millisecond, time.Second) require.NoError(t, err, "") }()