diff --git a/core/uploader.go b/core/uploader.go
index 60d5771..e8355d7 100644
--- a/core/uploader.go
+++ b/core/uploader.go
@@ -54,21 +54,48 @@ var expiryField = map[string]string{
 	"Object-Expires": "+168h", // Objects will be deleted after 7 days
 }
 
+func writeToTempFile(data []byte) (*os.File, error) {
+	tmpFile, err := os.CreateTemp("", "upload-*.tmp")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	if _, err := tmpFile.Write(data); err != nil {
+		tmpFile.Close()
+		return nil, fmt.Errorf("failed to write to temp file: %w", err)
+	}
+
+	if _, err := tmpFile.Seek(0, 0); err != nil {
+		tmpFile.Close()
+		return nil, fmt.Errorf("failed to seek temp file: %w", err)
+	}
+
+	return tmpFile, nil
+}
+
 func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
-	if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
+	ext := filepath.Ext(outputURI.Path)
+	inputFile, err := os.CreateTemp("", "upload-*"+ext)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temp file: %w", err)
+	}
+	defer os.Remove(inputFile.Name())
+	defer inputFile.Close()
+
+	if ext == ".ts" || ext == ".mp4" {
 		// For segments we just write them in one go here and return early.
 		// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
-		fileContents, err := io.ReadAll(input)
+		_, err = io.Copy(inputFile, input)
 		if err != nil {
-			return nil, fmt.Errorf("failed to read file")
+			return nil, fmt.Errorf("failed to write to temp file: %w", err)
 		}
 
-		out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs)
+		out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFile, nil, segmentWriteTimeout, true, storageFallbackURLs)
 		if err != nil {
 			return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
 		}
 
-		if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil {
+		if err = extractThumb(outputURI, inputFile, storageFallbackURLs); err != nil {
 			glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
 		}
 		return out, nil
@@ -76,7 +103,6 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 
 	// For the manifest files we want a very short cache ttl as the files are updating every few seconds
 	fields := &drivers.FileProperties{CacheControl: "max-age=1"}
-	var fileContents []byte
 	var lastWrite = time.Now()
 
 	scanner := bufio.NewScanner(input)
@@ -95,11 +121,16 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 
 	for scanner.Scan() {
 		b := scanner.Bytes()
-		fileContents = append(fileContents, b...)
+		if _, err := inputFile.Seek(0, io.SeekEnd); err != nil {
+			return nil, fmt.Errorf("failed to seek input file: %w", err)
+		}
+		if _, err := inputFile.Write(b); err != nil {
+			return nil, fmt.Errorf("failed to write to input file: %w", err)
+		}
 
 		// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
 		if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
-			if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
+			if _, _, err := uploadFileWithBackup(outputURI, inputFile, fields, writeTimeout, false, storageFallbackURLs); err != nil {
 				// Just log this error, since it'll effectively be retried after the next interval
 				glog.Errorf("Failed to write: %v", err)
 			} else {
@@ -113,7 +144,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 	}
 
 	// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
-	if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
+	if _, _, err := uploadFileWithBackup(outputURI, inputFile, fields, writeTimeout, false, storageFallbackURLs); err != nil {
 		// Don't ignore this error, since there won't be any further attempts to write
 		return nil, fmt.Errorf("failed to write final save: %w", err)
 	}
@@ -121,7 +152,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 	return nil, nil
 }
 
-func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
+func uploadFileWithBackup(outputURI *url.URL, fileContents *os.File, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
 	retryPolicy := NoRetries()
 	if withRetries {
 		retryPolicy = UploadRetryBackoff()
@@ -160,7 +191,7 @@ func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (
 	return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
 }
 
-func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
+func uploadFile(outputURI *url.URL, file *os.File, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
 	outputStr := outputURI.String()
 	// While we wait for storj to implement an easier method for global object deletion we are hacking something
 	// here to allow us to have recording objects deleted after 7 days.
@@ -184,9 +215,12 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
 		retryPolicy = SingleRequestRetryBackoff()
 	}
 	err = backoff.Retry(func() error {
+		if _, err := file.Seek(0, io.SeekStart); err != nil {
+			return fmt.Errorf("failed to seek to file start: %w", err)
+		}
 		// To count how many bytes we are trying to read then write (upload) to s3 storage
 		byteCounter := &ByteCounter{}
-		teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)
+		teeReader := io.TeeReader(file, byteCounter)
 
 		out, err = session.SaveData(context.Background(), "", teeReader, fields, writeTimeout)
 		bytesWritten = byteCounter.Count
@@ -200,20 +234,16 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
 	return out, bytesWritten, err
 }
 
-func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error {
+func extractThumb(outputURI *url.URL, segment *os.File, storageFallbackURLs map[string]string) error {
 	tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
 	if err != nil {
 		return fmt.Errorf("temp file creation failed: %w", err)
 	}
 	defer os.RemoveAll(tmpDir)
 	outFile := filepath.Join(tmpDir, "out.jpg")
-	inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
-	if err = os.WriteFile(inFile, segment, 0644); err != nil {
-		return fmt.Errorf("failed to write input file: %w", err)
-	}
 
 	args := []string{
-		"-i", inFile,
+		"-i", segment.Name(),
 		"-ss", "00:00:00",
 		"-vframes", "1",
 		"-vf", "scale=854:480:force_original_aspect_ratio=decrease",
@@ -235,19 +265,15 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st
 		return fmt.Errorf("ffmpeg failed[%s] [%s]: %w", outputBuf.String(), stdErr.String(), err)
 	}
 
-	f, err := os.Open(outFile)
+	thumbFile, err := os.Open(outFile)
 	if err != nil {
 		return fmt.Errorf("opening file failed: %w", err)
 	}
-	defer f.Close()
-	thumbData, err := io.ReadAll(f)
-	if err != nil {
-		return fmt.Errorf("failed to read file: %w", err)
-	}
+	defer thumbFile.Close()
 
 	thumbURL := outputURI.JoinPath("../latest.jpg")
 	fields := &drivers.FileProperties{CacheControl: "max-age=5"}
-	_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs)
+	_, _, err = uploadFileWithBackup(thumbURL, thumbFile, fields, 10*time.Second, true, storageFallbackURLs)
 	if err != nil {
 		return fmt.Errorf("saving thumbnail failed: %w", err)
 	}
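
Reviewer note: the key behavioral change above is that `uploadFile` now streams from an `*os.File` instead of a `[]byte`, so each retry attempt must rewind the file before re-reading it; that is what the added `file.Seek(0, io.SeekStart)` inside the `backoff.Retry` closure does. It also lets `Upload` keep appending to `inputFile` with `Seek(0, io.SeekEnd)` in the manifest loop while still uploading the complete file on every interval. Below is a minimal sketch of the rewind-before-retry pattern; the `retry` and `uploadWithRewind` helpers are illustrative stand-ins for this example only, not part of this change.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// retry is a stand-in for backoff.Retry: it simply re-runs fn up to n times.
func retry(n int, fn func() error) error {
	var err error
	for i := 0; i < n; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

// uploadWithRewind copies the whole file into dst, rewinding before each attempt.
func uploadWithRewind(file *os.File, dst io.Writer) error {
	return retry(3, func() error {
		// Without this Seek, a failed attempt leaves the offset at EOF and the
		// next attempt would upload an empty body.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("failed to seek to file start: %w", err)
		}
		_, err := io.Copy(dst, file)
		return err
	})
}

func main() {
	f, err := os.CreateTemp("", "demo-*.ts")
	if err != nil {
		fmt.Println("temp file failed:", err)
		return
	}
	defer os.Remove(f.Name())
	defer f.Close()
	f.WriteString("segment data")

	var buf bytes.Buffer
	if err := uploadWithRewind(f, &buf); err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	fmt.Println("uploaded bytes:", buf.Len())
}
```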