Skip to content

Commit

Permalink
update appwriter, input selector (#721)
Browse files Browse the repository at this point in the history
* update input selector usage

* reset lastRead on unmute

* rewrite Appwriter
  • Loading branch information
frostbyte73 authored Jul 10, 2024
1 parent 0a55a3d commit 6b66782
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 273 deletions.
9 changes: 4 additions & 5 deletions pkg/gstreamer/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package gstreamer

import (
"sync"
"time"

"github.com/go-gst/go-gst/gst"

Expand All @@ -36,7 +35,7 @@ type Callbacks struct {
// source callbacks
onTrackAdded []func(*config.TrackSource)
onTrackMuted []func(string)
onTrackUnmuted []func(string, time.Duration)
onTrackUnmuted []func(string)
onTrackRemoved []func(string)

// internal
Expand Down Expand Up @@ -110,19 +109,19 @@ func (c *Callbacks) OnTrackMuted(trackID string) {
}
}

func (c *Callbacks) AddOnTrackUnmuted(f func(string, time.Duration)) {
func (c *Callbacks) AddOnTrackUnmuted(f func(string)) {
c.mu.Lock()
c.onTrackUnmuted = append(c.onTrackUnmuted, f)
c.mu.Unlock()
}

func (c *Callbacks) OnTrackUnmuted(trackID string, pts time.Duration) {
func (c *Callbacks) OnTrackUnmuted(trackID string) {
c.mu.RLock()
onTrackUnmuted := c.onTrackUnmuted
c.mu.RUnlock()

for _, f := range onTrackUnmuted {
f(trackID, pts)
f(trackID)
}
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/pipeline/builder/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil {
return nil, err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return nil, err
}
Expand Down
78 changes: 22 additions & 56 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"go.uber.org/atomic"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
Expand All @@ -41,13 +40,9 @@ type VideoBin struct {
bin *gstreamer.Bin
conf *config.PipelineConfig

lastPTS atomic.Duration
nextPTS atomic.Duration
selectedPad string
nextPad string

mu sync.Mutex
nextID int
selectedPad string
pads map[string]*gst.Pad
names map[string]string
selector *gst.Element
Expand Down Expand Up @@ -174,18 +169,20 @@ func (b *VideoBin) onTrackMuted(trackID string) {
b.mu.Unlock()
}

func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) {
func (b *VideoBin) onTrackUnmuted(trackID string) {
if b.bin.GetState() > gstreamer.StateRunning {
return
}

b.mu.Lock()
defer b.mu.Unlock()

if name, ok := b.names[trackID]; ok {
b.nextPTS.Store(pts)
b.nextPad = name
if err := b.setSelectorPadLocked(name); err != nil {
b.mu.Unlock()
b.bin.OnError(err)
return
}
}
b.mu.Unlock()
}

func (b *VideoBin) buildWebInput() error {
Expand Down Expand Up @@ -217,9 +214,6 @@ func (b *VideoBin) buildWebInput() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -497,14 +491,14 @@ func (b *VideoBin) addSelector() error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = inputSelector.SetProperty("drop-backwards", true); err != nil {
return errors.ErrGstPipelineError(err)
}

videoRate, err := gst.NewElement("videorate")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -663,9 +657,6 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error {
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil {
return err
}
if err = videoRate.SetProperty("skip-to-first", true); err != nil {
return err
}
Expand Down Expand Up @@ -714,49 +705,15 @@ func (b *VideoBin) createSrcPad(trackID, name string) {
b.mu.Lock()
defer b.mu.Unlock()

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
for b.nextPTS.Load() != 0 {
time.Sleep(time.Millisecond * 100)
}
pts := *buffer.PresentationTimestamp().AsDuration()
if pts < b.lastPTS.Load() {
return gst.PadProbeDrop
}
b.lastPTS.Store(pts)
return gst.PadProbeOK
})

b.names[trackID] = name
b.pads[name] = pad
b.pads[name] = b.selector.GetRequestPad("sink_%u")
}

func (b *VideoBin) createTestSrcPad() {
b.mu.Lock()
defer b.mu.Unlock()

pad := b.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
pts := *buffer.PresentationTimestamp().AsDuration()
if pts < b.lastPTS.Load() {
return gst.PadProbeDrop
}
if nextPTS := b.nextPTS.Load(); nextPTS != 0 && pts >= nextPTS {
if err := b.setSelectorPad(b.nextPad); err != nil {
logger.Errorw("failed to unmute", err)
return gst.PadProbeDrop
}
b.nextPad = ""
b.nextPTS.Store(0)
}
if b.selectedPad == videoTestSrcName {
b.lastPTS.Store(pts)
}
return gst.PadProbeOK
})
b.pads[videoTestSrcName] = pad
b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u")
}

func (b *VideoBin) setSelectorPad(name string) error {
Expand All @@ -769,6 +726,15 @@ func (b *VideoBin) setSelectorPad(name string) error {
// TODO: go-gst should accept objects directly and handle conversion to C
func (b *VideoBin) setSelectorPadLocked(name string) error {
pad := b.pads[name]
// drop until the next keyframe
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
if buffer.HasFlags(gst.BufferFlagDeltaUnit) {
return gst.PadProbeDrop
}
logger.Debugw("active pad changed", "name", name)
return gst.PadProbeRemove
})

pt, err := b.selector.GetPropertyType("active-pad")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *WebsocketSink) OnTrackMuted(_ string) {
}
}

func (s *WebsocketSink) OnTrackUnmuted(_ string, _ time.Duration) {
func (s *WebsocketSink) OnTrackUnmuted(_ string) {
if err := s.writeMutedMessage(false); err != nil {
logger.Errorw("failed to write unmute message", err)
}
Expand Down
38 changes: 3 additions & 35 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *SDKSource) Playing(trackID string) {
s.mu.Unlock()

if writer != nil {
writer.Play()
writer.Playing()
}
}

Expand Down Expand Up @@ -153,8 +153,6 @@ func (s *SDKSource) joinRoom() error {
OnTrackUnmuted: s.onTrackUnmuted,
OnTrackUnsubscribed: s.onTrackUnsubscribed,
},
OnReconnecting: s.onReconnecting,
OnReconnected: s.onReconnected,
OnDisconnected: s.onDisconnected,
}
if s.RequestType == types.RequestTypeParticipant {
Expand Down Expand Up @@ -537,23 +535,11 @@ func shouldSubscribe(pub lksdk.TrackPublication) bool {
}

func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) {
s.mu.Lock()
writer := s.writers[pub.SID()]
s.mu.Unlock()

if writer != nil {
writer.SetTrackMuted(true)
}
logger.Debugw("track muted", "trackID", pub.SID())
}

func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) {
s.mu.Lock()
writer := s.writers[pub.SID()]
s.mu.Unlock()

if writer != nil {
writer.SetTrackMuted(false)
}
logger.Debugw("track unmuted", "trackID", pub.SID())
}

func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) {
Expand Down Expand Up @@ -586,24 +572,6 @@ func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) {
}
}

func (s *SDKSource) onReconnecting() {
s.mu.RLock()
defer s.mu.RUnlock()

for _, writer := range s.writers {
writer.SetTrackDisconnected(true)
}
}

func (s *SDKSource) onReconnected() {
s.mu.RLock()
defer s.mu.RUnlock()

for _, writer := range s.writers {
writer.SetTrackDisconnected(false)
}
}

func (s *SDKSource) onDisconnected() {
logger.Warnw("disconnected from room", nil)
s.finished()
Expand Down
Loading

0 comments on commit 6b66782

Please sign in to comment.