Skip to content

Commit

Permalink
handle ICE disconnections (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Nov 9, 2023
1 parent f77a4a0 commit b399be7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 16 deletions.
31 changes: 28 additions & 3 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (s *SDKSource) joinRoom() error {
OnTrackUnmuted: s.onTrackUnmuted,
OnTrackUnsubscribed: s.onTrackUnsubscribed,
},
OnReconnecting: s.onReconnecting,
OnReconnected: s.onReconnected,
OnDisconnected: s.onDisconnected,
}
if s.RequestType == types.RequestTypeParticipant {
Expand Down Expand Up @@ -525,19 +527,42 @@ func (s *SDKSource) onTrackFinished(trackID string) {
s.callbacks.OnTrackRemoved(trackID)
s.sync.RemoveTrack(trackID)
} else if active == 0 {
s.onDisconnected()
s.finished()
}
}
}

func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) {
if rp.Identity() == s.Identity {
logger.Debugw("Participant disconnected")
s.onDisconnected()
logger.Debugw("participant disconnected")
s.finished()
}
}

func (s *SDKSource) onReconnecting() {
s.mu.RLock()
defer s.mu.RUnlock()

for _, writer := range s.writers {
writer.SetTrackDisconnected(true)
}
}

func (s *SDKSource) onReconnected() {
s.mu.RLock()
defer s.mu.RUnlock()

for _, writer := range s.writers {
writer.SetTrackDisconnected(false)
}
}

func (s *SDKSource) onDisconnected() {
logger.Warnw("disconnected from room", nil)
s.finished()
}

func (s *SDKSource) finished() {
select {
case <-s.endRecording:
return
Expand Down
50 changes: 37 additions & 13 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
statePlaying state = iota
stateMuted
stateUnmuting
stateReconnecting
)

const (
Expand Down Expand Up @@ -72,14 +73,15 @@ type AppWriter struct {
*synchronizer.TrackSynchronizer

// state
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
}

func NewAppWriter(
Expand Down Expand Up @@ -158,13 +160,13 @@ func (w *AppWriter) TrackID() string {

func (w *AppWriter) Play() {
w.playing.Break()
if w.pub.IsMuted() {
if w.pub.IsMuted() || w.disconnected.Load() {
w.SetTrackMuted(true)
}
}

func (w *AppWriter) SetTrackMuted(muted bool) {
if !w.playing.IsBroken() {
if !w.playing.IsBroken() || w.muted.Load() == muted {
return
}

Expand All @@ -180,12 +182,27 @@ func (w *AppWriter) SetTrackMuted(muted bool) {
}
}

func (w *AppWriter) SetTrackDisconnected(disconnected bool) {
w.disconnected.Store(disconnected)
if disconnected {
w.logger.Debugw("track disconnected", "timestamp", time.Since(w.startTime).Seconds())
if w.playing.IsBroken() {
w.callbacks.OnTrackMuted(w.track.ID())
}
} else {
w.logger.Debugw("track reconnected", "timestamp", time.Since(w.startTime).Seconds())
if w.playing.IsBroken() && !w.muted.Load() && w.sendPLI != nil {
w.sendPLI()
}
}
}

// Drain blocks until finished
func (w *AppWriter) Drain(force bool) {
w.draining.Once(func() {
w.logger.Debugw("draining")

if force || w.muted.Load() {
if force || w.muted.Load() || w.disconnected.Load() {
w.endStream.Break()
} else {
// wait until drainTimeout before force popping
Expand All @@ -202,7 +219,7 @@ func (w *AppWriter) run() {

for !w.endStream.IsBroken() {
switch w.state {
case stateUnmuting, statePlaying:
case statePlaying, stateReconnecting, stateUnmuting:
w.handlePlaying()
case stateMuted:
w.handleMuted()
Expand Down Expand Up @@ -286,6 +303,13 @@ func (w *AppWriter) handleReadError(err error) {
return
}

// check if reconnecting
if w.disconnected.Load() {
_ = w.pushSamples()
w.state = stateReconnecting
return
}

// check if muted
if w.muted.Load() {
_ = w.pushSamples()
Expand Down Expand Up @@ -328,7 +352,7 @@ func (w *AppWriter) pushSamples() error {
return err
}

if w.state == stateUnmuting {
if w.state == stateUnmuting || w.state == stateReconnecting {
w.callbacks.OnTrackUnmuted(w.track.ID(), pts)
w.state = statePlaying
}
Expand Down

0 comments on commit b399be7

Please sign in to comment.