Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update disconnection timeout, max duplication #717

Merged
merged 2 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
)

const (
videoTestSrcName = "video_test_src"
maxDuplicationTime = time.Second
videoTestSrcName = "video_test_src"
)

type VideoBin struct {
Expand Down Expand Up @@ -218,7 +217,7 @@ func (b *VideoBin) buildWebInput() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
Expand Down Expand Up @@ -458,7 +457,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstream
return nil, errors.ErrNotSupported(string(ts.MimeType))
}

if err := addVideoConverter(appSrcBin, b.conf); err != nil {
if err := b.addVideoConverter(appSrcBin); err != nil {
return nil, err
}

Expand All @@ -480,7 +479,7 @@ func (b *VideoBin) addVideoTestSrcBin() error {
}
videoTestSrc.SetArg("pattern", "black")

caps, err := newVideoCapsFilter(b.conf, true)
caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand All @@ -503,14 +502,14 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}

caps, err := newVideoCapsFilter(b.conf, true)
caps, err := b.newVideoCapsFilter(true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -641,7 +640,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
return nil
}

func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
Expand All @@ -657,39 +656,45 @@ func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(maxDuplicationTime)); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
elements := []*gst.Element{videoQueue, videoConvert, videoScale}

if !b.conf.VideoDecoding {
videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
elements = append(elements, videoRate)
}

caps, err := newVideoCapsFilter(p, true)
caps, err := b.newVideoCapsFilter(!b.conf.VideoDecoding)
if err != nil {
return errors.ErrGstPipelineError(err)
}
elements = append(elements, caps)

return b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps)
return bin.AddElements(elements...)
}

func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) {
func (b *VideoBin) newVideoCapsFilter(includeFramerate bool) (*gst.Element, error) {
caps, err := gst.NewElement("capsfilter")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if includeFramerate {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Framerate, p.Width, p.Height,
b.conf.Framerate, b.conf.Width, b.conf.Height,
)))
} else {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Width, p.Height,
b.conf.Width, b.conf.Height,
)))
}
if err != nil {
Expand Down
27 changes: 12 additions & 15 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ type AppWriter struct {
*synchronizer.TrackSynchronizer

// state
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
maybeDisconnected bool
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
lastRead time.Time
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
}

func NewAppWriter(
Expand Down Expand Up @@ -266,15 +266,14 @@ func (w *AppWriter) handlePlaying() {
if err != nil {
w.handleReadError(err)
return
} else {
w.maybeDisconnected = false
}

// initialize track synchronizer
if !w.initialized {
w.Initialize(pkt)
w.initialized = true
}
w.lastRead = time.Now()

// push packet to jitter buffer
w.buffer.Push(pkt)
Expand Down Expand Up @@ -316,12 +315,10 @@ func (w *AppWriter) handleReadError(err error) {
_ = w.pushSamples()
w.state = stateReconnecting
case errors.As(err, &netErr) && netErr.Timeout():
if w.maybeDisconnected {
if time.Since(w.lastRead) > latency {
w.SetTrackDisconnected(true)
_ = w.pushSamples()
w.state = stateReconnecting
} else {
w.maybeDisconnected = true
}
default:
// log non-EOF errors
Expand Down
Loading