diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 9eb9842..6eff256 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -40,6 +40,8 @@ func run() int { timeout := fs.Duration("t", 30*time.Second, "Upload timeout") storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`) segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout") + disableRecording := CommaSliceFlag(fs, "disable-recording", `Comma-separated list of playbackIDs to disable recording for`) + disableThumbs := CommaSliceFlag(fs, "disable-thumbs", `Comma-separated list of playbackIDs to disable thumbs for`) defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf" if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) { @@ -100,8 +102,15 @@ func run() int { return 1 } + for _, playbackID := range *disableRecording { + if strings.Contains(uri.Path, playbackID) { + glog.Errorf("Uploader disabled for %s", uri.Redacted()) + return 0 + } + } + start := time.Now() - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 @@ -124,6 +133,20 @@ func run() int { return 0 } +// handles -foo=value1,value2,value3 +func CommaSliceFlag(fs *flag.FlagSet, name string, usage string) *[]string { + var dest []string + fs.Func(name, usage, func(s string) error { + split := strings.Split(s, ",") + if len(split) == 1 && split[0] == "" { + return nil + } + dest = split + return nil + }) + return &dest +} + // handles -foo=key1=value1,key2=value2 func CommaMapFlag(fs *flag.FlagSet, name string, usage string) *map[string]string { var dest map[string]string diff --git a/core/uploader.go b/core/uploader.go index d0e9746..1f4b918 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -53,7 +53,7 @@ var expiryField = map[string]string{ "Object-Expires": "+168h", // Objects will be deleted after 7 days } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs []string) (*drivers.SaveDataOutput, error) { ext := filepath.Ext(outputURI.Path) inputFile, err := os.CreateTemp("", "upload-*"+ext) if err != nil { @@ -78,7 +78,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } - if err = extractThumb(outputURI, inputFileName, storageFallbackURLs); err != nil { + if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return out, nil @@ -229,7 +229,14 @@ func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FilePropert return out, bytesWritten, err } -func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string) error { +func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string) error { + for _, playbackID := range disableThumbs { + if strings.Contains(outputURI.Path, playbackID) { + glog.Infof("Thumbnails disabled for %s", outputURI.Redacted()) + return nil + } + } + tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*") if err != nil { return fmt.Errorf("temp file creation failed: %w", err) diff --git a/core/uploader_test.go b/core/uploader_test.go index 9717b57..a917f00 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil) require.NoError(t, err, "") }()