Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delay videotestsrc, revert input-selector drop-backwards #725

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/gstreamer/pads.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) {
}
}

logger.Warnw("could not match pads", nil, "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
logger.Warnw("could not match pads", nil,
"src", src.bin.GetName(), "sink", sink.bin.GetName(),
"srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
return nil, nil, errors.ErrGhostPadFailed
}

Expand Down
68 changes: 53 additions & 15 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VideoBin struct {
mu sync.Mutex
nextID int
selectedPad string
lastPTS uint64
pads map[string]*gst.Pad
names map[string]string
selector *gst.Element
Expand Down Expand Up @@ -76,7 +77,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
if len(p.GetEncodedOutputs()) > 1 {
tee, err := gst.NewElementWithName("tee", "video_tee")
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

if err = b.bin.AddElement(tee); err != nil {
Expand Down Expand Up @@ -202,7 +203,7 @@ func (b *VideoBin) buildWebInput() error {

videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

videoConvert, err := gst.NewElement("videoconvert")
Expand All @@ -215,7 +216,7 @@ func (b *VideoBin) buildWebInput() error {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}

caps, err := gst.NewElement("capsfilter")
Expand Down Expand Up @@ -473,12 +474,20 @@ func (b *VideoBin) addVideoTestSrcBin() error {
}
videoTestSrc.SetArg("pattern", "black")

queue, err := gstreamer.BuildQueue("video_test_src_queue", config.Latency, false)
if err != nil {
return err
}
if err = queue.SetProperty("min-threshold-time", uint64(2e9)); err != nil {
return errors.ErrGstPipelineError(err)
}

caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = testSrcBin.AddElements(videoTestSrc, caps); err != nil {
if err = testSrcBin.AddElements(videoTestSrc, queue, caps); err != nil {
return err
}

Expand All @@ -491,16 +500,13 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = inputSelector.SetProperty("drop-backwards", true); err != nil {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}

caps, err := b.newVideoCapsFilter(true)
Expand All @@ -519,7 +525,7 @@ func (b *VideoBin) addSelector() error {
func (b *VideoBin) addEncoder() error {
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if err = b.bin.AddElement(videoQueue); err != nil {
return err
Expand Down Expand Up @@ -552,7 +558,7 @@ func (b *VideoBin) addEncoder() error {
bufCapacity = 10000
}
if err = x264Enc.SetProperty("vbv-buf-capacity", bufCapacity); err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if b.conf.GetStreamConfig() != nil {
x264Enc.SetArg("pass", "cbr")
Expand Down Expand Up @@ -618,7 +624,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
var err error
b.rawVideoTee, err = gst.NewElement("tee")
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if err = b.bin.AddElement(b.rawVideoTee); err != nil {
return err
Expand All @@ -637,7 +643,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

videoConvert, err := gst.NewElement("videoconvert")
Expand All @@ -658,7 +664,7 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}
elements = append(elements, videoRate)
}
Expand Down Expand Up @@ -706,14 +712,45 @@ func (b *VideoBin) createSrcPad(trackID, name string) {
defer b.mu.Unlock()

b.names[trackID] = name
b.pads[name] = b.selector.GetRequestPad("sink_%u")

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
pts := uint64(info.GetBuffer().PresentationTimestamp())
b.mu.Lock()
if pts < b.lastPTS || (b.selectedPad != videoTestSrcName && b.selectedPad != name) {
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s dropping %v", name, time.Duration(pts)))
return gst.PadProbeDrop
}
b.lastPTS = pts
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s pushing %v", name, time.Duration(pts)))
return gst.PadProbeOK
})

b.pads[name] = pad
}

func (b *VideoBin) createTestSrcPad() {
b.mu.Lock()
defer b.mu.Unlock()

b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u")
pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
pts := uint64(info.GetBuffer().PresentationTimestamp())
b.mu.Lock()
if pts < b.lastPTS || (b.selectedPad != videoTestSrcName) {
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s dropping %v", videoTestSrcName, time.Duration(pts)))
return gst.PadProbeDrop
}
b.lastPTS = pts
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s pushing %v", videoTestSrcName, time.Duration(pts)))
return gst.PadProbeOK
})

b.pads[videoTestSrcName] = pad
}

func (b *VideoBin) setSelectorPad(name string) error {
Expand All @@ -726,6 +763,7 @@ func (b *VideoBin) setSelectorPad(name string) error {
// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPadLocked(name string) error {
pad := b.pads[name]

// drop until the next keyframe
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
Expand Down
Loading