diff --git a/pkg/gstreamer/bin.go b/pkg/gstreamer/bin.go index 8af5c218..d6b5f2f4 100644 --- a/pkg/gstreamer/bin.go +++ b/pkg/gstreamer/bin.go @@ -216,32 +216,25 @@ func (b *Bin) probeRemoveSource(src *Bin) { } srcGhostPad.AddProbe(gst.PadProbeTypeBlocking, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { - if _, err := glib.IdleAdd(func() bool { - b.LockState() - - if b.GetStateLocked() > StateRunning { - b.UnlockState() - return false - } - - if sinkPad := sinkGhostPad.GetTarget(); sinkPad != nil { - b.elements[0].ReleaseRequestPad(sinkPad) - } + sinkPad := sinkGhostPad.GetTarget() + b.elements[0].ReleaseRequestPad(sinkPad) - srcGhostPad.Unlink(sinkGhostPad.Pad) - b.bin.RemovePad(sinkGhostPad.Pad) - b.UnlockState() + srcGhostPad.Unlink(sinkGhostPad.Pad) + b.bin.RemovePad(sinkGhostPad.Pad) + if _, err := glib.IdleAdd(func() bool { if err := b.pipeline.Remove(src.bin.Element); err != nil { - b.OnError(err) + logger.Warnw("failed to remove bin", err, "bin", src.bin.GetName()) + return false } if err := src.bin.SetState(gst.StateNull); err != nil { - logger.Warnw(fmt.Sprintf("failed to change %s state", src.bin.GetName()), err) + logger.Warnw("failed to change bin state", err, "bin", src.bin.GetName()) } return false }); err != nil { logger.Errorw("failed to remove src bin", err, "bin", src.bin.GetName()) } + return gst.PadProbeRemove }) } diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index c942e298..d96b4bb4 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "math/rand" "sync" "github.com/go-gst/go-gst/gst" @@ -33,15 +34,15 @@ type AudioBin struct { bin *gstreamer.Bin conf *config.PipelineConfig - mu sync.Mutex - tracks map[string]struct{} + mu sync.Mutex + names map[string]string } func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error { b := &AudioBin{ - bin: pipeline.NewBin("audio"), - conf: p, - tracks: make(map[string]struct{}), + bin: pipeline.NewBin("audio"), + conf: p, + names: make(map[string]string), } switch p.SourceType { @@ -98,12 +99,12 @@ func (b *AudioBin) onTrackRemoved(trackID string) { } b.mu.Lock() - _, ok := b.tracks[trackID] - delete(b.tracks, trackID) + name, ok := b.names[trackID] + delete(b.names, trackID) b.mu.Unlock() if ok { - if _, err := b.bin.RemoveSourceBin(trackID); err != nil { + if _, err := b.bin.RemoveSourceBin(name); err != nil { b.bin.OnError(err) } } @@ -158,9 +159,10 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error { b.mu.Lock() defer b.mu.Unlock() - b.tracks[ts.TrackID] = struct{}{} + name := fmt.Sprintf("%s_%d", ts.TrackID, rand.Int()%1000) + b.names[ts.TrackID] = name - appSrcBin := b.bin.NewBin(ts.TrackID) + appSrcBin := b.bin.NewBin(name) appSrcBin.SetEOSFunc(func() bool { return false }) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index d144f73a..4be16ba8 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "math/rand" "strings" "sync" "time" @@ -46,6 +47,7 @@ type VideoBin struct { mu sync.Mutex pads map[string]*gst.Pad + names map[string]string selector *gst.Element rawVideoTee *gst.Element } @@ -132,21 +134,24 @@ func (b *VideoBin) onTrackRemoved(trackID string) { } b.mu.Lock() - pad := b.pads[trackID] - if pad == nil { + name, ok := b.names[trackID] + if !ok { b.mu.Unlock() return } - delete(b.pads, trackID) - b.mu.Unlock() + delete(b.names, trackID) + delete(b.pads, name) - if b.selectedPad == trackID { - if err := b.setSelectorPad(videoTestSrcName); err != nil { + if b.selectedPad == name { + if err := b.setSelectorPadLocked(videoTestSrcName); err != nil { + b.mu.Unlock() b.bin.OnError(err) + return } } + b.mu.Unlock() - if _, err := b.bin.RemoveSourceBin(trackID); err != nil { + if _, err := b.bin.RemoveSourceBin(name); err != nil { b.bin.OnError(err) } } @@ -156,11 +161,15 @@ func (b *VideoBin) onTrackMuted(trackID string) { return } - if b.selectedPad == trackID { - if err := b.setSelectorPad(videoTestSrcName); err != nil { - logger.Errorw("failed to set selector pad", err) + b.mu.Lock() + if name, ok := b.names[trackID]; ok && b.selectedPad == name { + if err := b.setSelectorPadLocked(videoTestSrcName); err != nil { + b.mu.Unlock() + b.bin.OnError(err) + return } } + b.mu.Unlock() } func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) { @@ -171,12 +180,10 @@ func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) { b.mu.Lock() defer b.mu.Unlock() - if b.pads[trackID] == nil { - return + if name, ok := b.names[trackID]; ok { + b.nextPTS.Store(pts) + b.nextPad = name } - - b.nextPTS.Store(pts) - b.nextPad = trackID } func (b *VideoBin) buildWebInput() error { @@ -229,6 +236,7 @@ func (b *VideoBin) buildWebInput() error { func (b *VideoBin) buildSDKInput() error { b.pads = make(map[string]*gst.Pad) + b.names = make(map[string]string) // add selector first so pads can be created if b.conf.VideoDecoding { @@ -274,13 +282,15 @@ func (b *VideoBin) buildSDKInput() error { } func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { - appSrcBin, err := b.buildAppSrcBin(ts) + name := fmt.Sprintf("%s_%d", ts.TrackID, rand.Int()%1000) + + appSrcBin, err := b.buildAppSrcBin(ts, name) if err != nil { return err } if b.conf.VideoDecoding { - b.createSrcPad(ts.TrackID) + b.createSrcPad(ts.TrackID, name) } if err = b.bin.AddSourceBin(appSrcBin); err != nil { @@ -288,14 +298,14 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { } if b.conf.VideoDecoding { - return b.setSelectorPad(ts.TrackID) + return b.setSelectorPad(name) } return nil } -func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error) { - appSrcBin := b.bin.NewBin(ts.TrackID) +func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstreamer.Bin, error) { + appSrcBin := b.bin.NewBin(name) ts.AppSrc.Element.SetArg("format", "time") if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil { return nil, errors.ErrGstPipelineError(err) @@ -680,7 +690,7 @@ func (b *VideoBin) getSrcPad(name string) *gst.Pad { return b.pads[name] } -func (b *VideoBin) createSrcPad(trackID string) { +func (b *VideoBin) createSrcPad(trackID, name string) { b.mu.Lock() defer b.mu.Unlock() @@ -697,7 +707,9 @@ func (b *VideoBin) createSrcPad(trackID string) { b.lastPTS.Store(pts) return gst.PadProbeOK }) - b.pads[trackID] = pad + + b.names[trackID] = name + b.pads[name] = pad } func (b *VideoBin) createTestSrcPad() { @@ -727,11 +739,15 @@ func (b *VideoBin) createTestSrcPad() { b.pads[videoTestSrcName] = pad } -// TODO: go-gst should accept objects directly and handle conversion to C func (b *VideoBin) setSelectorPad(name string) error { b.mu.Lock() defer b.mu.Unlock() + return b.setSelectorPadLocked(name) +} + +// TODO: go-gst should accept objects directly and handle conversion to C +func (b *VideoBin) setSelectorPadLocked(name string) error { pad := b.pads[name] pt, err := b.selector.GetPropertyType("active-pad")