Skip to content

Commit

Permalink
delay videotestsrc
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Jul 17, 2024
1 parent fff2b1b commit 6242414
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 16 deletions.
4 changes: 3 additions & 1 deletion pkg/gstreamer/pads.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) {
}
}

logger.Warnw("could not match pads", nil, "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
logger.Warnw("could not match pads", nil,
"src", src.bin.GetName(), "sink", sink.bin.GetName(),
"srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
return nil, nil, errors.ErrGhostPadFailed
}

Expand Down
68 changes: 53 additions & 15 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VideoBin struct {
mu sync.Mutex
nextID int
selectedPad string
lastPTS uint64
pads map[string]*gst.Pad
names map[string]string
selector *gst.Element
Expand Down Expand Up @@ -76,7 +77,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
if len(p.GetEncodedOutputs()) > 1 {
tee, err := gst.NewElementWithName("tee", "video_tee")
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

if err = b.bin.AddElement(tee); err != nil {
Expand Down Expand Up @@ -202,7 +203,7 @@ func (b *VideoBin) buildWebInput() error {

videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

videoConvert, err := gst.NewElement("videoconvert")
Expand All @@ -215,7 +216,7 @@ func (b *VideoBin) buildWebInput() error {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}

caps, err := gst.NewElement("capsfilter")
Expand Down Expand Up @@ -473,12 +474,20 @@ func (b *VideoBin) addVideoTestSrcBin() error {
}
videoTestSrc.SetArg("pattern", "black")

queue, err := gstreamer.BuildQueue("video_test_src_queue", config.Latency, false)
if err != nil {
return err
}
if err = queue.SetProperty("min-threshold-time", uint64(2e9)); err != nil {
return errors.ErrGstPipelineError(err)
}

caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = testSrcBin.AddElements(videoTestSrc, caps); err != nil {
if err = testSrcBin.AddElements(videoTestSrc, queue, caps); err != nil {
return err
}

Expand All @@ -491,16 +500,13 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = inputSelector.SetProperty("drop-backwards", true); err != nil {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}

caps, err := b.newVideoCapsFilter(true)
Expand All @@ -519,7 +525,7 @@ func (b *VideoBin) addSelector() error {
func (b *VideoBin) addEncoder() error {
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if err = b.bin.AddElement(videoQueue); err != nil {
return err
Expand Down Expand Up @@ -552,7 +558,7 @@ func (b *VideoBin) addEncoder() error {
bufCapacity = 10000
}
if err = x264Enc.SetProperty("vbv-buf-capacity", bufCapacity); err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if b.conf.GetStreamConfig() != nil {
x264Enc.SetArg("pass", "cbr")
Expand Down Expand Up @@ -618,7 +624,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
var err error
b.rawVideoTee, err = gst.NewElement("tee")
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}
if err = b.bin.AddElement(b.rawVideoTee); err != nil {
return err
Expand All @@ -637,7 +643,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
return errors.ErrGstPipelineError(err)
}

videoConvert, err := gst.NewElement("videoconvert")
Expand All @@ -658,7 +664,7 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
return errors.ErrGstPipelineError(err)
}
elements = append(elements, videoRate)
}
Expand Down Expand Up @@ -706,14 +712,45 @@ func (b *VideoBin) createSrcPad(trackID, name string) {
defer b.mu.Unlock()

b.names[trackID] = name
b.pads[name] = b.selector.GetRequestPad("sink_%u")

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
pts := uint64(info.GetBuffer().PresentationTimestamp())
b.mu.Lock()
if pts < b.lastPTS || (b.selectedPad != videoTestSrcName && b.selectedPad != name) {
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s dropping %v", name, time.Duration(pts)))
return gst.PadProbeDrop
}
b.lastPTS = pts
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s pushing %v", name, time.Duration(pts)))
return gst.PadProbeOK
})

b.pads[name] = pad
}

func (b *VideoBin) createTestSrcPad() {
b.mu.Lock()
defer b.mu.Unlock()

b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u")
pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
pts := uint64(info.GetBuffer().PresentationTimestamp())
b.mu.Lock()
if pts < b.lastPTS || (b.selectedPad != videoTestSrcName) {
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s dropping %v", videoTestSrcName, time.Duration(pts)))
return gst.PadProbeDrop
}
b.lastPTS = pts
b.mu.Unlock()
logger.Debugw(fmt.Sprintf("%s pushing %v", videoTestSrcName, time.Duration(pts)))
return gst.PadProbeOK
})

b.pads[videoTestSrcName] = pad
}

func (b *VideoBin) setSelectorPad(name string) error {
Expand All @@ -726,6 +763,7 @@ func (b *VideoBin) setSelectorPad(name string) error {
// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPadLocked(name string) error {
pad := b.pads[name]

// drop until the next keyframe
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
Expand Down

0 comments on commit 6242414

Please sign in to comment.