Skip to content

Commit

Permalink
Close unhandled rtcp simulcast streams
Browse files Browse the repository at this point in the history
handleIncomingSSRC will call streamsForSSRC which
opens rtp/rtcp streams that if unhandled can be
leaked resources. Now we will proactively open
them before calling handleIncomingSSRC and close
then later. In the future it would be better to
do this inside handleIncomingSSRC to protect other
callers.
  • Loading branch information
edaniels committed Jul 23, 2024
1 parent a598bab commit 69cd4e4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
14 changes: 10 additions & 4 deletions dtlstransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type DTLSTransport struct {

srtpSession, srtcpSession atomic.Value
srtpEndpoint, srtcpEndpoint *mux.Endpoint
simulcastStreams []*srtp.ReadStreamSRTP
simulcastStreams []simulcastStreamPair
srtpReady chan struct{}

dtlsMatcher mux.MatchFunc
Expand All @@ -59,6 +59,11 @@ type DTLSTransport struct {
log logging.LeveledLogger
}

type simulcastStreamPair struct {
srtp *srtp.ReadStreamSRTP
srtcp *srtp.ReadStreamSRTCP
}

// NewDTLSTransport creates a new DTLSTransport.
// This constructor is part of the ORTC API. It is not
// meant to be used together with the basic WebRTC API.
Expand Down Expand Up @@ -433,7 +438,8 @@ func (t *DTLSTransport) Stop() error {
}

for i := range t.simulcastStreams {
closeErrs = append(closeErrs, t.simulcastStreams[i].Close())
closeErrs = append(closeErrs, t.simulcastStreams[i].srtp.Close())
closeErrs = append(closeErrs, t.simulcastStreams[i].srtcp.Close())
}

if t.conn != nil {
Expand Down Expand Up @@ -474,11 +480,11 @@ func (t *DTLSTransport) ensureICEConn() error {
return nil
}

func (t *DTLSTransport) storeSimulcastStream(s *srtp.ReadStreamSRTP) {
func (t *DTLSTransport) storeSimulcastStream(srtpReadStream *srtp.ReadStreamSRTP, srtcpReadStream *srtp.ReadStreamSRTCP) {
t.lock.Lock()
defer t.lock.Unlock()

t.simulcastStreams = append(t.simulcastStreams, s)
t.simulcastStreams = append(t.simulcastStreams, simulcastStreamPair{srtpReadStream, srtcpReadStream})
}

func (t *DTLSTransport) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
Expand Down
26 changes: 21 additions & 5 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,26 +1670,42 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
return
}

stream, ssrc, err := srtpSession.AcceptStream()
srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
if err != nil {
pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err)
return
}

srtpReadStream, ssrc, err := srtpSession.AcceptStream()
if err != nil {
pc.log.Warnf("Failed to accept RTP %v", err)
return
}

// open accompanying srtcp stream
srtcpReadStream, err := srtcpSession.OpenReadStream(ssrc)
if err != nil {
pc.log.Warnf("Failed to open RTCP stream for %d: %v", ssrc, err)
return
}

if pc.isClosed.get() {
if err = stream.Close(); err != nil {
if err = srtpReadStream.Close(); err != nil {
pc.log.Warnf("Failed to close RTP stream %v", err)
}
if err = srtcpReadStream.Close(); err != nil {
pc.log.Warnf("Failed to close RTCP stream %v", err)
}
continue
}

pc.dtlsTransport.storeSimulcastStream(srtpReadStream, srtcpReadStream)

if ssrc == 0 {
go pc.handleNonMediaBandwidthProbe()
continue
}

pc.dtlsTransport.storeSimulcastStream(stream)

if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
pc.log.Warn(ErrSimulcastProbeOverflow.Error())
Expand All @@ -1701,7 +1717,7 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err)
}
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
}(stream, SSRC(ssrc))
}(srtpReadStream, SSRC(ssrc))
}
}

Expand Down

0 comments on commit 69cd4e4

Please sign in to comment.