diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index c492ea8a..3a730b67 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -34,8 +34,7 @@ import ( ) const ( - videoTestSrcName = "video_test_src" - maxDuplicationTime = time.Second + videoTestSrcName = "video_test_src" ) type VideoBin struct { @@ -218,7 +217,7 @@ func (b *VideoBin) buildWebInput() error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil { + if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { return err } if err = videoRate.SetProperty("skip-to-first", true); err != nil { @@ -458,7 +457,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstream return nil, errors.ErrNotSupported(string(ts.MimeType)) } - if err := addVideoConverter(appSrcBin, b.conf); err != nil { + if err := b.addVideoConverter(appSrcBin); err != nil { return nil, err } @@ -480,7 +479,7 @@ func (b *VideoBin) addVideoTestSrcBin() error { } videoTestSrc.SetArg("pattern", "black") - caps, err := newVideoCapsFilter(b.conf, true) + caps, err := b.newVideoCapsFilter(true) if err != nil { return errors.ErrGstPipelineError(err) } @@ -503,14 +502,14 @@ func (b *VideoBin) addSelector() error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil { + if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { return err } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } - caps, err := newVideoCapsFilter(b.conf, true) + caps, err := b.newVideoCapsFilter(true) if err != nil { return errors.ErrGstPipelineError(err) } @@ -641,7 +640,7 @@ func (b *VideoBin) addDecodedVideoSink() error { return nil } -func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { +func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true) if err != nil { return err @@ -657,26 +656,32 @@ func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { return errors.ErrGstPipelineError(err) } - videoRate, err := gst.NewElement("videorate") - if err != nil { - return errors.ErrGstPipelineError(err) - } - if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil { - return err - } - if err = videoRate.SetProperty("skip-to-first", true); err != nil { - return err + elements := []*gst.Element{videoQueue, videoConvert, videoScale} + + if !b.conf.VideoDecoding { + videoRate, err := gst.NewElement("videorate") + if err != nil { + return errors.ErrGstPipelineError(err) + } + if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { + return err + } + if err = videoRate.SetProperty("skip-to-first", true); err != nil { + return err + } + elements = append(elements, videoRate) } - caps, err := newVideoCapsFilter(p, true) + caps, err := b.newVideoCapsFilter(true) if err != nil { return errors.ErrGstPipelineError(err) } + elements = append(elements, caps) - return b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps) + return bin.AddElements(elements...) } -func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) { +func (b *VideoBin) newVideoCapsFilter(includeFramerate bool) (*gst.Element, error) { caps, err := gst.NewElement("capsfilter") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -684,12 +689,12 @@ func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.E if includeFramerate { err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", - p.Framerate, p.Width, p.Height, + b.conf.Framerate, b.conf.Width, b.conf.Height, ))) } else { err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", - p.Width, p.Height, + b.conf.Width, b.conf.Height, ))) } if err != nil { diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 03adac9d..1f8095fb 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -74,16 +74,16 @@ type AppWriter struct { *synchronizer.TrackSynchronizer // state - state state - initialized bool - ticker *time.Ticker - muted atomic.Bool - maybeDisconnected bool - disconnected atomic.Bool - playing core.Fuse - draining core.Fuse - endStream core.Fuse - finished core.Fuse + state state + initialized bool + ticker *time.Ticker + muted atomic.Bool + lastRead time.Time + disconnected atomic.Bool + playing core.Fuse + draining core.Fuse + endStream core.Fuse + finished core.Fuse } func NewAppWriter( @@ -266,8 +266,6 @@ func (w *AppWriter) handlePlaying() { if err != nil { w.handleReadError(err) return - } else { - w.maybeDisconnected = false } // initialize track synchronizer @@ -275,6 +273,7 @@ func (w *AppWriter) handlePlaying() { w.Initialize(pkt) w.initialized = true } + w.lastRead = time.Now() // push packet to jitter buffer w.buffer.Push(pkt) @@ -316,12 +315,10 @@ func (w *AppWriter) handleReadError(err error) { _ = w.pushSamples() w.state = stateReconnecting case errors.As(err, &netErr) && netErr.Timeout(): - if w.maybeDisconnected { + if time.Since(w.lastRead) > latency { w.SetTrackDisconnected(true) _ = w.pushSamples() w.state = stateReconnecting - } else { - w.maybeDisconnected = true } default: // log non-EOF errors