Skip to content

Commit

Permalink
Add segment write timeout config param
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Aug 22, 2024
1 parent 38ab542 commit 02957ca
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 6 deletions.
3 changes: 2 additions & 1 deletion catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func run() int {
verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout")

defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf"
if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) {
Expand Down Expand Up @@ -100,7 +101,7 @@ func run() int {
}

start := time.Now()
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs)
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand Down
6 changes: 2 additions & 4 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,11 @@ func SingleRequestRetryBackoff() backoff.BackOff {
return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second)
}

const segmentWriteTimeout = 5 * time.Minute

var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) {
ext := filepath.Ext(outputURI.Path)
inputFile, err := os.CreateTemp("", "upload-*"+ext)
if err != nil {
Expand All @@ -75,7 +73,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
return nil, fmt.Errorf("failed to close input file: %w", err)
}

out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segmentWriteTimeout, true, storageFallbackURLs)
out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segTimeout, true, storageFallbackURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {
go func() {
u, err := url.Parse(outputFile.Name())
require.NoError(t, err)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, nil)

Check failure on line 39 in core/uploader_test.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

cannot use nil as time.Duration value in argument to Upload
require.NoError(t, err, "")
}()

Expand Down

0 comments on commit 02957ca

Please sign in to comment.