Skip to content

Commit

Permalink
Adapt to a generic url replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Oct 21, 2024
1 parent b4b05e7 commit c94385b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
5 changes: 2 additions & 3 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func run() int {
segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout")
disableRecording := CommaSliceFlag(fs, "disable-recording", `Comma-separated list of playbackIDs to disable recording for`)
disableThumbs := CommaSliceFlag(fs, "disable-thumbs", `Comma-separated list of playbackIDs to disable thumbs for`)
privateThumbs := CommaSliceFlag(fs, "private-thumbs", `Comma-separated list of playbackIDs to save to private location`)
privateThumbsURLReplacement := CommaMapFlag(fs, "private-thumbs-replace", `Map for replacement URL to use when saving thumbnails to the private location`)
thumbsURLReplacement := CommaMapFlag(fs, "thumbs-replace-urls", `Map of space separated playbackIDs to space separated URL replacement to use when saving thumbnails. E.g. playbackID1 playbackID2=oldURL newURL`)

defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf"
if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) {
Expand Down Expand Up @@ -113,7 +112,7 @@ func run() int {
}

start := time.Now()
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs, *privateThumbs, *privateThumbsURLReplacement)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs, *thumbsURLReplacement)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand Down
33 changes: 18 additions & 15 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs, privateThumbs []string, privateThumbsURLReplacement map[string]string) (*drivers.SaveDataOutput, error) {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs []string, thumbsURLReplacement map[string]string) (*drivers.SaveDataOutput, error) {
ext := filepath.Ext(outputURI.Path)
inputFile, err := os.CreateTemp("", "upload-*"+ext)
if err != nil {
Expand All @@ -78,7 +78,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs, privateThumbs, privateThumbsURLReplacement); err != nil {
if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs, thumbsURLReplacement); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
Expand Down Expand Up @@ -229,28 +229,31 @@ func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FilePropert
return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string, privateThumbs []string, privateThumbsURLReplacement map[string]string) error {
func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string, thumbsURLReplacement map[string]string) error {
for _, playbackID := range disableThumbs {
if strings.Contains(outputURI.Path, playbackID) {
glog.Infof("Thumbnails disabled for %s", outputURI.Redacted())
return nil
}
}
for _, playbackID := range privateThumbs {
if strings.Contains(outputURI.Path, playbackID) {
glog.Infof("Saving thumbnail to private location for %s", outputURI.Redacted())
outputURIStr := outputURI.String()
for original, private := range privateThumbsURLReplacement {
if strings.HasPrefix(outputURIStr, original) {
newURI, err := url.Parse(strings.Replace(outputURIStr, original, private, 1))
if err != nil {
return fmt.Errorf("failed to parse thumbnail URL: %w", err)
}
outputURI = newURI
for playbackIDs, replacement := range thumbsURLReplacement {
for _, playbackID := range strings.Split(playbackIDs, " ") {
if strings.Contains(outputURI.Path, playbackID) {
outputURIStr := outputURI.String()
split := strings.Split(replacement, " ")
if len(split) != 2 {
break
}
original, replacementWith := split[0], split[1]

glog.Infof("Replacing thumbnail location for %s", outputURI.Redacted())
newURI, err := url.Parse(strings.Replace(outputURIStr, original, replacementWith, 1))
if err != nil {
return fmt.Errorf("failed to parse thumbnail URL: %w", err)
}
outputURI = newURI
break
}
break
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {
go func() {
u, err := url.Parse(outputFile.Name())
require.NoError(t, err)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil, nil, nil)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil, nil)
require.NoError(t, err, "")
}()

Expand Down

0 comments on commit c94385b

Please sign in to comment.