From 6e29a986e4840fb038b44ac0ff7011c8965d690f Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 9 Jul 2024 15:15:31 -0700 Subject: [PATCH] update input selector usage --- pkg/pipeline/builder/image.go | 3 -- pkg/pipeline/builder/video.go | 76 ++++++++++------------------------- 2 files changed, 21 insertions(+), 58 deletions(-) diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index 9eb4c678..e58f3cc1 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -82,9 +82,6 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi if err != nil { return nil, errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil { - return nil, err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return nil, err } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 3adb6ff7..cd167628 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -23,7 +23,6 @@ import ( "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" - "go.uber.org/atomic" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" @@ -41,13 +40,9 @@ type VideoBin struct { bin *gstreamer.Bin conf *config.PipelineConfig - lastPTS atomic.Duration - nextPTS atomic.Duration - selectedPad string - nextPad string - mu sync.Mutex nextID int + selectedPad string pads map[string]*gst.Pad names map[string]string selector *gst.Element @@ -180,12 +175,14 @@ func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) { } b.mu.Lock() - defer b.mu.Unlock() - if name, ok := b.names[trackID]; ok { - b.nextPTS.Store(pts) - b.nextPad = name + if err := b.setSelectorPadLocked(name); err != nil { + b.mu.Unlock() + b.bin.OnError(err) + return + } } + b.mu.Unlock() } func (b *VideoBin) buildWebInput() error { @@ -217,9 +214,6 @@ func (b *VideoBin) buildWebInput() error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -497,14 +491,14 @@ func (b *VideoBin) addSelector() error { if err != nil { return errors.ErrGstPipelineError(err) } + if err = inputSelector.SetProperty("drop-backwards", true); err != nil { + return errors.ErrGstPipelineError(err) + } videoRate, err := gst.NewElement("videorate") if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -663,9 +657,6 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -714,49 +705,15 @@ func (b *VideoBin) createSrcPad(trackID, name string) { b.mu.Lock() defer b.mu.Unlock() - pad := b.selector.GetRequestPad("sink_%u") - pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - buffer := info.GetBuffer() - for b.nextPTS.Load() != 0 { - time.Sleep(time.Millisecond * 100) - } - pts := *buffer.PresentationTimestamp().AsDuration() - if pts < b.lastPTS.Load() { - return gst.PadProbeDrop - } - b.lastPTS.Store(pts) - return gst.PadProbeOK - }) - b.names[trackID] = name - b.pads[name] = pad + b.pads[name] = b.selector.GetRequestPad("sink_%u") } func (b *VideoBin) createTestSrcPad() { b.mu.Lock() defer b.mu.Unlock() - pad := b.selector.GetRequestPad("sink_%u") - pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - buffer := info.GetBuffer() - pts := *buffer.PresentationTimestamp().AsDuration() - if pts < b.lastPTS.Load() { - return gst.PadProbeDrop - } - if nextPTS := b.nextPTS.Load(); nextPTS != 0 && pts >= nextPTS { - if err := b.setSelectorPad(b.nextPad); err != nil { - logger.Errorw("failed to unmute", err) - return gst.PadProbeDrop - } - b.nextPad = "" - b.nextPTS.Store(0) - } - if b.selectedPad == videoTestSrcName { - b.lastPTS.Store(pts) - } - return gst.PadProbeOK - }) - b.pads[videoTestSrcName] = pad + b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u") } func (b *VideoBin) setSelectorPad(name string) error { @@ -769,6 +726,15 @@ func (b *VideoBin) setSelectorPad(name string) error { // TODO: go-gst should accept objects directly and handle conversion to C func (b *VideoBin) setSelectorPadLocked(name string) error { pad := b.pads[name] + // drop until the next keyframe + pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + buffer := info.GetBuffer() + if buffer.HasFlags(gst.BufferFlagDeltaUnit) { + return gst.PadProbeDrop + } + logger.Debugw("active pad changed", "name", name) + return gst.PadProbeRemove + }) pt, err := b.selector.GetPropertyType("active-pad") if err != nil {