From 60762ef1b7395a6b64e520c5c9e4c18aa488f0cc Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 8 Sep 2023 12:15:25 -0700 Subject: [PATCH 1/2] cherry picked a/v bin updates (#483) --- pkg/pipeline/builder/audio.go | 219 ++++++++------ pkg/pipeline/builder/video.go | 531 +++++++++++++++++++--------------- pkg/pipeline/controller.go | 13 +- 3 files changed, 429 insertions(+), 334 deletions(-) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 55c101d3..7a3901ce 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "sync" "github.com/tinyzimmer/go-gst/gst" @@ -23,65 +24,108 @@ import ( "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/types" + lksdk "github.com/livekit/server-sdk-go" ) const audioMixerLatency = uint64(2e9) -func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) { - b := pipeline.NewBin("audio") +type AudioBin struct { + bin *gstreamer.Bin + conf *config.PipelineConfig + + mu sync.Mutex + tracks map[string]struct{} +} + +func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error { + b := &AudioBin{ + bin: pipeline.NewBin("audio"), + conf: p, + tracks: make(map[string]struct{}), + } switch p.SourceType { - case types.SourceTypeSDK: - if err := buildSDKAudioInput(b, p); err != nil { - return nil, err + case types.SourceTypeWeb: + if err := b.buildWebInput(); err != nil { + return err } - case types.SourceTypeWeb: - if err := buildWebAudioInput(b, p); err != nil { - return nil, err + case types.SourceTypeSDK: + if err := b.buildSDKInput(); err != nil { + return err } + + pipeline.AddOnTrackAdded(b.onTrackAdded) + pipeline.AddOnTrackRemoved(b.onTrackRemoved) } if len(p.Outputs) > 1 { tee, err := gst.NewElementWithName("tee", "audio_tee") if err != nil { - return nil, err + return err } - - if err = b.AddElement(tee); err != nil { - return nil, err + if err = b.bin.AddElement(tee); err != nil { + return err } } else { queue, err := gstreamer.BuildQueue("audio_queue", p.Latency, true) if err != nil { - return nil, errors.ErrGstPipelineError(err) + return errors.ErrGstPipelineError(err) } - if err = b.AddElement(queue); err != nil { - return nil, err + if err = b.bin.AddElement(queue); err != nil { + return err } } - return b, nil + return pipeline.AddSourceBin(b.bin) +} + +func (b *AudioBin) onTrackAdded(ts *config.TrackSource) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + if ts.Kind == lksdk.TrackKindAudio { + if err := b.addAudioAppSrcBin(ts); err != nil { + b.bin.OnError(err) + } + } } -func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error { +func (b *AudioBin) onTrackRemoved(trackID string) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + b.mu.Lock() + _, ok := b.tracks[trackID] + delete(b.tracks, trackID) + b.mu.Unlock() + + if ok { + if _, err := b.bin.RemoveSourceBin(trackID); err != nil { + b.bin.OnError(err) + } + } +} + +func (b *AudioBin) buildWebInput() error { pulseSrc, err := gst.NewElement("pulsesrc") if err != nil { return errors.ErrGstPipelineError(err) } - if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", p.Info.EgressId)); err != nil { + if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", b.conf.Info.EgressId)); err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElement(pulseSrc); err != nil { + if err = b.bin.AddElement(pulseSrc); err != nil { return err } - if err = addAudioConverter(b, p); err != nil { + if err = addAudioConverter(b.bin, b.conf); err != nil { return err } - - if p.AudioTranscoding { - if err = addAudioEncoder(b, p); err != nil { + if b.conf.AudioTranscoding { + if err = b.addEncoder(); err != nil { return err } } @@ -89,20 +133,20 @@ func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error { return nil } -func buildSDKAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error { - if p.AudioTrack != nil { - if err := buildAudioAppSrcBin(b, p); err != nil { +func (b *AudioBin) buildSDKInput() error { + if b.conf.AudioTrack != nil { + if err := b.addAudioAppSrcBin(b.conf.AudioTrack); err != nil { return err } } - if err := buildAudioTestSrcBin(b, p); err != nil { + if err := b.addAudioTestSrcBin(); err != nil { return err } - if err := addAudioMixer(b, p); err != nil { + if err := b.addMixer(); err != nil { return err } - if p.AudioTranscoding { - if err := addAudioEncoder(b, p); err != nil { + if b.conf.AudioTranscoding { + if err := b.addEncoder(); err != nil { return err } } @@ -110,30 +154,29 @@ func buildSDKAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error { return nil } -func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error { - track := p.AudioTrack +func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error { + b.mu.Lock() + defer b.mu.Unlock() + + b.tracks[ts.TrackID] = struct{}{} - b := audioBin.NewBin(track.TrackID) - b.SetEOSFunc(func() bool { + appSrcBin := b.bin.NewBin(ts.TrackID) + appSrcBin.SetEOSFunc(func() bool { return false }) - if err := audioBin.AddSourceBin(b); err != nil { + ts.AppSrc.Element.SetArg("format", "time") + if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil { return err } - - track.AppSrc.Element.SetArg("format", "time") - if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil { - return err - } - if err := b.AddElement(track.AppSrc.Element); err != nil { + if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil { return err } - switch track.MimeType { + switch ts.MimeType { case types.MimeTypeOpus: - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + if err := ts.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d", - track.PayloadType, track.ClockRate, + ts.PayloadType, ts.ClockRate, ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -148,24 +191,28 @@ func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) erro return errors.ErrGstPipelineError(err) } - if err = b.AddElements(rtpOpusDepay, opusDec); err != nil { + if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil { return err } default: - return errors.ErrNotSupported(string(track.MimeType)) + return errors.ErrNotSupported(string(ts.MimeType)) } - if err := addAudioConverter(b, p); err != nil { + if err := addAudioConverter(appSrcBin, b.conf); err != nil { + return err + } + + if err := b.bin.AddSourceBin(appSrcBin); err != nil { return err } return nil } -func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error { - b := audioBin.NewBin("audio_test_src") - if err := audioBin.AddSourceBin(b); err != nil { +func (b *AudioBin) addAudioTestSrcBin() error { + testSrcBin := b.bin.NewBin("audio_test_src") + if err := b.bin.AddSourceBin(testSrcBin); err != nil { return err } @@ -183,39 +230,15 @@ func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) err return errors.ErrGstPipelineError(err) } - audioCaps, err := newAudioCapsFilter(p) - if err != nil { - return err - } - - return b.AddElements(audioTestSrc, audioCaps) -} - -func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { - audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency, true) - if err != nil { - return err - } - - audioConvert, err := gst.NewElement("audioconvert") - if err != nil { - return errors.ErrGstPipelineError(err) - } - - audioResample, err := gst.NewElement("audioresample") - if err != nil { - return errors.ErrGstPipelineError(err) - } - - capsFilter, err := newAudioCapsFilter(p) + audioCaps, err := newAudioCapsFilter(b.conf) if err != nil { return err } - return b.AddElements(audioQueue, audioConvert, audioResample, capsFilter) + return testSrcBin.AddElements(audioTestSrc, audioCaps) } -func addAudioMixer(b *gstreamer.Bin, p *config.PipelineConfig) error { +func (b *AudioBin) addMixer() error { audioMixer, err := gst.NewElement("audiomixer") if err != nil { return errors.ErrGstPipelineError(err) @@ -224,44 +247,68 @@ func addAudioMixer(b *gstreamer.Bin, p *config.PipelineConfig) error { return errors.ErrGstPipelineError(err) } - mixedCaps, err := newAudioCapsFilter(p) + mixedCaps, err := newAudioCapsFilter(b.conf) if err != nil { return err } - return b.AddElements(audioMixer, mixedCaps) + return b.bin.AddElements(audioMixer, mixedCaps) } -func addAudioEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error { - switch p.AudioOutCodec { +func (b *AudioBin) addEncoder() error { + switch b.conf.AudioOutCodec { case types.MimeTypeOpus: opusEnc, err := gst.NewElement("opusenc") if err != nil { return errors.ErrGstPipelineError(err) } - if err = opusEnc.SetProperty("bitrate", int(p.AudioBitrate*1000)); err != nil { + if err = opusEnc.SetProperty("bitrate", int(b.conf.AudioBitrate*1000)); err != nil { return errors.ErrGstPipelineError(err) } - return b.AddElement(opusEnc) + return b.bin.AddElement(opusEnc) case types.MimeTypeAAC: faac, err := gst.NewElement("faac") if err != nil { return errors.ErrGstPipelineError(err) } - if err = faac.SetProperty("bitrate", int(p.AudioBitrate*1000)); err != nil { + if err = faac.SetProperty("bitrate", int(b.conf.AudioBitrate*1000)); err != nil { return errors.ErrGstPipelineError(err) } - return b.AddElement(faac) + return b.bin.AddElement(faac) case types.MimeTypeRawAudio: return nil default: - return errors.ErrNotSupported(string(p.AudioOutCodec)) + return errors.ErrNotSupported(string(b.conf.AudioOutCodec)) } } +func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { + audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency, true) + if err != nil { + return err + } + + audioConvert, err := gst.NewElement("audioconvert") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + audioResample, err := gst.NewElement("audioresample") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + capsFilter, err := newAudioCapsFilter(p) + if err != nil { + return err + } + + return b.AddElements(audioQueue, audioConvert, audioResample, capsFilter) +} + func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) { var caps *gst.Caps switch p.AudioOutCodec { diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index ffa27020..8a60eeac 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -33,49 +33,97 @@ import ( const videoTestSrcName = "video_test_src" -func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) { - b := pipeline.NewBin("video") +type VideoBin struct { + bin *gstreamer.Bin + conf *config.PipelineConfig + + lastPTS atomic.Duration + nextPTS atomic.Duration + selectedPad string + nextPad string + + mu sync.Mutex + pads map[string]*gst.Pad + selector *gst.Element +} + +func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error { + b := &VideoBin{ + bin: pipeline.NewBin("video"), + conf: p, + } switch p.SourceType { - case types.SourceTypeSDK: - if err := buildSDKVideoInput(b, p); err != nil { - return nil, err + case types.SourceTypeWeb: + if err := b.buildWebInput(); err != nil { + return err } - case types.SourceTypeWeb: - if err := buildWebVideoInput(b, p); err != nil { - return nil, err + case types.SourceTypeSDK: + if err := b.buildSDKInput(); err != nil { + return err } + + pipeline.AddOnTrackMuted(b.onTrackMuted) + pipeline.AddOnTrackUnmuted(b.onTrackUnmuted) } if len(p.Outputs) > 1 { tee, err := gst.NewElementWithName("tee", "video_tee") if err != nil { - return nil, err + return err } - if err = b.AddElement(tee); err != nil { - return nil, err + if err = b.bin.AddElement(tee); err != nil { + return err } } else { queue, err := gstreamer.BuildQueue("video_queue", p.Latency, true) if err != nil { - return nil, errors.ErrGstPipelineError(err) + return errors.ErrGstPipelineError(err) } - if err = b.AddElement(queue); err != nil { - return nil, err + if err = b.bin.AddElement(queue); err != nil { + return err } } - return b, nil + return pipeline.AddSourceBin(b.bin) } -func buildWebVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error { +func (b *VideoBin) onTrackMuted(trackID string) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + if b.selectedPad == trackID { + if err := b.setSelectorPad(videoTestSrcName); err != nil { + logger.Errorw("failed to set selector pad", err) + } + } +} + +func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.pads[trackID] == nil { + return + } + + b.nextPTS.Store(pts) + b.nextPad = trackID +} + +func (b *VideoBin) buildWebInput() error { xImageSrc, err := gst.NewElement("ximagesrc") if err != nil { return errors.ErrGstPipelineError(err) } - if err = xImageSrc.SetProperty("display-name", p.Display); err != nil { + if err = xImageSrc.SetProperty("display-name", b.conf.Display); err != nil { return errors.ErrGstPipelineError(err) } if err = xImageSrc.SetProperty("use-damage", false); err != nil { @@ -85,7 +133,7 @@ func buildWebVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error { return errors.ErrGstPipelineError(err) } - videoQueue, err := gstreamer.BuildQueue("video_input_queue", p.Latency, true) + videoQueue, err := gstreamer.BuildQueue("video_input_queue", b.conf.Latency, true) if err != nil { return err } @@ -101,71 +149,63 @@ func buildWebVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error { } if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "video/x-raw,framerate=%d/1", - p.Framerate, + b.conf.Framerate, ), )); err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElements(xImageSrc, videoQueue, videoConvert, caps); err != nil { + if err = b.bin.AddElements(xImageSrc, videoQueue, videoConvert, caps); err != nil { return err } - if p.VideoTranscoding { - if err = addVideoEncoder(b, p); err != nil { + if b.conf.VideoTranscoding { + if err = b.addEncoder(); err != nil { return err } } return nil } -type videoSDKBin struct { - lastPTS atomic.Duration - nextPTS atomic.Duration - selectedPad string - nextPad string +func (b *VideoBin) buildSDKInput() error { + b.pads = make(map[string]*gst.Pad) - mu sync.Mutex - pads map[string]*gst.Pad - selector *gst.Element -} - -func buildSDKVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error { - v := &videoSDKBin{ - pads: make(map[string]*gst.Pad), - } - - if p.VideoTrack != nil { - if err := v.buildVideoAppSrcBin(b, p); err != nil { + // add selector first so pads can be created + if b.conf.VideoTranscoding { + if err := b.addSelector(); err != nil { return err } } - if p.VideoTranscoding { - if err := v.buildVideoTestSrcBin(b, p); err != nil { - return err - } - if err := v.addVideoSelector(b, p); err != nil { + if b.conf.VideoTrack != nil { + if err := b.addAppSrcBin(b.conf.VideoTrack); err != nil { return err } + } - v.createTestSrcPad() - if p.VideoTrack != nil { - v.createSrcPad(p.VideoTrack.TrackID) - if err := v.setSelectorPad(p.VideoTrack.TrackID); err != nil { - return err + if b.conf.VideoTranscoding { + b.bin.SetGetSrcPad(b.getSrcPad) + b.bin.SetEOSFunc(func() bool { + b.mu.Lock() + selected := b.selectedPad + pad := b.pads[videoTestSrcName] + b.mu.Unlock() + + if selected == videoTestSrcName { + pad.SendEvent(gst.NewEOSEvent()) } - } else { - if err := v.setSelectorPad(videoTestSrcName); err != nil { + return false + }) + + if err := b.addVideoTestSrcBin(); err != nil { + return err + } + if b.conf.VideoTrack == nil { + if err := b.setSelectorPad(videoTestSrcName); err != nil { return err } } - - b.SetGetSrcPad(v.getSrcPad) - b.Callbacks.AddOnTrackMuted(v.onTrackMuted) - b.Callbacks.AddOnTrackUnmuted(v.onTrackUnmuted) - - if err := addVideoEncoder(b, p); err != nil { + if err := b.addEncoder(); err != nil { return err } } @@ -173,261 +213,263 @@ func buildSDKVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error { return nil } -func (v *videoSDKBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error { - track := p.VideoTrack - - b := videoBin.NewBin(track.TrackID) - b.SetEOSFunc(func() bool { - return false - }) - if err := videoBin.AddSourceBin(b); err != nil { +func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { + appSrcBin, err := b.buildAppSrcBin(ts) + if err != nil { return err } - track.AppSrc.Element.SetArg("format", "time") - if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil { - return errors.ErrGstPipelineError(err) + if b.conf.VideoTranscoding { + b.createSrcPad(ts.TrackID) } - if err := b.AddElement(track.AppSrc.Element); err != nil { + + if err = b.bin.AddSourceBin(appSrcBin); err != nil { return err } - switch track.MimeType { + if b.conf.VideoTranscoding { + return b.setSelectorPad(ts.TrackID) + } + + return nil +} + +func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error) { + appSrcBin := b.bin.NewBin(ts.TrackID) + ts.AppSrc.Element.SetArg("format", "time") + if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil { + return nil, err + } + + switch ts.MimeType { case types.MimeTypeH264: - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + if err := ts.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=H264,clock-rate=%d", - track.PayloadType, track.ClockRate, + ts.PayloadType, ts.ClockRate, ))); err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } rtpH264Depay, err := gst.NewElement("rtph264depay") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } caps, err := gst.NewElement("capsfilter") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } if err = caps.SetProperty("caps", gst.NewCapsFromString( "video/x-h264,stream-format=byte-stream", )); err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElements(rtpH264Depay, caps); err != nil { - return err + if err = appSrcBin.AddElements(rtpH264Depay, caps); err != nil { + return nil, err } - if p.VideoTranscoding { + if b.conf.VideoTranscoding { avDecH264, err := gst.NewElement("avdec_h264") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(avDecH264); err != nil { - return err + if err = appSrcBin.AddElement(avDecH264); err != nil { + return nil, err } } else { h264Parse, err := gst.NewElement("h264parse") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(h264Parse); err != nil { - return err + if err = appSrcBin.AddElement(h264Parse); err != nil { + return nil, err } - return nil + return appSrcBin, nil } case types.MimeTypeVP8: - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + if err := ts.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=VP8,clock-rate=%d", - track.PayloadType, track.ClockRate, + ts.PayloadType, ts.ClockRate, ))); err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } rtpVP8Depay, err := gst.NewElement("rtpvp8depay") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(rtpVP8Depay); err != nil { - return err + if err = appSrcBin.AddElement(rtpVP8Depay); err != nil { + return nil, err } - if p.VideoTranscoding { + if b.conf.VideoTranscoding { vp8Dec, err := gst.NewElement("vp8dec") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(vp8Dec); err != nil { - return err + if err = appSrcBin.AddElement(vp8Dec); err != nil { + return nil, err } } else { - return nil + return appSrcBin, nil } case types.MimeTypeVP9: - if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + if err := ts.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=VP9,clock-rate=%d", - track.PayloadType, track.ClockRate, + ts.PayloadType, ts.ClockRate, ))); err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } rtpVP9Depay, err := gst.NewElement("rtpvp9depay") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(rtpVP9Depay); err != nil { - return err + if err = appSrcBin.AddElement(rtpVP9Depay); err != nil { + return nil, err } - if p.VideoTranscoding { + if b.conf.VideoTranscoding { vp9Dec, err := gst.NewElement("vp9dec") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElement(vp9Dec); err != nil { - return err + if err = appSrcBin.AddElement(vp9Dec); err != nil { + return nil, err } } else { vp9Parse, err := gst.NewElement("vp9parse") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } vp9Caps, err := gst.NewElement("capsfilter") if err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } if err = vp9Caps.SetProperty("caps", gst.NewCapsFromString( "video/x-vp9,width=[16,2147483647],height=[16,2147483647]", )); err != nil { - return errors.ErrGstPipelineError(err) + return nil, errors.ErrGstPipelineError(err) } - if err = b.AddElements(vp9Parse, vp9Caps); err != nil { - return err + if err = appSrcBin.AddElements(vp9Parse, vp9Caps); err != nil { + return nil, err } - return nil + + return appSrcBin, nil } default: - return errors.ErrNotSupported(string(track.MimeType)) + return nil, errors.ErrNotSupported(string(ts.MimeType)) } - videoQueue, err := gstreamer.BuildQueue("video_input_queue", p.Latency, true) - if err != nil { - return err + if err := addVideoConverter(appSrcBin, b.conf); err != nil { + return nil, err } - videoConvert, err := gst.NewElement("videoconvert") - if err != nil { - return errors.ErrGstPipelineError(err) - } + return appSrcBin, nil +} - videoScale, err := gst.NewElement("videoscale") - if err != nil { - return errors.ErrGstPipelineError(err) +func (b *VideoBin) addVideoTestSrcBin() error { + testSrcBin := b.bin.NewBin(videoTestSrcName) + if err := b.bin.AddSourceBin(testSrcBin); err != nil { + return err } - videoRate, err := gst.NewElement("videorate") + videoTestSrc, err := gst.NewElement("videotestsrc") if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", uint64(time.Second)); err != nil { - return err + if err = videoTestSrc.SetProperty("is-live", true); err != nil { + return errors.ErrGstPipelineError(err) } + videoTestSrc.SetArg("pattern", "black") - caps, err := newVideoCapsFilter(p, false) + caps, err := newVideoCapsFilter(b.conf, true) if err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps); err != nil { + if err = testSrcBin.AddElements(videoTestSrc, caps); err != nil { return err } + b.createTestSrcPad() return nil } -func (v *videoSDKBin) buildVideoTestSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error { - b := videoBin.NewBin(videoTestSrcName) - if err := videoBin.AddSourceBin(b); err != nil { - return err - } - - videoTestSrc, err := gst.NewElement("videotestsrc") +func (b *VideoBin) addSelector() error { + inputSelector, err := gst.NewElement("input-selector") if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoTestSrc.SetProperty("is-live", true); err != nil { - return errors.ErrGstPipelineError(err) - } - videoTestSrc.SetArg("pattern", "black") - caps, err := newVideoCapsFilter(p, true) + videoRate, err := gst.NewElement("videorate") if err != nil { return errors.ErrGstPipelineError(err) } - - if err = b.AddElements(videoTestSrc, caps); err != nil { + if err = videoRate.SetProperty("max-duplication-time", uint64(time.Second)); err != nil { + return err + } + if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } - return nil -} - -func (v *videoSDKBin) addVideoSelector(b *gstreamer.Bin, p *config.PipelineConfig) error { - inputSelector, err := gst.NewElement("input-selector") + caps, err := newVideoCapsFilter(b.conf, true) if err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElements(inputSelector); err != nil { + if err = b.bin.AddElements(inputSelector, videoRate, caps); err != nil { return err } - v.selector = inputSelector + b.selector = inputSelector return nil } -func addVideoEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error { - videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", p.Latency, false) +func (b *VideoBin) addEncoder() error { + videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", b.conf.Latency, false) if err != nil { return err } - if err = b.AddElement(videoQueue); err != nil { + if err = b.bin.AddElement(videoQueue); err != nil { return err } - switch p.VideoOutCodec { + switch b.conf.VideoOutCodec { // we only encode h264, the rest are too slow case types.MimeTypeH264: x264Enc, err := gst.NewElement("x264enc") if err != nil { return errors.ErrGstPipelineError(err) } - if err = x264Enc.SetProperty("bitrate", uint(p.VideoBitrate)); err != nil { + if err = x264Enc.SetProperty("bitrate", uint(b.conf.VideoBitrate)); err != nil { return errors.ErrGstPipelineError(err) } x264Enc.SetArg("speed-preset", "veryfast") - if p.KeyFrameInterval != 0 { - if err = x264Enc.SetProperty("key-int-max", uint(p.KeyFrameInterval*float64(p.Framerate))); err != nil { + if b.conf.KeyFrameInterval != 0 { + if err = x264Enc.SetProperty("key-int-max", uint(b.conf.KeyFrameInterval*float64(b.conf.Framerate))); err != nil { return errors.ErrGstPipelineError(err) } } bufCapacity := uint(2000) // 2s - if p.GetSegmentConfig() != nil { + if b.conf.GetSegmentConfig() != nil { // avoid key frames other than at segments boundaries as splitmuxsink can become inconsistent otherwise if err = x264Enc.SetProperty("option-string", "scenecut=0"); err != nil { return errors.ErrGstPipelineError(err) } - bufCapacity = uint(time.Duration(p.GetSegmentConfig().SegmentDuration) * (time.Second / time.Millisecond)) + bufCapacity = uint(time.Duration(b.conf.GetSegmentConfig().SegmentDuration) * (time.Second / time.Millisecond)) } if err = x264Enc.SetProperty("vbv-buf-capacity", bufCapacity); err != nil { return err @@ -439,12 +481,12 @@ func addVideoEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error { } if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "video/x-h264,profile=%s", - p.VideoProfile, + b.conf.VideoProfile, ))); err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElements(x264Enc, caps); err != nil { + if err = b.bin.AddElements(x264Enc, caps); err != nil { return err } @@ -476,97 +518,134 @@ func addVideoEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error { if err = vp9Enc.SetProperty("min-quantizer", 2); err != nil { return errors.ErrGstPipelineError(err) } - if err = b.AddElement(vp9Enc); err != nil { + if err = b.bin.AddElement(vp9Enc); err != nil { return err } fallthrough default: - return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec)) + return errors.ErrNotSupported(fmt.Sprintf("%s encoding", b.conf.VideoOutCodec)) + } +} + +func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { + videoQueue, err := gstreamer.BuildQueue("video_input_queue", p.Latency, true) + if err != nil { + return err + } + + videoConvert, err := gst.NewElement("videoconvert") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + videoScale, err := gst.NewElement("videoscale") + if err != nil { + return errors.ErrGstPipelineError(err) + } + + videoRate, err := gst.NewElement("videorate") + if err != nil { + return errors.ErrGstPipelineError(err) + } + if err = videoRate.SetProperty("max-duplication-time", uint64(time.Second)); err != nil { + return err + } + if err = videoRate.SetProperty("skip-to-first", true); err != nil { + return err + } + + caps, err := newVideoCapsFilter(p, true) + if err != nil { + return errors.ErrGstPipelineError(err) } + + return b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps) } -func (v *videoSDKBin) getSrcPad(name string) *gst.Pad { - v.mu.Lock() - defer v.mu.Unlock() +func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) { + caps, err := gst.NewElement("capsfilter") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if includeFramerate { + err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", + p.Framerate, p.Width, p.Height, + ))) + } else { + err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", + p.Width, p.Height, + ))) + } + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + return caps, nil +} + +func (b *VideoBin) getSrcPad(name string) *gst.Pad { + b.mu.Lock() + defer b.mu.Unlock() - return v.pads[name] + return b.pads[name] } -func (v *videoSDKBin) createSrcPad(trackID string) { - v.mu.Lock() - defer v.mu.Unlock() +func (b *VideoBin) createSrcPad(trackID string) { + b.mu.Lock() + defer b.mu.Unlock() - pad := v.selector.GetRequestPad("sink_%u") + pad := b.selector.GetRequestPad("sink_%u") pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { buffer := info.GetBuffer() - for v.nextPTS.Load() != 0 { + for b.nextPTS.Load() != 0 { time.Sleep(time.Millisecond * 100) } - if buffer.PresentationTimestamp() < v.lastPTS.Load() { + if buffer.PresentationTimestamp() < b.lastPTS.Load() { return gst.PadProbeDrop } - v.lastPTS.Store(buffer.PresentationTimestamp()) + b.lastPTS.Store(buffer.PresentationTimestamp()) return gst.PadProbeOK }) - v.pads[trackID] = pad + b.pads[trackID] = pad } -func (v *videoSDKBin) createTestSrcPad() { - v.mu.Lock() - defer v.mu.Unlock() +func (b *VideoBin) createTestSrcPad() { + b.mu.Lock() + defer b.mu.Unlock() - pad := v.selector.GetRequestPad("sink_%u") + pad := b.selector.GetRequestPad("sink_%u") pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { buffer := info.GetBuffer() - if buffer.PresentationTimestamp() < v.lastPTS.Load() { + if buffer.PresentationTimestamp() < b.lastPTS.Load() { return gst.PadProbeDrop } - if nextPTS := v.nextPTS.Load(); nextPTS != 0 && buffer.PresentationTimestamp() >= nextPTS { - if err := v.setSelectorPad(v.nextPad); err != nil { + if nextPTS := b.nextPTS.Load(); nextPTS != 0 && buffer.PresentationTimestamp() >= nextPTS { + if err := b.setSelectorPad(b.nextPad); err != nil { logger.Errorw("failed to unmute", err) return gst.PadProbeDrop } - v.nextPad = "" - v.nextPTS.Store(0) + b.nextPad = "" + b.nextPTS.Store(0) + } + if b.selectedPad == videoTestSrcName { + b.lastPTS.Store(buffer.PresentationTimestamp()) } - v.lastPTS.Store(buffer.PresentationTimestamp()) return gst.PadProbeOK }) - v.pads[videoTestSrcName] = pad -} - -func (v *videoSDKBin) onTrackMuted(trackID string) { - if v.selectedPad != trackID { - return - } - - if err := v.setSelectorPad(videoTestSrcName); err != nil { - logger.Errorw("failed to set selector pad", err) - } -} - -func (v *videoSDKBin) onTrackUnmuted(trackID string, pts time.Duration) { - v.mu.Lock() - defer v.mu.Unlock() - - if v.pads[trackID] == nil { - return - } - - v.nextPTS.Store(pts) - v.nextPad = trackID + b.pads[videoTestSrcName] = pad } // TODO: go-gst should accept objects directly and handle conversion to C -func (v *videoSDKBin) setSelectorPad(name string) error { - v.mu.Lock() - defer v.mu.Unlock() +func (b *VideoBin) setSelectorPad(name string) error { + b.mu.Lock() + defer b.mu.Unlock() - pad := v.pads[name] + pad := b.pads[name] - pt, err := v.selector.GetPropertyType("active-pad") + pt, err := b.selector.GetPropertyType("active-pad") if err != nil { return errors.ErrGstPipelineError(err) } @@ -577,32 +656,10 @@ func (v *videoSDKBin) setSelectorPad(name string) error { } val.SetInstance(uintptr(unsafe.Pointer(pad.Instance()))) - if err = v.selector.SetPropertyValue("active-pad", val); err != nil { + if err = b.selector.SetPropertyValue("active-pad", val); err != nil { return errors.ErrGstPipelineError(err) } - v.selectedPad = name + b.selectedPad = name return nil } - -func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) { - caps, err := gst.NewElement("capsfilter") - if err != nil { - return nil, errors.ErrGstPipelineError(err) - } - if includeFramerate { - err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( - "video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", - p.Framerate, p.Width, p.Height, - ))) - } else { - err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( - "video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", - p.Width, p.Height, - ))) - } - if err != nil { - return nil, errors.ErrGstPipelineError(err) - } - return caps, nil -} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 2f29fd41..2304282c 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -126,21 +126,12 @@ func (c *Controller) BuildPipeline() error { } if c.AudioEnabled { - audioBin, err := builder.BuildAudioBin(p, c.PipelineConfig) - if err != nil { - return err - } - if err = p.AddSourceBin(audioBin); err != nil { + if err := builder.BuildAudioBin(p, c.PipelineConfig); err != nil { return err } } - if c.VideoEnabled { - videoBin, err := builder.BuildVideoBin(p, c.PipelineConfig) - if err != nil { - return err - } - if err = p.AddSourceBin(videoBin); err != nil { + if err = builder.BuildVideoBin(p, c.PipelineConfig); err != nil { return err } } From 38d8b1ead91a17a66f7a8dd5bd5637a620b9fd92 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 8 Sep 2023 14:40:15 -0700 Subject: [PATCH 2/2] 1.7.8 --- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index ee04fdbc..c1d2c1ae 100644 --- a/version/version.go +++ b/version/version.go @@ -14,4 +14,4 @@ package version -const Version = "1.7.7" +const Version = "1.7.8"