From 219c6a35ced73c793e8d822272fbdb2f948bb535 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Sun, 1 Oct 2023 22:28:19 +0800 Subject: [PATCH] Fix data race of RTX packet Fix data race of RTX packet --- rtpreceiver.go | 21 +++++++++++++++++++-- track_remote.go | 1 + 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/rtpreceiver.go b/rtpreceiver.go index 5ee8d98456d..fa15f4fdeb3 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -43,6 +43,15 @@ type trackStreams struct { type rtxPacketWithAttributes struct { pkt []byte attributes interceptor.Attributes + pool *sync.Pool +} + +func (p *rtxPacketWithAttributes) release() { + if p.pkt != nil { + b := p.pkt[:cap(p.pkt)] + p.pool.Put(b) // nolint:staticcheck + p.pkt = nil + } } // RTPReceiver allows an application to inspect the receipt of a TrackRemote @@ -59,6 +68,8 @@ type RTPReceiver struct { // A reference to the associated api object api *API + + rtxPool sync.Pool } // NewRTPReceiver constructs a new RTPReceiver @@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT closed: make(chan interface{}), received: make(chan interface{}), tracks: []trackStreams{}, + rtxPool: sync.Pool{New: func() interface{} { + return make([]byte, api.settingEngine.getReceiveMTU()) + }}, } return r, nil @@ -411,10 +425,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { - b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { + b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert i, attributes, err := track.repairInterceptor.Read(b, nil) if err != nil { + r.rtxPool.Put(b) // nolint:staticcheck return } @@ -435,6 +450,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep if i-int(headerLength)-paddingLength < 2 { // BWE probe packet, ignore + r.rtxPool.Put(b) // nolint:staticcheck continue } @@ -450,8 +466,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep select { case <-r.closed: + r.rtxPool.Put(b) // nolint:staticcheck return - case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}: + case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}: } } }() diff --git a/track_remote.go b/track_remote.go index b32724fff91..dfdb12b9594 100644 --- a/track_remote.go +++ b/track_remote.go @@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil { n = copy(b, rtxPacketReceived.pkt) attributes = rtxPacketReceived.attributes + rtxPacketReceived.release() err = nil } else { // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return