Skip to content

Commit

Permalink
Update disconnection timeout, max duplication (#717)
Browse files Browse the repository at this point in the history
* update disconnection timeout, videorate max duplication

* don't include framerate in video track bin caps
  • Loading branch information
frostbyte73 authored Jul 5, 2024
1 parent a56e158 commit 41e7c34
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
49 changes: 27 additions & 22 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
)

const (
videoTestSrcName = "video_test_src"
maxDuplicationTime = time.Second
videoTestSrcName = "video_test_src"
)

type VideoBin struct {
Expand Down Expand Up @@ -218,7 +217,7 @@ func (b *VideoBin) buildWebInput() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
Expand Down Expand Up @@ -458,7 +457,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstream
return nil, errors.ErrNotSupported(string(ts.MimeType))
}

if err := addVideoConverter(appSrcBin, b.conf); err != nil {
if err := b.addVideoConverter(appSrcBin); err != nil {
return nil, err
}

Expand All @@ -480,7 +479,7 @@ func (b *VideoBin) addVideoTestSrcBin() error {
}
videoTestSrc.SetArg("pattern", "black")

caps, err := newVideoCapsFilter(b.conf, true)
caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand All @@ -503,14 +502,14 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}

caps, err := newVideoCapsFilter(b.conf, true)
caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -641,7 +640,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
return nil
}

func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
Expand All @@ -657,39 +656,45 @@ func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
elements := []*gst.Element{videoQueue, videoConvert, videoScale}

if !b.conf.VideoDecoding {
videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
elements = append(elements, videoRate)
}

caps, err := newVideoCapsFilter(p, true)
caps, err := b.newVideoCapsFilter(!b.conf.VideoDecoding)
if err != nil {
return errors.ErrGstPipelineError(err)
}
elements = append(elements, caps)

return b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps)
return bin.AddElements(elements...)
}

func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) {
func (b *VideoBin) newVideoCapsFilter(includeFramerate bool) (*gst.Element, error) {
caps, err := gst.NewElement("capsfilter")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if includeFramerate {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Framerate, p.Width, p.Height,
b.conf.Framerate, b.conf.Width, b.conf.Height,
)))
} else {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Width, p.Height,
b.conf.Width, b.conf.Height,
)))
}
if err != nil {
Expand Down
27 changes: 12 additions & 15 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ type AppWriter struct {
*synchronizer.TrackSynchronizer

// state
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
maybeDisconnected bool
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
lastRead time.Time
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
}

func NewAppWriter(
Expand Down Expand Up @@ -266,15 +266,14 @@ func (w *AppWriter) handlePlaying() {
if err != nil {
w.handleReadError(err)
return
} else {
w.maybeDisconnected = false
}

// initialize track synchronizer
if !w.initialized {
w.Initialize(pkt)
w.initialized = true
}
w.lastRead = time.Now()

// push packet to jitter buffer
w.buffer.Push(pkt)
Expand Down Expand Up @@ -316,12 +315,10 @@ func (w *AppWriter) handleReadError(err error) {
_ = w.pushSamples()
w.state = stateReconnecting
case errors.As(err, &netErr) && netErr.Timeout():
if w.maybeDisconnected {
if time.Since(w.lastRead) > latency {
w.SetTrackDisconnected(true)
_ = w.pushSamples()
w.state = stateReconnecting
} else {
w.maybeDisconnected = true
}
default:
// log non-EOF errors
Expand Down

0 comments on commit 41e7c34

Please sign in to comment.