Skip to content

Commit

Permalink
Fix(SFU): Prevent multiple receiver close calls on simulcast
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo committed Jan 7, 2021
1 parent cfd4eaf commit ad7ca44
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 22 deletions.
5 changes: 3 additions & 2 deletions pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type Receiver interface {
// WebRTCReceiver receives a video track
type WebRTCReceiver struct {
sync.Mutex
rtcpMu sync.RWMutex
rtcpMu sync.RWMutex
closeOnce sync.Once

peerID string
trackID string
Expand Down Expand Up @@ -215,7 +216,7 @@ func (w *WebRTCReceiver) writeRTP(layer int) {
w.closeTracks(layer)
w.nackWorker.Stop()
if w.onCloseHandler != nil {
w.onCloseHandler()
w.closeOnce.Do(w.onCloseHandler)
}
}()
for pkt := range w.buffers[layer].PacketChan() {
Expand Down
31 changes: 16 additions & 15 deletions pkg/sfu/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,19 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe
}
})

recv := r.receivers[trackID]
if recv == nil {
recv, ok := r.receivers[trackID]
if !ok {
recv = NewWebRTCReceiver(receiver, track, r.id)
r.receivers[trackID] = recv
recv.SetRTCPCh(r.rtcpCh)
recv.OnCloseHandler(func() {
if r.config.WithStats {
if track.Kind() == webrtc.RTPCodecTypeVideo {
stats.VideoTracks.Dec()
} else {
stats.AudioTracks.Dec()
}
}
r.deleteReceiver(trackID, uint32(track.SSRC()))
})
publish = true
Expand Down Expand Up @@ -217,11 +224,13 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {

// nolint:scopelint
downTrack.OnCloseHandler(func() {
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
log.Errorf("Error closing down track: %v", err)
} else {
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
log.Errorf("Error closing down track: %v", err)
} else {
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
}
}
})

Expand All @@ -236,14 +245,6 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {

func (r *router) deleteReceiver(track string, ssrc uint32) {
r.Lock()
if r.config.WithStats {
if r.receivers[track].Kind() == webrtc.RTPCodecTypeVideo {
stats.VideoTracks.Dec()
} else {
stats.AudioTracks.Dec()
}
}

delete(r.receivers, track)
delete(r.stats, ssrc)
r.Unlock()
Expand Down
12 changes: 7 additions & 5 deletions pkg/sfu/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ func (s *Subscriber) RemoveDownTrack(streamID string, downTrack *DownTrack) {
idx = i
}
}
dts[idx] = dts[len(dts)-1]
dts[len(dts)-1] = nil
dts = dts[:len(dts)-1]
s.tracks[streamID] = dts
if idx >= 0 {
dts[idx] = dts[len(dts)-1]
dts[len(dts)-1] = nil
dts = dts[:len(dts)-1]
s.tracks[streamID] = dts
}
}
}

Expand Down Expand Up @@ -193,7 +195,7 @@ func (s *Subscriber) downTracksReports() {
for {
time.Sleep(5 * time.Second)

if s.pc.ConnectionState() == webrtc.ICETransportStateClosed {
if s.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}

Expand Down

0 comments on commit ad7ca44

Please sign in to comment.