Skip to content

Commit

Permalink
fix bin removal again (#626)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Mar 7, 2024
1 parent 4fb51d7 commit cb2116b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 49 deletions.
25 changes: 9 additions & 16 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,32 +216,25 @@ func (b *Bin) probeRemoveSource(src *Bin) {
}

srcGhostPad.AddProbe(gst.PadProbeTypeBlocking, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
if _, err := glib.IdleAdd(func() bool {
b.LockState()

if b.GetStateLocked() > StateRunning {
b.UnlockState()
return false
}

if sinkPad := sinkGhostPad.GetTarget(); sinkPad != nil {
b.elements[0].ReleaseRequestPad(sinkPad)
}
sinkPad := sinkGhostPad.GetTarget()
b.elements[0].ReleaseRequestPad(sinkPad)

srcGhostPad.Unlink(sinkGhostPad.Pad)
b.bin.RemovePad(sinkGhostPad.Pad)
b.UnlockState()
srcGhostPad.Unlink(sinkGhostPad.Pad)
b.bin.RemovePad(sinkGhostPad.Pad)

if _, err := glib.IdleAdd(func() bool {
if err := b.pipeline.Remove(src.bin.Element); err != nil {
b.OnError(err)
logger.Warnw("failed to remove bin", err, "bin", src.bin.GetName())
return false
}
if err := src.bin.SetState(gst.StateNull); err != nil {
logger.Warnw(fmt.Sprintf("failed to change %s state", src.bin.GetName()), err)
logger.Warnw("failed to change bin state", err, "bin", src.bin.GetName())
}
return false
}); err != nil {
logger.Errorw("failed to remove src bin", err, "bin", src.bin.GetName())
}

return gst.PadProbeRemove
})
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"math/rand"
"sync"

"github.com/go-gst/go-gst/gst"
Expand All @@ -33,15 +34,15 @@ type AudioBin struct {
bin *gstreamer.Bin
conf *config.PipelineConfig

mu sync.Mutex
tracks map[string]struct{}
mu sync.Mutex
names map[string]string
}

func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error {
b := &AudioBin{
bin: pipeline.NewBin("audio"),
conf: p,
tracks: make(map[string]struct{}),
bin: pipeline.NewBin("audio"),
conf: p,
names: make(map[string]string),
}

switch p.SourceType {
Expand Down Expand Up @@ -98,12 +99,12 @@ func (b *AudioBin) onTrackRemoved(trackID string) {
}

b.mu.Lock()
_, ok := b.tracks[trackID]
delete(b.tracks, trackID)
name, ok := b.names[trackID]
delete(b.names, trackID)
b.mu.Unlock()

if ok {
if _, err := b.bin.RemoveSourceBin(trackID); err != nil {
if _, err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
}
Expand Down Expand Up @@ -158,9 +159,10 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error {
b.mu.Lock()
defer b.mu.Unlock()

b.tracks[ts.TrackID] = struct{}{}
name := fmt.Sprintf("%s_%d", ts.TrackID, rand.Int()%1000)
b.names[ts.TrackID] = name

appSrcBin := b.bin.NewBin(ts.TrackID)
appSrcBin := b.bin.NewBin(name)
appSrcBin.SetEOSFunc(func() bool {
return false
})
Expand Down
62 changes: 39 additions & 23 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -46,6 +47,7 @@ type VideoBin struct {

mu sync.Mutex
pads map[string]*gst.Pad
names map[string]string
selector *gst.Element
rawVideoTee *gst.Element
}
Expand Down Expand Up @@ -132,21 +134,24 @@ func (b *VideoBin) onTrackRemoved(trackID string) {
}

b.mu.Lock()
pad := b.pads[trackID]
if pad == nil {
name, ok := b.names[trackID]
if !ok {
b.mu.Unlock()
return
}
delete(b.pads, trackID)
b.mu.Unlock()
delete(b.names, trackID)
delete(b.pads, name)

if b.selectedPad == trackID {
if err := b.setSelectorPad(videoTestSrcName); err != nil {
if b.selectedPad == name {
if err := b.setSelectorPadLocked(videoTestSrcName); err != nil {
b.mu.Unlock()
b.bin.OnError(err)
return
}
}
b.mu.Unlock()

if _, err := b.bin.RemoveSourceBin(trackID); err != nil {
if _, err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
}
Expand All @@ -156,11 +161,15 @@ func (b *VideoBin) onTrackMuted(trackID string) {
return
}

if b.selectedPad == trackID {
if err := b.setSelectorPad(videoTestSrcName); err != nil {
logger.Errorw("failed to set selector pad", err)
b.mu.Lock()
if name, ok := b.names[trackID]; ok && b.selectedPad == name {
if err := b.setSelectorPadLocked(videoTestSrcName); err != nil {
b.mu.Unlock()
b.bin.OnError(err)
return
}
}
b.mu.Unlock()
}

func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) {
Expand All @@ -171,12 +180,10 @@ func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) {
b.mu.Lock()
defer b.mu.Unlock()

if b.pads[trackID] == nil {
return
if name, ok := b.names[trackID]; ok {
b.nextPTS.Store(pts)
b.nextPad = name
}

b.nextPTS.Store(pts)
b.nextPad = trackID
}

func (b *VideoBin) buildWebInput() error {
Expand Down Expand Up @@ -229,6 +236,7 @@ func (b *VideoBin) buildWebInput() error {

func (b *VideoBin) buildSDKInput() error {
b.pads = make(map[string]*gst.Pad)
b.names = make(map[string]string)

// add selector first so pads can be created
if b.conf.VideoDecoding {
Expand Down Expand Up @@ -274,28 +282,30 @@ func (b *VideoBin) buildSDKInput() error {
}

func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error {
appSrcBin, err := b.buildAppSrcBin(ts)
name := fmt.Sprintf("%s_%d", ts.TrackID, rand.Int()%1000)

appSrcBin, err := b.buildAppSrcBin(ts, name)
if err != nil {
return err
}

if b.conf.VideoDecoding {
b.createSrcPad(ts.TrackID)
b.createSrcPad(ts.TrackID, name)
}

if err = b.bin.AddSourceBin(appSrcBin); err != nil {
return err
}

if b.conf.VideoDecoding {
return b.setSelectorPad(ts.TrackID)
return b.setSelectorPad(name)
}

return nil
}

func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error) {
appSrcBin := b.bin.NewBin(ts.TrackID)
func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource, name string) (*gstreamer.Bin, error) {
appSrcBin := b.bin.NewBin(name)
ts.AppSrc.Element.SetArg("format", "time")
if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil {
return nil, errors.ErrGstPipelineError(err)
Expand Down Expand Up @@ -680,7 +690,7 @@ func (b *VideoBin) getSrcPad(name string) *gst.Pad {
return b.pads[name]
}

func (b *VideoBin) createSrcPad(trackID string) {
func (b *VideoBin) createSrcPad(trackID, name string) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -697,7 +707,9 @@ func (b *VideoBin) createSrcPad(trackID string) {
b.lastPTS.Store(pts)
return gst.PadProbeOK
})
b.pads[trackID] = pad

b.names[trackID] = name
b.pads[name] = pad
}

func (b *VideoBin) createTestSrcPad() {
Expand Down Expand Up @@ -727,11 +739,15 @@ func (b *VideoBin) createTestSrcPad() {
b.pads[videoTestSrcName] = pad
}

// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPad(name string) error {
b.mu.Lock()
defer b.mu.Unlock()

return b.setSelectorPadLocked(name)
}

// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPadLocked(name string) error {
pad := b.pads[name]

pt, err := b.selector.GetPropertyType("active-pad")
Expand Down

0 comments on commit cb2116b

Please sign in to comment.