Skip to content

Commit

Permalink
Read() handles distinct-SSRC RTX packets
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancable committed Sep 29, 2023
1 parent 486f54c commit 8fb789e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 11 deletions.
5 changes: 5 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const (
generatedCertificateOrigin = "WebRTC"

sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id"

// Attributes returned when Read() returns an RTX packet from a separate RTX stream (distinct SSRC)
attributeRtxPayloadType = "rtx_payload_type"
attributeRtxSsrc = "rtx_ssrc"
attributeRtxSequenceNumber = "rtx_sequence_number"
)

func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile {
Expand Down
68 changes: 63 additions & 5 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
package webrtc

import (
"encoding/binary"
"fmt"
"io"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/srtp/v3"
"github.com/pion/webrtc/v4/internal/util"
)
Expand All @@ -31,13 +33,19 @@ type trackStreams struct {
rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairStreamChannel chan rtxPacketWithAttributes

repairRtcpReadStream *srtp.ReadStreamSRTCP
repairRtcpInterceptor interceptor.RTCPReader
}

type rtxPacketWithAttributes struct {
rtxPacket rtp.Packet
attributes interceptor.Attributes
}

// RTPReceiver allows an application to inspect the receipt of a TrackRemote
type RTPReceiver struct {
kind RTPCodecType
Expand Down Expand Up @@ -145,6 +153,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
track: newTrackRemote(
r.kind,
parameters.Encodings[i].SSRC,
parameters.Encodings[i].RTX.SSRC,
parameters.Encodings[i].RID,
r,
),
Expand Down Expand Up @@ -388,8 +397,6 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
}

// receiveForRtx starts a routine that processes the repair stream
// These packets aren't exposed to the user yet, but we need to process them for
// TWCC
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
var track *trackStreams
if ssrc != 0 && len(r.tracks) == 1 {
Expand All @@ -411,13 +418,42 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
track.repairInterceptor = rtpInterceptor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes)

Check warning on line 421 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L421

Added line #L421 was not covered by tests

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
return
}

Check warning on line 429 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L426-L429

Added lines #L426 - L429 were not covered by tests

pkt := &rtp.Packet{}
if err := pkt.Unmarshal(b[:i]); err != nil {

Check warning on line 432 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L431-L432

Added lines #L431 - L432 were not covered by tests
return
}

if len(pkt.Payload) < 2 {
// BWE probe packet, ignore
continue

Check warning on line 438 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L436-L438

Added lines #L436 - L438 were not covered by tests
}

// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType)
attributes.Set(attributeRtxSsrc, pkt.Header.SSRC)
attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber)
pkt.Header.PayloadType = uint8(track.track.PayloadType())
pkt.Header.SSRC = uint32(track.track.SSRC())
pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2])
pkt.Payload = pkt.Payload[2:]

select {
case <-r.closed:
return
case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}:

Check warning on line 455 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L444-L455

Added lines #L444 - L455 were not covered by tests
}
}
}()
return nil
Expand Down Expand Up @@ -455,3 +491,25 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
}
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) {
if !reader.HasRTX() {
return nil, interceptor.Attributes{}
}

select {
case <-r.received:
default:
return nil, interceptor.Attributes{}

Check warning on line 504 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L501-L504

Added lines #L501 - L504 were not covered by tests
}

if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes
default:

Check warning on line 511 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L507-L511

Added lines #L507 - L511 were not covered by tests
}
}
return nil, interceptor.Attributes{}

Check warning on line 514 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L514

Added line #L514 was not covered by tests
}
39 changes: 33 additions & 6 deletions track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type TrackRemote struct {
payloadType PayloadType
kind RTPCodecType
ssrc SSRC
rtxSsrc SSRC
codec RTPCodecParameters
params RTPParameters
rid string
Expand All @@ -33,10 +34,11 @@ type TrackRemote struct {
peekedAttributes interceptor.Attributes
}

func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote {
func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote {
return &TrackRemote{
kind: kind,
ssrc: ssrc,
rtxSsrc: rtxSsrc,
rid: rid,
receiver: receiver,
}
Expand Down Expand Up @@ -125,13 +127,24 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}
}

n, attributes, err = r.readRTP(b, t)
if err != nil {
return
// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil {
n, err = rtxPacket.MarshalTo(b)
attributes = rtxAttributes
if err != nil {
return 0, nil, err
}

Check warning on line 136 in track_remote.go

View check run for this annotation

Codecov / codecov/patch

track_remote.go#L132-L136

Added lines #L132 - L136 were not covered by tests
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
// a packet from the main track
n, attributes, err = r.readRTP(b, t)
if err != nil {
return
}
err = t.checkAndUpdateTrack(b)
}

err = t.checkAndUpdateTrack(b)
return
return n, attributes, err
}

// checkAndUpdateTrack checks payloadType for every incoming packet
Expand Down Expand Up @@ -197,3 +210,17 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error
func (t *TrackRemote) SetReadDeadline(deadline time.Time) error {
return t.receiver.setRTPReadDeadline(deadline, t)
}

// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream
func (t *TrackRemote) RtxSSRC() SSRC {
t.mu.RLock()
defer t.mu.RUnlock()
return t.rtxSsrc

Check warning on line 218 in track_remote.go

View check run for this annotation

Codecov / codecov/patch

track_remote.go#L215-L218

Added lines #L215 - L218 were not covered by tests
}

// HasRTX returns true if the track has a separate RTX stream
func (t *TrackRemote) HasRTX() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.rtxSsrc != 0
}

0 comments on commit 8fb789e

Please sign in to comment.