Skip to content

Commit

Permalink
Background ffmpeg pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Aug 9, 2024
1 parent f73e36e commit 096601b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
35 changes: 26 additions & 9 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
// Execute the FFMPEG pipeline first and fallback to the external transcoding
// provider on errors.
StrategyFallbackExternal Strategy = "fallback_external"
// Execute the external transcoder pipeline in foreground and FFMPEG / Livepeer in background.
StrategyBackgroundFFMpeg Strategy = "background_ffmpeg"
// Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition
maxMP4OutDuration = 2 * time.Minute
maxRecordingMP4Duration = 12 * time.Hour
Expand All @@ -48,7 +50,7 @@ const (

func (s Strategy) IsValid() bool {
switch s {
case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal:
case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal, StrategyBackgroundFFMpeg:
return true
default:
return false
Expand Down Expand Up @@ -417,17 +419,20 @@ func (c *Coordinator) startUploadJob(p *JobInfo) {

switch strategy {
case StrategyExternalDominance:
c.startOneUploadJob(p, c.pipeExternal, false)
c.startOneUploadJob(p, c.pipeExternal, true, false)
case StrategyCatalystFfmpegDominance:
c.startOneUploadJob(p, c.pipeFfmpeg, false)
c.startOneUploadJob(p, c.pipeFfmpeg, true, false)
case StrategyBackgroundFFMpeg:
c.startOneUploadJob(p, c.pipeExternal, true, false)
c.startOneUploadJob(p, c.pipeFfmpeg, false, false)
case StrategyFallbackExternal:
// nolint:errcheck
go recovered(func() (t bool, e error) {
success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true)
success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true)
if !success {
p.inFallbackMode = true
log.Log(p.RequestID, "Entering fallback pipeline")
c.startOneUploadJob(p, c.pipeExternal, false)
c.startOneUploadJob(p, c.pipeExternal, true, false)
}
return
})
Expand All @@ -445,9 +450,13 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input
for _, track := range iv.Tracks {
// If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer
// We always covert the audio to AAC before sending for transcoding, so don't need to check this
if track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264" {
log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec)
return livepeerNotSupported(strategy)
if track.Type == video.TrackTypeVideo {
if strings.ToLower(track.Codec) == "hevc" {
strategy = StrategyBackgroundFFMpeg
} else if strings.ToLower(track.Codec) != "h264" {
log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec)
return livepeerNotSupported(strategy)
}
}
if track.Type == video.TrackTypeVideo && track.Rotation != 0 {
log.Log(requestID, "video rotation not supported by Livepeer pipeline", "rotation", track.Rotation)
Expand Down Expand Up @@ -507,7 +516,15 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool {
// The `hasFallback` argument means the caller has a special logic to handle
// failures (today this means re-running the job in another pipeline). If it's
// set to true, error callbacks from this job will not be sent.
func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool {
func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool {
if !foreground {
si.RequestID = fmt.Sprintf("bg_%s", si.RequestID)
if si.HlsTargetURL != nil {
si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path))
}
// this will prevent the callbacks for this job from actually being sent
si.CallbackURL = ""
}
log.AddContext(si.RequestID, "handler", handler.Name())

var pipeline = handler.Name()
Expand Down
1 change: 1 addition & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ var sourcePlaybackBucketBlocklist = []string{"lp-us-catalyst-vod-pvt-monster", "
const maxBitrateSourcePb = 6_000_000

func (f *ffmpeg) sendSourcePlayback(job *JobInfo) {
// TODO no source playback for hevc / background livepeer pipeline
for _, track := range job.InputFileInfo.Tracks {
if track.Bitrate > maxBitrateSourcePb {
log.Log(job.RequestID, "source playback not available, bitrate too high", "bitrate", track.Bitrate)
Expand Down

0 comments on commit 096601b

Please sign in to comment.