diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 7c99d0e4..799865f9 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -152,6 +152,8 @@ func (s *SDKSource) joinRoom() error { OnTrackUnmuted: s.onTrackUnmuted, OnTrackUnsubscribed: s.onTrackUnsubscribed, }, + OnReconnecting: s.onReconnecting, + OnReconnected: s.onReconnected, OnDisconnected: s.onDisconnected, } if s.RequestType == types.RequestTypeParticipant { @@ -525,19 +527,42 @@ func (s *SDKSource) onTrackFinished(trackID string) { s.callbacks.OnTrackRemoved(trackID) s.sync.RemoveTrack(trackID) } else if active == 0 { - s.onDisconnected() + s.finished() } } } func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) { if rp.Identity() == s.Identity { - logger.Debugw("Participant disconnected") - s.onDisconnected() + logger.Debugw("participant disconnected") + s.finished() + } +} + +func (s *SDKSource) onReconnecting() { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, writer := range s.writers { + writer.SetTrackDisconnected(true) + } +} + +func (s *SDKSource) onReconnected() { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, writer := range s.writers { + writer.SetTrackDisconnected(false) } } func (s *SDKSource) onDisconnected() { + logger.Warnw("disconnected from room", nil) + s.finished() +} + +func (s *SDKSource) finished() { select { case <-s.endRecording: return diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 3837a9fd..a3fb96d4 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -45,6 +45,7 @@ const ( statePlaying state = iota stateMuted stateUnmuting + stateReconnecting ) const ( @@ -72,14 +73,15 @@ type AppWriter struct { *synchronizer.TrackSynchronizer // state - state state - initialized bool - ticker *time.Ticker - muted atomic.Bool - playing core.Fuse - draining core.Fuse - endStream core.Fuse - finished core.Fuse + state state + initialized bool + ticker *time.Ticker + muted atomic.Bool + disconnected atomic.Bool + playing core.Fuse + draining core.Fuse + endStream core.Fuse + finished core.Fuse } func NewAppWriter( @@ -158,13 +160,13 @@ func (w *AppWriter) TrackID() string { func (w *AppWriter) Play() { w.playing.Break() - if w.pub.IsMuted() { + if w.pub.IsMuted() || w.disconnected.Load() { w.SetTrackMuted(true) } } func (w *AppWriter) SetTrackMuted(muted bool) { - if !w.playing.IsBroken() { + if !w.playing.IsBroken() || w.muted.Load() == muted { return } @@ -180,12 +182,27 @@ func (w *AppWriter) SetTrackMuted(muted bool) { } } +func (w *AppWriter) SetTrackDisconnected(disconnected bool) { + w.disconnected.Store(disconnected) + if disconnected { + w.logger.Debugw("track disconnected", "timestamp", time.Since(w.startTime).Seconds()) + if w.playing.IsBroken() { + w.callbacks.OnTrackMuted(w.track.ID()) + } + } else { + w.logger.Debugw("track reconnected", "timestamp", time.Since(w.startTime).Seconds()) + if w.playing.IsBroken() && !w.muted.Load() && w.sendPLI != nil { + w.sendPLI() + } + } +} + // Drain blocks until finished func (w *AppWriter) Drain(force bool) { w.draining.Once(func() { w.logger.Debugw("draining") - if force || w.muted.Load() { + if force || w.muted.Load() || w.disconnected.Load() { w.endStream.Break() } else { // wait until drainTimeout before force popping @@ -202,7 +219,7 @@ func (w *AppWriter) run() { for !w.endStream.IsBroken() { switch w.state { - case stateUnmuting, statePlaying: + case statePlaying, stateReconnecting, stateUnmuting: w.handlePlaying() case stateMuted: w.handleMuted() @@ -286,6 +303,13 @@ func (w *AppWriter) handleReadError(err error) { return } + // check if reconnecting + if w.disconnected.Load() { + _ = w.pushSamples() + w.state = stateReconnecting + return + } + // check if muted if w.muted.Load() { _ = w.pushSamples() @@ -328,7 +352,7 @@ func (w *AppWriter) pushSamples() error { return err } - if w.state == stateUnmuting { + if w.state == stateUnmuting || w.state == stateReconnecting { w.callbacks.OnTrackUnmuted(w.track.ID(), pts) w.state = statePlaying }