diff --git a/pkg/gstreamer/callbacks.go b/pkg/gstreamer/callbacks.go index b88bd286..c91af663 100644 --- a/pkg/gstreamer/callbacks.go +++ b/pkg/gstreamer/callbacks.go @@ -16,7 +16,6 @@ package gstreamer import ( "sync" - "time" "github.com/go-gst/go-gst/gst" @@ -36,7 +35,7 @@ type Callbacks struct { // source callbacks onTrackAdded []func(*config.TrackSource) onTrackMuted []func(string) - onTrackUnmuted []func(string, time.Duration) + onTrackUnmuted []func(string) onTrackRemoved []func(string) // internal @@ -110,19 +109,19 @@ func (c *Callbacks) OnTrackMuted(trackID string) { } } -func (c *Callbacks) AddOnTrackUnmuted(f func(string, time.Duration)) { +func (c *Callbacks) AddOnTrackUnmuted(f func(string)) { c.mu.Lock() c.onTrackUnmuted = append(c.onTrackUnmuted, f) c.mu.Unlock() } -func (c *Callbacks) OnTrackUnmuted(trackID string, pts time.Duration) { +func (c *Callbacks) OnTrackUnmuted(trackID string) { c.mu.RLock() onTrackUnmuted := c.onTrackUnmuted c.mu.RUnlock() for _, f := range onTrackUnmuted { - f(trackID, pts) + f(trackID) } } diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index 9eb4c678..e58f3cc1 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -82,9 +82,6 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi if err != nil { return nil, errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil { - return nil, err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return nil, err } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 3adb6ff7..7e764337 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -23,7 +23,6 @@ import ( "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" - "go.uber.org/atomic" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" @@ -41,13 +40,9 @@ type VideoBin struct { bin *gstreamer.Bin conf *config.PipelineConfig - lastPTS atomic.Duration - nextPTS atomic.Duration - selectedPad string - nextPad string - mu sync.Mutex nextID int + selectedPad string pads map[string]*gst.Pad names map[string]string selector *gst.Element @@ -174,18 +169,20 @@ func (b *VideoBin) onTrackMuted(trackID string) { b.mu.Unlock() } -func (b *VideoBin) onTrackUnmuted(trackID string, pts time.Duration) { +func (b *VideoBin) onTrackUnmuted(trackID string) { if b.bin.GetState() > gstreamer.StateRunning { return } b.mu.Lock() - defer b.mu.Unlock() - if name, ok := b.names[trackID]; ok { - b.nextPTS.Store(pts) - b.nextPad = name + if err := b.setSelectorPadLocked(name); err != nil { + b.mu.Unlock() + b.bin.OnError(err) + return + } } + b.mu.Unlock() } func (b *VideoBin) buildWebInput() error { @@ -217,9 +214,6 @@ func (b *VideoBin) buildWebInput() error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -497,14 +491,14 @@ func (b *VideoBin) addSelector() error { if err != nil { return errors.ErrGstPipelineError(err) } + if err = inputSelector.SetProperty("drop-backwards", true); err != nil { + return errors.ErrGstPipelineError(err) + } videoRate, err := gst.NewElement("videorate") if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -663,9 +657,6 @@ func (b *VideoBin) addVideoConverter(bin *gstreamer.Bin) error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = videoRate.SetProperty("max-duplication-time", config.Latency); err != nil { - return err - } if err = videoRate.SetProperty("skip-to-first", true); err != nil { return err } @@ -714,49 +705,15 @@ func (b *VideoBin) createSrcPad(trackID, name string) { b.mu.Lock() defer b.mu.Unlock() - pad := b.selector.GetRequestPad("sink_%u") - pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - buffer := info.GetBuffer() - for b.nextPTS.Load() != 0 { - time.Sleep(time.Millisecond * 100) - } - pts := *buffer.PresentationTimestamp().AsDuration() - if pts < b.lastPTS.Load() { - return gst.PadProbeDrop - } - b.lastPTS.Store(pts) - return gst.PadProbeOK - }) - b.names[trackID] = name - b.pads[name] = pad + b.pads[name] = b.selector.GetRequestPad("sink_%u") } func (b *VideoBin) createTestSrcPad() { b.mu.Lock() defer b.mu.Unlock() - pad := b.selector.GetRequestPad("sink_%u") - pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - buffer := info.GetBuffer() - pts := *buffer.PresentationTimestamp().AsDuration() - if pts < b.lastPTS.Load() { - return gst.PadProbeDrop - } - if nextPTS := b.nextPTS.Load(); nextPTS != 0 && pts >= nextPTS { - if err := b.setSelectorPad(b.nextPad); err != nil { - logger.Errorw("failed to unmute", err) - return gst.PadProbeDrop - } - b.nextPad = "" - b.nextPTS.Store(0) - } - if b.selectedPad == videoTestSrcName { - b.lastPTS.Store(pts) - } - return gst.PadProbeOK - }) - b.pads[videoTestSrcName] = pad + b.pads[videoTestSrcName] = b.selector.GetRequestPad("sink_%u") } func (b *VideoBin) setSelectorPad(name string) error { @@ -769,6 +726,15 @@ func (b *VideoBin) setSelectorPad(name string) error { // TODO: go-gst should accept objects directly and handle conversion to C func (b *VideoBin) setSelectorPadLocked(name string) error { pad := b.pads[name] + // drop until the next keyframe + pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { + buffer := info.GetBuffer() + if buffer.HasFlags(gst.BufferFlagDeltaUnit) { + return gst.PadProbeDrop + } + logger.Debugw("active pad changed", "name", name) + return gst.PadProbeRemove + }) pt, err := b.selector.GetPropertyType("active-pad") if err != nil { diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index a9e72db2..486191a7 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -169,7 +169,7 @@ func (s *WebsocketSink) OnTrackMuted(_ string) { } } -func (s *WebsocketSink) OnTrackUnmuted(_ string, _ time.Duration) { +func (s *WebsocketSink) OnTrackUnmuted(_ string) { if err := s.writeMutedMessage(false); err != nil { logger.Errorw("failed to write unmute message", err) } diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 724314cd..c438a6b5 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -105,7 +105,7 @@ func (s *SDKSource) Playing(trackID string) { s.mu.Unlock() if writer != nil { - writer.Play() + writer.Playing() } } @@ -153,8 +153,6 @@ func (s *SDKSource) joinRoom() error { OnTrackUnmuted: s.onTrackUnmuted, OnTrackUnsubscribed: s.onTrackUnsubscribed, }, - OnReconnecting: s.onReconnecting, - OnReconnected: s.onReconnected, OnDisconnected: s.onDisconnected, } if s.RequestType == types.RequestTypeParticipant { @@ -537,23 +535,11 @@ func shouldSubscribe(pub lksdk.TrackPublication) bool { } func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - s.mu.Lock() - writer := s.writers[pub.SID()] - s.mu.Unlock() - - if writer != nil { - writer.SetTrackMuted(true) - } + logger.Debugw("track muted", "trackID", pub.SID()) } func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - s.mu.Lock() - writer := s.writers[pub.SID()] - s.mu.Unlock() - - if writer != nil { - writer.SetTrackMuted(false) - } + logger.Debugw("track unmuted", "trackID", pub.SID()) } func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { @@ -586,24 +572,6 @@ func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) { } } -func (s *SDKSource) onReconnecting() { - s.mu.RLock() - defer s.mu.RUnlock() - - for _, writer := range s.writers { - writer.SetTrackDisconnected(true) - } -} - -func (s *SDKSource) onReconnected() { - s.mu.RLock() - defer s.mu.RUnlock() - - for _, writer := range s.writers { - writer.SetTrackDisconnected(false) - } -} - func (s *SDKSource) onDisconnected() { logger.Warnw("disconnected from room", nil) s.finished() diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 1f8095fb..d5d021f2 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -39,15 +39,6 @@ import ( "github.com/livekit/server-sdk-go/v2/pkg/synchronizer" ) -type state int - -const ( - statePlaying state = iota - stateMuted - stateUnmuting - stateReconnecting -) - const ( latency = time.Second * 2 drainTimeout = time.Second * 4 @@ -74,16 +65,12 @@ type AppWriter struct { *synchronizer.TrackSynchronizer // state - state state - initialized bool - ticker *time.Ticker - muted atomic.Bool - lastRead time.Time - disconnected atomic.Bool - playing core.Fuse - draining core.Fuse - endStream core.Fuse - finished core.Fuse + lastRead time.Time + active atomic.Bool + playing core.Fuse + draining core.Fuse + endStream core.Fuse + finished core.Fuse } func NewAppWriter( @@ -154,104 +141,35 @@ func NewAppWriter( jitter.WithLogger(w.logger), ) - go w.run() + go w.start() return w, nil } -func (w *AppWriter) TrackID() string { - return w.track.ID() -} - -func (w *AppWriter) Play() { - w.playing.Break() - if w.pub.IsMuted() || w.disconnected.Load() { - w.SetTrackMuted(true) - } -} - -func (w *AppWriter) SetTrackMuted(muted bool) { - if !w.playing.IsBroken() || w.muted.Load() == muted { - return - } - - w.muted.Store(muted) - if muted { - w.logger.Debugw("track muted", "timestamp", time.Since(w.startTime).Seconds()) - w.callbacks.OnTrackMuted(w.track.ID()) - } else { - w.logger.Debugw("track unmuted", "timestamp", time.Since(w.startTime).Seconds()) - if w.sendPLI != nil { - w.sendPLI() - } - } -} - -func (w *AppWriter) SetTrackDisconnected(disconnected bool) { - if w.disconnected.Swap(disconnected) == disconnected { - return - } - - if disconnected { - w.logger.Debugw("track disconnected", "timestamp", time.Since(w.startTime).Seconds()) - if w.playing.IsBroken() { - w.callbacks.OnTrackMuted(w.track.ID()) - } - } else { - w.logger.Debugw("track reconnected", "timestamp", time.Since(w.startTime).Seconds()) - if w.playing.IsBroken() && !w.muted.Load() && w.sendPLI != nil { - w.sendPLI() - } - } -} - -// Drain blocks until finished -func (w *AppWriter) Drain(force bool) { - w.draining.Once(func() { - w.logger.Debugw("draining") - - if force || w.muted.Load() || w.disconnected.Load() { - w.endStream.Break() - } else { - // wait until drainTimeout before force popping - time.AfterFunc(drainTimeout, w.endStream.Break) - } - }) - - // wait until finished - <-w.finished.Watch() -} - -func (w *AppWriter) run() { +func (w *AppWriter) start() { w.startTime = time.Now() + w.active.Store(true) for !w.endStream.IsBroken() { - switch w.state { - case statePlaying, stateReconnecting, stateUnmuting: - w.handlePlaying() - case stateMuted: - w.handleMuted() - } + w.readNext() } // clean up - _ = w.pushSamples() if w.playing.IsBroken() { if flow := w.src.EndStream(); flow != gst.FlowOK && flow != gst.FlowFlushing { w.logger.Errorw("unexpected flow return", nil, "flowReturn", flow.String()) } } + w.draining.Break() + stats := w.GetTrackStats() loss := w.buffer.PacketLoss() - - w.draining.Break() w.logger.Infow("writer finished", "sampleDuration", fmt.Sprint(w.GetFrameDuration()), "avgDrift", fmt.Sprint(time.Duration(stats.AvgDrift)), "maxDrift", fmt.Sprint(stats.MaxDrift), "packetLoss", fmt.Sprintf("%.2f%%", loss*100), ) - if w.logFile != nil { _ = w.logFile.Close() } @@ -259,84 +177,68 @@ func (w *AppWriter) run() { w.finished.Break() } -func (w *AppWriter) handlePlaying() { - // read next packet +func (w *AppWriter) readNext() { _ = w.track.SetReadDeadline(time.Now().Add(time.Millisecond * 500)) pkt, _, err := w.track.ReadRTP() if err != nil { - w.handleReadError(err) + var netErr net.Error + switch { + case w.draining.IsBroken(): + w.endStream.Break() + case errors.As(err, &netErr) && netErr.Timeout(): + if !w.active.Load() { + return + } + timeout := w.lastRead + if timeout.IsZero() { + timeout = w.startTime + } + if w.pub.IsMuted() || time.Since(timeout) > latency { + // set track inactive + w.logger.Debugw("track inactive", "timestamp", time.Since(w.startTime)) + w.active.Store(false) + w.callbacks.OnTrackMuted(w.track.ID()) + } + case err.Error() == errBufferTooSmall: + w.logger.Warnw("read error", err) + default: + if !errors.Is(err, io.EOF) { + w.logger.Errorw("could not read packet", err) + } + w.endStream.Break() + } return } - // initialize track synchronizer - if !w.initialized { + // initialize on first packet + if w.lastRead.IsZero() { w.Initialize(pkt) - w.initialized = true } w.lastRead = time.Now() + if !w.active.Swap(true) { + // set track active + w.logger.Debugw("track active", "timestamp", time.Since(w.startTime)) + w.callbacks.OnTrackUnmuted(w.track.ID()) + if w.sendPLI != nil { + w.sendPLI() + } + } // push packet to jitter buffer w.buffer.Push(pkt) + // buffers can only be pushed to the appsrc while in the playing state + if !w.playing.IsBroken() { + return + } + // push completed packets to appsrc if err = w.pushSamples(); err != nil { w.draining.Once(w.endStream.Break) } } -func (w *AppWriter) handleMuted() { - switch { - case w.draining.IsBroken(): - w.ticker.Stop() - w.endStream.Break() - - case !w.muted.Load(): - w.ticker.Stop() - w.state = stateUnmuting - - default: - <-w.ticker.C - } -} - -func (w *AppWriter) handleReadError(err error) { - var netErr net.Error - - switch { - case w.draining.IsBroken(): - w.endStream.Break() - case err.Error() == errBufferTooSmall: - w.logger.Warnw("read error", err) - case w.muted.Load(): - _ = w.pushSamples() - w.ticker = time.NewTicker(w.GetFrameDuration()) - w.state = stateMuted - case w.disconnected.Load(): - _ = w.pushSamples() - w.state = stateReconnecting - case errors.As(err, &netErr) && netErr.Timeout(): - if time.Since(w.lastRead) > latency { - w.SetTrackDisconnected(true) - _ = w.pushSamples() - w.state = stateReconnecting - } - default: - // log non-EOF errors - if !errors.Is(err, io.EOF) { - w.logger.Errorw("could not read packet", err) - } - - // end stream - w.endStream.Break() - } -} - func (w *AppWriter) pushSamples() error { - // buffers can only be pushed to the appsrc while in the playing state - if !w.playing.IsBroken() { - return nil - } - pkts := w.buffer.Pop(false) for _, pkt := range pkts { sn := pkt.SequenceNumber @@ -361,35 +263,38 @@ func (w *AppWriter) pushSamples() error { )) } - if w.state == stateUnmuting { - w.callbacks.OnTrackUnmuted(w.track.ID(), pts) - w.state = statePlaying - } else if w.state == stateReconnecting { - w.SetTrackDisconnected(false) - w.callbacks.OnTrackUnmuted(w.track.ID(), pts) - w.state = statePlaying + p, err := pkt.Marshal() + if err != nil { + w.logger.Errorw("could not marshal packet", err) + return err } - if err = w.pushPacket(pkt, pts); err != nil { - return err + b := gst.NewBufferFromBytes(p) + b.SetPresentationTimestamp(gst.ClockTime(uint64(pts))) + if flow := w.src.PushBuffer(b); flow != gst.FlowOK { + w.logger.Infow("unexpected flow return", "flow", flow) } } return nil } -func (w *AppWriter) pushPacket(pkt *rtp.Packet, pts time.Duration) error { - p, err := pkt.Marshal() - if err != nil { - w.logger.Errorw("could not marshal packet", err) - return err - } +func (w *AppWriter) Playing() { + w.playing.Break() +} - b := gst.NewBufferFromBytes(p) - b.SetPresentationTimestamp(gst.ClockTime(uint64(pts))) - if flow := w.src.PushBuffer(b); flow != gst.FlowOK { - w.logger.Infow("unexpected flow return", "flow", flow) - } +// Drain blocks until finished +func (w *AppWriter) Drain(force bool) { + w.draining.Once(func() { + w.logger.Debugw("draining") - return nil + if force || !w.active.Load() { + w.endStream.Break() + } else { + time.AfterFunc(drainTimeout, w.endStream.Break) + } + }) + + // wait until finished + <-w.finished.Watch() }