From e04239b79c98d5de70cd590995e6a3b62f837ef7 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Sun, 1 Oct 2023 22:28:19 +0800 Subject: [PATCH 1/2] Fix data race of RTX packet Fix data race of RTX packet --- rtpreceiver.go | 21 +++++++++++++++++++-- track_remote.go | 1 + 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/rtpreceiver.go b/rtpreceiver.go index 5bccd96c8af..b1d35e15038 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -43,6 +43,15 @@ type trackStreams struct { type rtxPacketWithAttributes struct { pkt []byte attributes interceptor.Attributes + pool *sync.Pool +} + +func (p *rtxPacketWithAttributes) release() { + if p.pkt != nil { + b := p.pkt[:cap(p.pkt)] + p.pool.Put(b) // nolint:staticcheck + p.pkt = nil + } } // RTPReceiver allows an application to inspect the receipt of a TrackRemote @@ -59,6 +68,8 @@ type RTPReceiver struct { // A reference to the associated api object api *API + + rtxPool sync.Pool } // NewRTPReceiver constructs a new RTPReceiver @@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT closed: make(chan interface{}), received: make(chan interface{}), tracks: []trackStreams{}, + rtxPool: sync.Pool{New: func() interface{} { + return make([]byte, api.settingEngine.getReceiveMTU()) + }}, } return r, nil @@ -420,10 +434,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { - b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { + b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert i, attributes, err := track.repairInterceptor.Read(b, nil) if err != nil { + r.rtxPool.Put(b) // nolint:staticcheck return } @@ -444,6 +459,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep if i-int(headerLength)-paddingLength < 2 { // BWE probe packet, ignore + r.rtxPool.Put(b) // nolint:staticcheck continue } @@ -459,8 +475,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep select { case <-r.closed: + r.rtxPool.Put(b) // nolint:staticcheck return - case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}: + case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}: } } }() diff --git a/track_remote.go b/track_remote.go index b32724fff91..dfdb12b9594 100644 --- a/track_remote.go +++ b/track_remote.go @@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil { n = copy(b, rtxPacketReceived.pkt) attributes = rtxPacketReceived.attributes + rtxPacketReceived.release() err = nil } else { // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return From 4e0586d72ed08c66d30aa101571a28b01b256d36 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Sun, 1 Oct 2023 14:36:11 +0000 Subject: [PATCH 2/2] Update AUTHORS.txt --- AUTHORS.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/AUTHORS.txt b/AUTHORS.txt index 31e930dcc92..d2e063a9250 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -8,6 +8,7 @@ Aaron Boushley Aaron France Adam Kiss Aditya Kumar +Adrian Cable <6544927+adriancable@users.noreply.github.com> Adrian Cable adwpc aggresss @@ -200,6 +201,7 @@ Steffen Vogel stephanrotolante streamer45 Suhas Gaddam +sukun Suzuki Takeo sylba2050 Tarrence van As