diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 3f625732d..b057f6584 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -40,6 +40,8 @@ const ( // Execute the FFMPEG pipeline first and fallback to the external transcoding // provider on errors. StrategyFallbackExternal Strategy = "fallback_external" + // Execute the external transcoder pipeline in foreground and FFMPEG / Livepeer in background. + StrategyBackgroundFFMpeg Strategy = "background_ffmpeg" // Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition maxMP4OutDuration = 2 * time.Minute maxRecordingMP4Duration = 12 * time.Hour @@ -48,7 +50,7 @@ const ( func (s Strategy) IsValid() bool { switch s { - case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal: + case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal, StrategyBackgroundFFMpeg: return true default: return false @@ -417,17 +419,20 @@ func (c *Coordinator) startUploadJob(p *JobInfo) { switch strategy { case StrategyExternalDominance: - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) case StrategyCatalystFfmpegDominance: - c.startOneUploadJob(p, c.pipeFfmpeg, false) + c.startOneUploadJob(p, c.pipeFfmpeg, true, false) + case StrategyBackgroundFFMpeg: + c.startOneUploadJob(p, c.pipeExternal, true, false) + c.startOneUploadJob(p, c.pipeFfmpeg, false, false) case StrategyFallbackExternal: // nolint:errcheck go recovered(func() (t bool, e error) { - success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true) + success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true) if !success { p.inFallbackMode = true log.Log(p.RequestID, "Entering fallback pipeline") - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) } return }) @@ -445,9 +450,13 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input for _, track := range iv.Tracks { // If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer // We always covert the audio to AAC before sending for transcoding, so don't need to check this - if track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264" { - log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) - return livepeerNotSupported(strategy) + if track.Type == video.TrackTypeVideo { + if strings.ToLower(track.Codec) == "hevc" { + strategy = StrategyBackgroundFFMpeg + } else if strings.ToLower(track.Codec) != "h264" { + log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) + return livepeerNotSupported(strategy) + } } if track.Type == video.TrackTypeVideo && track.Rotation != 0 { log.Log(requestID, "video rotation not supported by Livepeer pipeline", "rotation", track.Rotation) @@ -507,7 +516,15 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool { // The `hasFallback` argument means the caller has a special logic to handle // failures (today this means re-running the job in another pipeline). If it's // set to true, error callbacks from this job will not be sent. -func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool { +func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool { + if !foreground { + si.RequestID = fmt.Sprintf("bg_%s", si.RequestID) + if si.HlsTargetURL != nil { + si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path)) + } + // this will prevent the callbacks for this job from actually being sent + si.CallbackURL = "" + } log.AddContext(si.RequestID, "handler", handler.Name()) var pipeline = handler.Name() diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index a0b8bf64d..b7130e61e 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -194,6 +194,7 @@ var sourcePlaybackBucketBlocklist = []string{"lp-us-catalyst-vod-pvt-monster", " const maxBitrateSourcePb = 6_000_000 func (f *ffmpeg) sendSourcePlayback(job *JobInfo) { + // TODO no source playback for hevc / background livepeer pipeline for _, track := range job.InputFileInfo.Tracks { if track.Bitrate > maxBitrateSourcePb { log.Log(job.RequestID, "source playback not available, bitrate too high", "bitrate", track.Bitrate)