From 865c30f246bd4b5739c8c6a653a1cac914577a17 Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 1 Feb 2024 18:23:10 -0800 Subject: [PATCH] pipeline: ffmpeg: probe and update width/height for recording inputs --- pipeline/ffmpeg.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 9ef238e62..9d470252e 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -153,6 +153,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { return nil, fmt.Errorf("error downloading source manifest: %s", err) } + // Check a few segments from the segmented source input file (non-hls) sourceSegments := sourceManifest.GetAllSegments() job.sourceSegments = len(sourceSegments) err = f.probeSourceSegments(job, sourceSegments) @@ -160,6 +161,12 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { return nil, err } + // Check a few segments from the recording source input file (hls) + err = f.probeRecordingSourceSegments(job, &inputInfo, sourceSegments) + if err != nil { + log.LogError(job.RequestID, "failed to probe recording source segments before transcoding - continuing with transcode", err) + } + outputs, transcodedSegments, err := transcode.RunTranscodeProcess(transcodeRequest, job.StreamName, inputInfo, f.Broadcaster) if err != nil { log.LogError(job.RequestID, "RunTranscodeProcess returned an error", err) @@ -333,6 +340,44 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so return nil } +func (f *ffmpeg) probeRecordingSourceSegments(job *JobInfo, iv *video.InputVideo, sourceSegments []*m3u8.MediaSegment) error { + // Only inspect recording segments if the height/width was not determined in the initial probing step + if job.InputFileInfo.Format != "hls" && (iv.Tracks[0].VideoTrack.Width == 0 || iv.Tracks[0].VideoTrack.Height == 0) { + return nil + } + oldWidth, oldHeight := iv.Tracks[0].VideoTrack.Width, iv.Tracks[0].VideoTrack.Height + segCount := len(sourceSegments) + // Check a random segment in the middle + segmentToCheck := sourceSegments[segCount/2] + + u, err := clients.ManifestURLToSegmentURL(job.SegmentingTargetURL, segmentToCheck.URI) + if err != nil { + return fmt.Errorf("error checking recording source segments: %w", err) + } + probeURL, err := clients.SignURL(u) + if err != nil { + return fmt.Errorf("failed to create signed url for %s: %w", u, err) + } + if err := backoff.Retry(func() error { + recSegmentProbe, err := f.probe.ProbeFile(job.RequestID, probeURL) + if err != nil { + return fmt.Errorf("probe failed for recording source segment %s: %w", u, err) + } + videoTrack, err := recSegmentProbe.GetTrack(video.TrackTypeVideo) + hasVideoTrack := err == nil + if hasVideoTrack { + iv.Tracks[0].VideoTrack.Width = videoTrack.Width + iv.Tracks[0].VideoTrack.Height = videoTrack.Height + } + return nil + }, retries(3)); err != nil { + return err + } + log.Log(job.RequestID, "Updated recording track info from", "old-width", oldWidth, "old-height", oldHeight, "new-width", iv.Tracks[0].VideoTrack.Width, "new-height", iv.Tracks[0].VideoTrack.Height) + + return nil +} + func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) { // Create a temporary local file to write to localSourceFile, err := os.CreateTemp(os.TempDir(), LocalSourceFilePattern)