Skip to content

Commit

Permalink
Allocationless handling of RTX pkts
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancable committed Sep 29, 2023
1 parent 8fb789e commit da2225d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
56 changes: 32 additions & 24 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/srtp/v3"
"github.com/pion/webrtc/v4/internal/util"
)
Expand All @@ -42,7 +41,7 @@ type trackStreams struct {
}

type rtxPacketWithAttributes struct {
rtxPacket rtp.Packet
pkt []byte

Check failure on line 44 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard,default (gci)
attributes interceptor.Attributes
}

Expand Down Expand Up @@ -428,31 +427,40 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
return
}

pkt := &rtp.Packet{}
if err := pkt.Unmarshal(b[:i]); err != nil {
return
}

if len(pkt.Payload) < 2 {
// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
hasExtension := b[0] & 0b10000 > 0

Check failure on line 433 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard,default (gci)
hasPadding := b[0] & 0b100000 > 0
csrcCount := b[0] & 0b1111
headerLength := uint16(12 + (4 * csrcCount))
paddingLength := 0
if hasExtension {
headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4]))
}
if hasPadding {
paddingLength = int(b[i-1])
}

Check warning on line 443 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L433-L443

Added lines #L433 - L443 were not covered by tests

if i - int(headerLength) - paddingLength < 2 {

Check failure on line 445 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard,default (gci)
// BWE probe packet, ignore
continue

Check warning on line 447 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L445-L447

Added lines #L445 - L447 were not covered by tests
}

// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType)
attributes.Set(attributeRtxSsrc, pkt.Header.SSRC)
attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber)
pkt.Header.PayloadType = uint8(track.track.PayloadType())
pkt.Header.SSRC = uint32(track.track.SSRC())
pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2])
pkt.Payload = pkt.Payload[2:]
attributes.Set(attributeRtxPayloadType, b[1] & 0x7F)
attributes.Set(attributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4]))
attributes.Set(attributeRtxSsrc, binary.BigEndian.Uint32(b[8:12]))

b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType())
b[2] = b[headerLength]
b[3] = b[headerLength+1]
binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
copy(b[headerLength:i-2], b[headerLength+2:i])

select {
case <-r.closed:
return
case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}:
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:

Check warning on line 463 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L450-L463

Added lines #L450 - L463 were not covered by tests
}
}
}()
Expand Down Expand Up @@ -493,23 +501,23 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) {
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil, interceptor.Attributes{}
return nil
}

select {
case <-r.received:
default:
return nil, interceptor.Attributes{}
return nil

Check warning on line 512 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L509-L512

Added lines #L509 - L512 were not covered by tests
}

if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes
return &rtxPacketReceived
default:

Check warning on line 519 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L515-L519

Added lines #L515 - L519 were not covered by tests
}
}
return nil, interceptor.Attributes{}
return nil

Check warning on line 522 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L522

Added line #L522 was not covered by tests
}
10 changes: 4 additions & 6 deletions track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}

// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil {
n, err = rtxPacket.MarshalTo(b)
attributes = rtxAttributes
if err != nil {
return 0, nil, err
}
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
err = nil

Check warning on line 134 in track_remote.go

View check run for this annotation

Codecov / codecov/patch

track_remote.go#L132-L134

Added lines #L132 - L134 were not covered by tests
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
// a packet from the main track
Expand Down

0 comments on commit da2225d

Please sign in to comment.