Skip to content

Commit

Permalink
update input selector usage
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Jul 9, 2024
1 parent 0a55a3d commit 6e29a98
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 58 deletions.
3 changes: 0 additions & 3 deletions pkg/pipeline/builder/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil {
return nil, err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return nil, err
}
Expand Down
76 changes: 21 additions & 55 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"go.uber.org/atomic"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
Expand All @@ -41,13 +40,9 @@ type VideoBin struct {
bin *gstreamer.Bin
conf *config.PipelineConfig

lastPTS atomic.Duration
nextPTS atomic.Duration
selectedPad string
nextPad string

mu sync.Mutex
nextID int
selectedPad string
pads map[string]*gst.Pad
names map[string]string
selector *gst.Element
Expand Down Expand Up @@ -180,12 +175,14 @@ func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) {
}

b.mu.Lock()
defer b.mu.Unlock()

if name, ok := b.names[trackID]; ok {
b.nextPTS.Store(pts)
b.nextPad = name
if err := b.setSelectorPadLocked(name); err != nil {
b.mu.Unlock()
b.bin.OnError(err)
return
}
}
b.mu.Unlock()
}

func (b *VideoBin) buildWebInput() error {
Expand Down Expand Up @@ -217,9 +214,6 @@ func (b *VideoBin) buildWebInput() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -497,14 +491,14 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = inputSelector.SetProperty("drop-backwards", true); err != nil {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -663,9 +657,6 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -714,49 +705,15 @@ func (b *VideoBin) createSrcPad(trackID, name string) {
b.mu.Lock()
defer b.mu.Unlock()

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
for b.nextPTS.Load() != 0 {
time.Sleep(time.Millisecond * 100)
}
pts := *buffer.PresentationTimestamp().AsDuration()
if pts < b.lastPTS.Load() {
return gst.PadProbeDrop
}
b.lastPTS.Store(pts)
return gst.PadProbeOK
})

b.names[trackID] = name
b.pads[name] = pad
b.pads[name] = b.selector.GetRequestPad("sink_%u")
}

func (b *VideoBin) createTestSrcPad() {
b.mu.Lock()
defer b.mu.Unlock()

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
pts := *buffer.PresentationTimestamp().AsDuration()
if pts < b.lastPTS.Load() {
return gst.PadProbeDrop
}
if nextPTS := b.nextPTS.Load(); nextPTS != 0 && pts >= nextPTS {
if err := b.setSelectorPad(b.nextPad); err != nil {
logger.Errorw("failed to unmute", err)
return gst.PadProbeDrop
}
b.nextPad = ""
b.nextPTS.Store(0)
}
if b.selectedPad == videoTestSrcName {
b.lastPTS.Store(pts)
}
return gst.PadProbeOK
})
b.pads[videoTestSrcName] = pad
b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u")
}

func (b *VideoBin) setSelectorPad(name string) error {
Expand All @@ -769,6 +726,15 @@ func (b *VideoBin) setSelectorPad(name string) error {
// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPadLocked(name string) error {
pad := b.pads[name]
// drop until the next keyframe
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
if buffer.HasFlags(gst.BufferFlagDeltaUnit) {
return gst.PadProbeDrop
}
logger.Debugw("active pad changed", "name", name)
return gst.PadProbeRemove
})

pt, err := b.selector.GetPropertyType("active-pad")
if err != nil {
Expand Down

0 comments on commit 6e29a98

Please sign in to comment.