diff --git a/pkg/gstreamer/pads.go b/pkg/gstreamer/pads.go index bee84a3e..02d390b9 100644 --- a/pkg/gstreamer/pads.go +++ b/pkg/gstreamer/pads.go @@ -131,7 +131,9 @@ func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) { } } - logger.Warnw("could not match pads", nil, "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates) + logger.Warnw("could not match pads", nil, + "src", src.bin.GetName(), "sink", sink.bin.GetName(), + "srcTemplates", srcTemplates, "sinkTemplates", sinkTemplates) return nil, nil, errors.ErrGhostPadFailed } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 7e764337..5a7781db 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -43,6 +43,7 @@ type VideoBin struct { mu sync.Mutex nextID int selectedPad string + lastPTS uint64 pads map[string]*gst.Pad names map[string]string selector *gst.Element @@ -76,7 +77,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "video_tee") if err != nil { - return err + return errors.ErrGstPipelineError(err) } if err = b.bin.AddElement(tee); err != nil { @@ -202,7 +203,7 @@ func (b *VideoBin) buildWebInput() error { videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true) if err != nil { - return err + return errors.ErrGstPipelineError(err) } videoConvert, err := gst.NewElement("videoconvert") @@ -215,7 +216,7 @@ func (b *VideoBin) buildWebInput() error { return errors.ErrGstPipelineError(err) } if err = videoRate.SetProperty("skip-to-first", true); err != nil { - return err + return errors.ErrGstPipelineError(err) } caps, err := gst.NewElement("capsfilter") @@ -473,12 +474,20 @@ func (b *VideoBin) addVideoTestSrcBin() error { } videoTestSrc.SetArg("pattern", "black") + queue, err := gstreamer.BuildQueue("video_test_src_queue", config.Latency, false) + if err != nil { + return err + } + if err = queue.SetProperty("min-threshold-time", uint64(2e9)); err != nil { + return errors.ErrGstPipelineError(err) + } + caps, err := b.newVideoCapsFilter(true) if err != nil { return errors.ErrGstPipelineError(err) } - if err = testSrcBin.AddElements(videoTestSrc, caps); err != nil { + if err = testSrcBin.AddElements(videoTestSrc, queue, caps); err != nil { return err } @@ -491,16 +500,13 @@ func (b *VideoBin) addSelector() error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = inputSelector.SetProperty("drop-backwards", true); err != nil { - return errors.ErrGstPipelineError(err) - } videoRate, err := gst.NewElement("videorate") if err != nil { return errors.ErrGstPipelineError(err) } if err = videoRate.SetProperty("skip-to-first", true); err != nil { - return err + return errors.ErrGstPipelineError(err) } caps, err := b.newVideoCapsFilter(true) @@ -519,7 +525,7 @@ func (b *VideoBin) addSelector() error { func (b *VideoBin) addEncoder() error { videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false) if err != nil { - return err + return errors.ErrGstPipelineError(err) } if err = b.bin.AddElement(videoQueue); err != nil { return err @@ -552,7 +558,7 @@ func (b *VideoBin) addEncoder() error { bufCapacity = 10000 } if err = x264Enc.SetProperty("vbv-buf-capacity", bufCapacity); err != nil { - return err + return errors.ErrGstPipelineError(err) } if b.conf.GetStreamConfig() != nil { x264Enc.SetArg("pass", "cbr") @@ -618,7 +624,7 @@ func (b *VideoBin) addDecodedVideoSink() error { var err error b.rawVideoTee, err = gst.NewElement("tee") if err != nil { - return err + return errors.ErrGstPipelineError(err) } if err = b.bin.AddElement(b.rawVideoTee); err != nil { return err @@ -637,7 +643,7 @@ func (b *VideoBin) addDecodedVideoSink() error { func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true) if err != nil { - return err + return errors.ErrGstPipelineError(err) } videoConvert, err := gst.NewElement("videoconvert") @@ -658,7 +664,7 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { return errors.ErrGstPipelineError(err) } if err = videoRate.SetProperty("skip-to-first", true); err != nil { - return err + return errors.ErrGstPipelineError(err) } elements = append(elements, videoRate) } @@ -706,14 +712,45 @@ func (b *VideoBin) createSrcPad(trackID, name string) { defer b.mu.Unlock() b.names[trackID] = name - b.pads[name] = b.selector.GetRequestPad("sink_%u") + + pad := b.selector.GetRequestPad("sink_%u") + pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + pts := uint64(info.GetBuffer().PresentationTimestamp()) + b.mu.Lock() + if pts < b.lastPTS || (b.selectedPad != videoTestSrcName && b.selectedPad != name) { + b.mu.Unlock() + logger.Debugw(fmt.Sprintf("%s dropping %v", name, time.Duration(pts))) + return gst.PadProbeDrop + } + b.lastPTS = pts + b.mu.Unlock() + logger.Debugw(fmt.Sprintf("%s pushing %v", name, time.Duration(pts))) + return gst.PadProbeOK + }) + + b.pads[name] = pad } func (b *VideoBin) createTestSrcPad() { b.mu.Lock() defer b.mu.Unlock() - b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u") + pad := b.selector.GetRequestPad("sink_%u") + pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + pts := uint64(info.GetBuffer().PresentationTimestamp()) + b.mu.Lock() + if pts < b.lastPTS || (b.selectedPad != videoTestSrcName) { + b.mu.Unlock() + logger.Debugw(fmt.Sprintf("%s dropping %v", videoTestSrcName, time.Duration(pts))) + return gst.PadProbeDrop + } + b.lastPTS = pts + b.mu.Unlock() + logger.Debugw(fmt.Sprintf("%s pushing %v", videoTestSrcName, time.Duration(pts))) + return gst.PadProbeOK + }) + + b.pads[videoTestSrcName] = pad } func (b *VideoBin) setSelectorPad(name string) error { @@ -726,6 +763,7 @@ func (b *VideoBin) setSelectorPad(name string) error { // TODO: go-gst should accept objects directly and handle conversion to C func (b *VideoBin) setSelectorPadLocked(name string) error { pad := b.pads[name] + // drop until the next keyframe pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { buffer := info.GetBuffer()