From 62424143dad25b424737a219afe62afbfbec9d5c Mon Sep 17 00:00:00 2001
From: David Colburn <xero73@gmail.com>
Date: Wed, 17 Jul 2024 15:31:15 -0400
Subject: [PATCH] delay videotestsrc

---
 pkg/gstreamer/pads.go         |  4 ++-
 pkg/pipeline/builder/video.go | 68 +++++++++++++++++++++++++++--------
 2 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/pkg/gstreamer/pads.go b/pkg/gstreamer/pads.go
index bee84a3e..02d390b9 100644
--- a/pkg/gstreamer/pads.go
+++ b/pkg/gstreamer/pads.go
@@ -131,7 +131,9 @@ func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) {
 		}
 	}
 
-	logger.Warnw("could not match pads", nil, "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
+	logger.Warnw("could not match pads", nil,
+		"src", src.bin.GetName(), "sink", sink.bin.GetName(),
+		"srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates)
 	return nil, nil, errors.ErrGhostPadFailed
 }
 
diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go
index 7e764337..5a7781db 100644
--- a/pkg/pipeline/builder/video.go
+++ b/pkg/pipeline/builder/video.go
@@ -43,6 +43,7 @@ type VideoBin struct {
 	mu          sync.Mutex
 	nextID      int
 	selectedPad string
+	lastPTS     uint64
 	pads        map[string]*gst.Pad
 	names       map[string]string
 	selector    *gst.Element
@@ -76,7 +77,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
 	if len(p.GetEncodedOutputs()) > 1 {
 		tee, err := gst.NewElementWithName("tee", "video_tee")
 		if err != nil {
-			return err
+			return errors.ErrGstPipelineError(err)
 		}
 
 		if err = b.bin.AddElement(tee); err != nil {
@@ -202,7 +203,7 @@ func (b *VideoBin) buildWebInput() error {
 
 	videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
 	if err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 
 	videoConvert, err := gst.NewElement("videoconvert")
@@ -215,7 +216,7 @@ func (b *VideoBin) buildWebInput() error {
 		return errors.ErrGstPipelineError(err)
 	}
 	if err = videoRate.SetProperty("skip-to-first", true); err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 
 	caps, err := gst.NewElement("capsfilter")
@@ -473,12 +474,20 @@ func (b *VideoBin) addVideoTestSrcBin() error {
 	}
 	videoTestSrc.SetArg("pattern", "black")
 
+	queue, err := gstreamer.BuildQueue("video_test_src_queue", config.Latency, false)
+	if err != nil {
+		return err
+	}
+	if err = queue.SetProperty("min-threshold-time", uint64(2e9)); err != nil {
+		return errors.ErrGstPipelineError(err)
+	}
+
 	caps, err := b.newVideoCapsFilter(true)
 	if err != nil {
 		return errors.ErrGstPipelineError(err)
 	}
 
-	if err = testSrcBin.AddElements(videoTestSrc, caps); err != nil {
+	if err = testSrcBin.AddElements(videoTestSrc, queue, caps); err != nil {
 		return err
 	}
 
@@ -491,16 +500,13 @@ func (b *VideoBin) addSelector() error {
 	if err != nil {
 		return errors.ErrGstPipelineError(err)
 	}
-	if err = inputSelector.SetProperty("drop-backwards", true); err != nil {
-		return errors.ErrGstPipelineError(err)
-	}
 
 	videoRate, err := gst.NewElement("videorate")
 	if err != nil {
 		return errors.ErrGstPipelineError(err)
 	}
 	if err = videoRate.SetProperty("skip-to-first", true); err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 
 	caps, err := b.newVideoCapsFilter(true)
@@ -519,7 +525,7 @@ func (b *VideoBin) addSelector() error {
 func (b *VideoBin) addEncoder() error {
 	videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false)
 	if err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 	if err = b.bin.AddElement(videoQueue); err != nil {
 		return err
@@ -552,7 +558,7 @@ func (b *VideoBin) addEncoder() error {
 			bufCapacity = 10000
 		}
 		if err = x264Enc.SetProperty("vbv-buf-capacity", bufCapacity); err != nil {
-			return err
+			return errors.ErrGstPipelineError(err)
 		}
 		if b.conf.GetStreamConfig() != nil {
 			x264Enc.SetArg("pass", "cbr")
@@ -618,7 +624,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
 	var err error
 	b.rawVideoTee, err = gst.NewElement("tee")
 	if err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 	if err = b.bin.AddElement(b.rawVideoTee); err != nil {
 		return err
@@ -637,7 +643,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
 func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
 	videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
 	if err != nil {
-		return err
+		return errors.ErrGstPipelineError(err)
 	}
 
 	videoConvert, err := gst.NewElement("videoconvert")
@@ -658,7 +664,7 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
 			return errors.ErrGstPipelineError(err)
 		}
 		if err = videoRate.SetProperty("skip-to-first", true); err != nil {
-			return err
+			return errors.ErrGstPipelineError(err)
 		}
 		elements = append(elements, videoRate)
 	}
@@ -706,14 +712,45 @@ func (b *VideoBin) createSrcPad(trackID, name string) {
 	defer b.mu.Unlock()
 
 	b.names[trackID] = name
-	b.pads[name] = b.selector.GetRequestPad("sink_%u")
+
+	pad := b.selector.GetRequestPad("sink_%u")
+	pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
+		pts := uint64(info.GetBuffer().PresentationTimestamp())
+		b.mu.Lock()
+		if pts < b.lastPTS || (b.selectedPad != videoTestSrcName && b.selectedPad != name) {
+			b.mu.Unlock()
+			logger.Debugw(fmt.Sprintf("%s dropping %v", name, time.Duration(pts)))
+			return gst.PadProbeDrop
+		}
+		b.lastPTS = pts
+		b.mu.Unlock()
+		logger.Debugw(fmt.Sprintf("%s pushing %v", name, time.Duration(pts)))
+		return gst.PadProbeOK
+	})
+
+	b.pads[name] = pad
 }
 
 func (b *VideoBin) createTestSrcPad() {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u")
+	pad := b.selector.GetRequestPad("sink_%u")
+	pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
+		pts := uint64(info.GetBuffer().PresentationTimestamp())
+		b.mu.Lock()
+		if pts < b.lastPTS || (b.selectedPad != videoTestSrcName) {
+			b.mu.Unlock()
+			logger.Debugw(fmt.Sprintf("%s dropping %v", videoTestSrcName, time.Duration(pts)))
+			return gst.PadProbeDrop
+		}
+		b.lastPTS = pts
+		b.mu.Unlock()
+		logger.Debugw(fmt.Sprintf("%s pushing %v", videoTestSrcName, time.Duration(pts)))
+		return gst.PadProbeOK
+	})
+
+	b.pads[videoTestSrcName] = pad
 }
 
 func (b *VideoBin) setSelectorPad(name string) error {
@@ -726,6 +763,7 @@ func (b *VideoBin) setSelectorPad(name string) error {
 // TODO: go-gst should accept objects directly and handle conversion to C
 func (b *VideoBin) setSelectorPadLocked(name string) error {
 	pad := b.pads[name]
+
 	// drop until the next keyframe
 	pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
 		buffer := info.GetBuffer()