Skip to content

Commit

Permalink
Video selector (#472)
Browse files Browse the repository at this point in the history
* freezing

* 2 bins

* pipeline/bin

* pipeline/bin

* pipeline refactor

* fix streaming

* fix linking

* fix segments

* more pad edge cases

* segment multi broken

* fix multi

* finish merging (untested)

* video selector
  • Loading branch information
frostbyte73 authored Aug 25, 2023
1 parent 8016d39 commit 89db2a0
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 292 deletions.
2 changes: 2 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
Expand Down
17 changes: 17 additions & 0 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Bin struct {
getSrcPad func(string) *gst.Pad
getSinkPad func(string) *gst.Pad

added bool
srcs []*Bin // source bins
elements []*gst.Element // elements within this bin
queues map[string]*gst.Element // used with BinTypeMultiStream
Expand All @@ -59,6 +60,14 @@ func (b *Bin) AddSourceBin(src *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

src.mu.Lock()
alreadyAdded := src.added
src.added = true
src.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.srcs = append(b.srcs, src)
if err := b.pipeline.Add(src.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down Expand Up @@ -89,6 +98,14 @@ func (b *Bin) AddSinkBin(sink *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

sink.mu.Lock()
alreadyAdded := sink.added
sink.added = true
sink.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.sinks = append(b.sinks, sink)
if err := b.pipeline.Add(sink.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down
9 changes: 5 additions & 4 deletions pkg/gstreamer/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gstreamer

import (
"sync"
"time"

"github.com/tinyzimmer/go-gst/gst"

Expand All @@ -34,7 +35,7 @@ type Callbacks struct {
// source callbacks
onTrackAdded []func(*config.TrackSource)
onTrackMuted []func(string)
onTrackUnmuted []func(string)
onTrackUnmuted []func(string, time.Duration)
onTrackRemoved []func(string)

// internal
Expand Down Expand Up @@ -101,16 +102,16 @@ func (c *Callbacks) OnTrackMuted(trackID string) {
c.mu.RUnlock()
}

func (c *Callbacks) AddOnTrackUnmuted(f func(string)) {
func (c *Callbacks) AddOnTrackUnmuted(f func(string, time.Duration)) {
c.mu.Lock()
c.onTrackUnmuted = append(c.onTrackUnmuted, f)
c.mu.Unlock()
}

func (c *Callbacks) OnTrackUnmuted(trackID string) {
func (c *Callbacks) OnTrackUnmuted(trackID string, pts time.Duration) {
c.mu.RLock()
for _, onTrackUnmuted := range c.onTrackUnmuted {
onTrackUnmuted(trackID)
onTrackUnmuted(trackID, pts)
}
c.mu.RUnlock()
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/gstreamer/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Pipeline struct {

loop *glib.MainLoop

started core.Fuse
running chan struct{}
binsAdded bool
elementsAdded bool
started core.Fuse
running chan struct{}
}

// A pipeline can have either elements or src and sink bins. If you add both you will get a wrong hierarchy error
Expand All @@ -54,6 +56,38 @@ func NewPipeline(name string, latency uint64, callbacks *Callbacks) (*Pipeline,
}, nil
}

func (p *Pipeline) AddSourceBin(src *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSourceBin(src)
}

func (p *Pipeline) AddSinkBin(sink *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSinkBin(sink)
}

func (p *Pipeline) AddElement(e *gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElement(e)
}

func (p *Pipeline) AddElements(elements ...*gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElements(elements...)
}

func (p *Pipeline) Link() error {
return p.link()
}
Expand Down
64 changes: 27 additions & 37 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", p.Info.EgressId)); err != nil {
return errors.ErrGstPipelineError(err)
}

if err = b.AddElement(pulseSrc); err != nil {
return err
}
Expand All @@ -84,48 +83,40 @@ func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {

func buildSDKAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if p.AudioTrack != nil {
appSrcBin, err := buildAudioAppSrcBin(b, p)
if err != nil {
if err := buildAudioAppSrcBin(b, p); err != nil {
return err
}
if err = b.AddSourceBin(appSrcBin); err != nil {
return err
}
}

testSrcBin, err := buildAudioTestSrcBin(b, p)
if err != nil {
return err
}
if err = b.AddSourceBin(testSrcBin); err != nil {
if err := buildAudioTestSrcBin(b, p); err != nil {
return err
}

if err = addAudioMixer(b, p); err != nil {
if err := addAudioMixer(b, p); err != nil {
return err
}
if p.AudioTranscoding {
if err = addAudioEncoder(b, p); err != nil {
if err := addAudioEncoder(b, p); err != nil {
return err
}
}

return nil
}

func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gstreamer.Bin, error) {
func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
track := p.AudioTrack

b := audioBin.NewBin(track.TrackID)
b.SetEOSFunc(track.EOSFunc)
if err := audioBin.AddSourceBin(b); err != nil {
return err
}

track.AppSrc.Element.SetArg("format", "time")
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return nil, err
return err
}

if err := b.AddElement(track.AppSrc.Element); err != nil {
return nil, err
return err
}

switch track.MimeType {
Expand All @@ -134,61 +125,60 @@ func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gs
"application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d",
track.PayloadType, track.ClockRate,
))); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

rtpOpusDepay, err := gst.NewElement("rtpopusdepay")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

opusDec, err := gst.NewElement("opusdec")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

if err = b.AddElements(rtpOpusDepay, opusDec); err != nil {
return nil, err
return err
}

default:
return nil, errors.ErrNotSupported(string(track.MimeType))
return errors.ErrNotSupported(string(track.MimeType))
}

if err := addAudioConverter(b, p); err != nil {
return nil, err
return err
}

return b, nil
return nil
}

func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) (*gstreamer.Bin, error) {
func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
b := audioBin.NewBin("audio_test_src")
if err := audioBin.AddSourceBin(b); err != nil {
return err
}

audioTestSrc, err := gst.NewElement("audiotestsrc")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("volume", 0.0); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("do-timestamp", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = audioTestSrc.SetProperty("is-live", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}

audioCaps, err := newAudioCapsFilter(p)
if err != nil {
return nil, err
}

if err = b.AddElements(audioTestSrc, audioCaps); err != nil {
return nil, err
return err
}

return b, nil
return b.AddElements(audioTestSrc, audioCaps)
}

func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/builder/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func BuildSegmentBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*g
return nil, errors.ErrGstPipelineError(err)
}

b.SetGetSrcPad(func(b string) *gst.Pad {
if b == "audio" {
b.SetGetSrcPad(func(name string) *gst.Pad {
if name == "audio" {
return sink.GetRequestPad("audio_%u")
} else {
return h264parse.GetStaticPad("sink")
Expand Down
Loading

0 comments on commit 89db2a0

Please sign in to comment.