Skip to content

Commit

Permalink
another attempt to fix source bin removal
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Mar 29, 2024
1 parent 2ebb8ae commit 4c3d1d5
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"go.uber.org/atomic"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -218,33 +219,39 @@ func (b *Bin) probeRemoveSource(src *Bin) {
return
}

var removed atomic.Bool
srcPad := srcGhostPad.GetTarget()
srcPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
if removed.Load() {
return gst.PadProbeRemove
}
return gst.PadProbeDrop
})
sinkPad := sinkGhostPad.GetTarget()
sinkPad.AddProbe(gst.PadProbeTypeBlockUpstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
// drop all upstream events
sinkPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
if removed.Load() {
return gst.PadProbeRemove
}
return gst.PadProbeDrop
})

srcGhostPad.AddProbe(gst.PadProbeTypeIdle, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
if _, err := glib.IdleAdd(func() bool {
b.elements[0].ReleaseRequestPad(sinkPad)

srcGhostPad.Unlink(sinkGhostPad.Pad)
b.bin.RemovePad(sinkGhostPad.Pad)

if _, err := glib.IdleAdd(func() bool {
if err := b.pipeline.Remove(src.bin.Element); err != nil {
logger.Warnw("failed to remove bin", err, "bin", src.bin.GetName())
return false
}
if err := src.bin.SetState(gst.StateNull); err != nil {
logger.Warnw("failed to change bin state", err, "bin", src.bin.GetName())
}
removed.Store(true)
if err := b.pipeline.Remove(src.bin.Element); err != nil {
logger.Warnw("failed to remove bin", err, "bin", src.bin.GetName())
return false
}); err != nil {
logger.Errorw("failed to remove src bin", err, "bin", src.bin.GetName())
}

return gst.PadProbeRemove
})
if err := src.bin.SetState(gst.StateNull); err != nil {
logger.Warnw("failed to change bin state", err, "bin", src.bin.GetName())
return false
}
return false
}); err != nil {
logger.Errorw("failed to remove bin", err, "bin", src.bin.GetName())
}
}

func (b *Bin) probeRemoveSink(sink *Bin) {
Expand Down

0 comments on commit 4c3d1d5

Please sign in to comment.