Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race of RTX packet #2595

Merged
merged 2 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Aaron Boushley <[email protected]>
Aaron France <[email protected]>
Adam Kiss <[email protected]>
Aditya Kumar <[email protected]>
Adrian Cable <[email protected]>
Adrian Cable <[email protected]>
adwpc <[email protected]>
aggresss <[email protected]>
Expand Down Expand Up @@ -200,6 +201,7 @@ Steffen Vogel <[email protected]>
stephanrotolante <[email protected]>
streamer45 <[email protected]>
Suhas Gaddam <[email protected]>
sukun <[email protected]>
Suzuki Takeo <[email protected]>
sylba2050 <[email protected]>
Tarrence van As <[email protected]>
Expand Down
21 changes: 19 additions & 2 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ type trackStreams struct {
type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor.Attributes
pool *sync.Pool
}

func (p *rtxPacketWithAttributes) release() {
if p.pkt != nil {
b := p.pkt[:cap(p.pkt)]
p.pool.Put(b) // nolint:staticcheck
p.pkt = nil
}
}

// RTPReceiver allows an application to inspect the receipt of a TrackRemote
Expand All @@ -59,6 +68,8 @@ type RTPReceiver struct {

// A reference to the associated api object
api *API

rtxPool sync.Pool
}

// NewRTPReceiver constructs a new RTPReceiver
Expand All @@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
closed: make(chan interface{}),
received: make(chan interface{}),
tracks: []trackStreams{},
rtxPool: sync.Pool{New: func() interface{} {
return make([]byte, api.settingEngine.getReceiveMTU())
}},
}

return r, nil
Expand Down Expand Up @@ -420,10 +434,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
track.repairStreamChannel = make(chan rtxPacketWithAttributes)

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}

Expand All @@ -444,6 +459,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep

if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
r.rtxPool.Put(b) // nolint:staticcheck
continue
}

Expand All @@ -459,8 +475,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep

select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
return
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
}
}
}()
Expand Down
1 change: 1 addition & 0 deletions track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()
err = nil
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
Expand Down