diff --git a/catalyst-uploader.go b/catalyst-uploader.go index cf843ac..9eb9842 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -39,6 +39,7 @@ func run() int { verbosity := fs.String("v", "", "Log verbosity. {4|5|6}") timeout := fs.Duration("t", 30*time.Second, "Upload timeout") storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`) + segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout") defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf" if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) { @@ -100,7 +101,7 @@ func run() int { } start := time.Now() - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 diff --git a/core/uploader.go b/core/uploader.go index 49ba0ab..d0e9746 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -49,13 +49,11 @@ func SingleRequestRetryBackoff() backoff.BackOff { return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second) } -const segmentWriteTimeout = 5 * time.Minute - var expiryField = map[string]string{ "Object-Expires": "+168h", // Objects will be deleted after 7 days } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) { ext := filepath.Ext(outputURI.Path) inputFile, err := os.CreateTemp("", "upload-*"+ext) if err != nil { @@ -75,7 +73,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to close input file: %w", err) } - out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segmentWriteTimeout, true, storageFallbackURLs) + out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segTimeout, true, storageFallbackURLs) if err != nil { return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } diff --git a/core/uploader_test.go b/core/uploader_test.go index 86f3a76..b775ba9 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, nil) require.NoError(t, err, "") }()