From 8fb789ee83dcf0bfc047d46eed8f504feff281e7 Mon Sep 17 00:00:00 2001 From: Adrian Cable Date: Thu, 28 Sep 2023 10:36:02 -0700 Subject: [PATCH] Read() handles distinct-SSRC RTX packets --- constants.go | 5 ++++ rtpreceiver.go | 68 +++++++++++++++++++++++++++++++++++++++++++++---- track_remote.go | 39 +++++++++++++++++++++++----- 3 files changed, 101 insertions(+), 11 deletions(-) diff --git a/constants.go b/constants.go index 70778506b59..f1e7e2f93a8 100644 --- a/constants.go +++ b/constants.go @@ -31,6 +31,11 @@ const ( generatedCertificateOrigin = "WebRTC" sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id" + + // Attributes returned when Read() returns an RTX packet from a separate RTX stream (distinct SSRC) + attributeRtxPayloadType = "rtx_payload_type" + attributeRtxSsrc = "rtx_ssrc" + attributeRtxSequenceNumber = "rtx_sequence_number" ) func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile { diff --git a/rtpreceiver.go b/rtpreceiver.go index 1bdec899bcf..73ab036878e 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -7,6 +7,7 @@ package webrtc import ( + "encoding/binary" "fmt" "io" "sync" @@ -14,6 +15,7 @@ import ( "github.com/pion/interceptor" "github.com/pion/rtcp" + "github.com/pion/rtp" "github.com/pion/srtp/v3" "github.com/pion/webrtc/v4/internal/util" ) @@ -31,13 +33,19 @@ type trackStreams struct { rtcpReadStream *srtp.ReadStreamSRTCP rtcpInterceptor interceptor.RTCPReader - repairReadStream *srtp.ReadStreamSRTP - repairInterceptor interceptor.RTPReader + repairReadStream *srtp.ReadStreamSRTP + repairInterceptor interceptor.RTPReader + repairStreamChannel chan rtxPacketWithAttributes repairRtcpReadStream *srtp.ReadStreamSRTCP repairRtcpInterceptor interceptor.RTCPReader } +type rtxPacketWithAttributes struct { + rtxPacket rtp.Packet + attributes interceptor.Attributes +} + // RTPReceiver allows an application to inspect the receipt of a TrackRemote type RTPReceiver struct { kind RTPCodecType @@ -145,6 +153,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) { track: newTrackRemote( r.kind, parameters.Encodings[i].SSRC, + parameters.Encodings[i].RTX.SSRC, parameters.Encodings[i].RID, r, ), @@ -388,8 +397,6 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo } // receiveForRtx starts a routine that processes the repair stream -// These packets aren't exposed to the user yet, but we need to process them for -// TWCC func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { var track *trackStreams if ssrc != 0 && len(r.tracks) == 1 { @@ -411,13 +418,42 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairInterceptor = rtpInterceptor track.repairRtcpReadStream = rtcpReadStream track.repairRtcpInterceptor = rtcpInterceptor + track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { - if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil { + i, attributes, err := track.repairInterceptor.Read(b, nil) + if err != nil { + return + } + + pkt := &rtp.Packet{} + if err := pkt.Unmarshal(b[:i]); err != nil { return } + + if len(pkt.Payload) < 2 { + // BWE probe packet, ignore + continue + } + + // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the + // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format + // as non-RTX RTP packets + attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType) + attributes.Set(attributeRtxSsrc, pkt.Header.SSRC) + attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber) + pkt.Header.PayloadType = uint8(track.track.PayloadType()) + pkt.Header.SSRC = uint32(track.track.SSRC()) + pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2]) + pkt.Payload = pkt.Payload[2:] + + select { + case <-r.closed: + return + case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}: + } } }() return nil @@ -455,3 +491,25 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) } + +// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil +func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) { + if !reader.HasRTX() { + return nil, interceptor.Attributes{} + } + + select { + case <-r.received: + default: + return nil, interceptor.Attributes{} + } + + if t := r.streamsForTrack(reader); t != nil { + select { + case rtxPacketReceived := <-t.repairStreamChannel: + return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes + default: + } + } + return nil, interceptor.Attributes{} +} diff --git a/track_remote.go b/track_remote.go index 150b91bc9e1..9832b0e3096 100644 --- a/track_remote.go +++ b/track_remote.go @@ -24,6 +24,7 @@ type TrackRemote struct { payloadType PayloadType kind RTPCodecType ssrc SSRC + rtxSsrc SSRC codec RTPCodecParameters params RTPParameters rid string @@ -33,10 +34,11 @@ type TrackRemote struct { peekedAttributes interceptor.Attributes } -func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { +func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { return &TrackRemote{ kind: kind, ssrc: ssrc, + rtxSsrc: rtxSsrc, rid: rid, receiver: receiver, } @@ -125,13 +127,24 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, } } - n, attributes, err = r.readRTP(b, t) - if err != nil { - return + // If there's a separate RTX track and an RTX packet is available, return that + if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil { + n, err = rtxPacket.MarshalTo(b) + attributes = rtxAttributes + if err != nil { + return 0, nil, err + } + } else { + // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return + // a packet from the main track + n, attributes, err = r.readRTP(b, t) + if err != nil { + return + } + err = t.checkAndUpdateTrack(b) } - err = t.checkAndUpdateTrack(b) - return + return n, attributes, err } // checkAndUpdateTrack checks payloadType for every incoming packet @@ -197,3 +210,17 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error func (t *TrackRemote) SetReadDeadline(deadline time.Time) error { return t.receiver.setRTPReadDeadline(deadline, t) } + +// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream +func (t *TrackRemote) RtxSSRC() SSRC { + t.mu.RLock() + defer t.mu.RUnlock() + return t.rtxSsrc +} + +// HasRTX returns true if the track has a separate RTX stream +func (t *TrackRemote) HasRTX() bool { + t.mu.RLock() + defer t.mu.RUnlock() + return t.rtxSsrc != 0 +}