From e222a96f86304fe7dcbee5165301ebe155559457 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:15:33 +0300 Subject: [PATCH 1/8] Revert "Refine rtx support" This reverts commit e2454d1c00da7219d10520f707a578c520b1de66. --- mediaengine.go | 11 +------- mediaengine_test.go | 64 +++++---------------------------------------- peerconnection.go | 9 +++---- sdp.go | 6 ----- 4 files changed, 11 insertions(+), 79 deletions(-) diff --git a/mediaengine.go b/mediaengine.go index cbdcfb6d7d5..f0dd0eda7f6 100644 --- a/mediaengine.go +++ b/mediaengine.go @@ -414,11 +414,9 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo } aptMatch := codecMatchNone - var aptCodec RTPCodecParameters for _, codec := range exactMatches { if codec.PayloadType == PayloadType(payloadType) { aptMatch = codecMatchExact - aptCodec = codec break } } @@ -427,7 +425,6 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo for _, codec := range partialMatches { if codec.PayloadType == PayloadType(payloadType) { aptMatch = codecMatchPartial - aptCodec = codec break } } @@ -437,14 +434,8 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo return codecMatchNone, nil // not an error, we just ignore this codec we don't support } - // replace the apt value with the original codec's payload type - toMatchCodec := remoteCodec - if aptMatched, mt := codecParametersFuzzySearch(aptCodec, codecs); mt == aptMatch { - toMatchCodec.SDPFmtpLine = strings.Replace(toMatchCodec.SDPFmtpLine, fmt.Sprintf("apt=%d", payloadType), fmt.Sprintf("apt=%d", aptMatched.PayloadType), 1) - } - // if apt's media codec is partial match, then apt codec must be partial match too - _, matchType := codecParametersFuzzySearch(toMatchCodec, codecs) + _, matchType := codecParametersFuzzySearch(remoteCodec, codecs) if matchType == codecMatchExact && aptMatch == codecMatchPartial { matchType = codecMatchPartial } diff --git a/mediaengine_test.go b/mediaengine_test.go index d9a9e7b1ff4..00022da1759 100644 --- a/mediaengine_test.go +++ b/mediaengine_test.go @@ -308,18 +308,8 @@ a=rtpmap:96 VP8/90000 o=- 4596489990601351948 2 IN IP4 127.0.0.1 s=- t=0 0 -m=video 60323 UDP/TLS/RTP/SAVPF 94 95 106 107 108 109 96 97 +m=video 60323 UDP/TLS/RTP/SAVPF 94 96 97 a=rtpmap:94 VP8/90000 -a=rtpmap:95 rtx/90000 -a=fmtp:95 apt=94 -a=rtpmap:106 H264/90000 -a=fmtp:106 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f -a=rtpmap:107 rtx/90000 -a=fmtp:107 apt=106 -a=rtpmap:108 H264/90000 -a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f -a=rtpmap:109 rtx/90000 -a=fmtp:109 apt=108 a=rtpmap:96 VP9/90000 a=fmtp:96 profile-id=2 a=rtpmap:97 rtx/90000 @@ -328,64 +318,22 @@ a=fmtp:97 apt=96 m := MediaEngine{} assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil}, - PayloadType: 96, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, - PayloadType: 97, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", nil}, - PayloadType: 102, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil}, - PayloadType: 103, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", nil}, - PayloadType: 104, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil}, - PayloadType: 105, + PayloadType: 94, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil}, - PayloadType: 98, + PayloadType: 96, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, - PayloadType: 99, + RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + PayloadType: 97, }, RTPCodecTypeVideo)) assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels))) assert.True(t, m.negotiatedVideo) - vp9Codec, _, err := m.getCodecByPayload(96) - assert.NoError(t, err) - assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9) - vp9RTX, _, err := m.getCodecByPayload(97) - assert.NoError(t, err) - assert.Equal(t, vp9RTX.MimeType, "video/rtx") - - h264P1Codec, _, err := m.getCodecByPayload(106) - assert.NoError(t, err) - assert.Equal(t, h264P1Codec.MimeType, MimeTypeH264) - assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f") - h264P1RTX, _, err := m.getCodecByPayload(107) - assert.NoError(t, err) - assert.Equal(t, h264P1RTX.MimeType, "video/rtx") - assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106") - - h264P0Codec, _, err := m.getCodecByPayload(108) - assert.NoError(t, err) - assert.Equal(t, h264P0Codec.MimeType, MimeTypeH264) - assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f") - h264P0RTX, _, err := m.getCodecByPayload(109) + _, _, err := m.getCodecByPayload(97) assert.NoError(t, err) - assert.Equal(t, h264P0RTX.MimeType, "video/rtx") - assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108") }) t.Run("Matches when rtx apt for partial match codec", func(t *testing.T) { diff --git a/peerconnection.go b/peerconnection.go index 5e2fba24cf1..5b2442f221b 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1576,11 +1576,12 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } - if i < 4 { - return errRTPTooShort + var mid, rid, rsid string + payloadType, paddingOnly, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid) + if err != nil { + return err } - payloadType := PayloadType(b[1] & 0x7f) params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(payloadType) if err != nil { return err @@ -1592,8 +1593,6 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } - var mid, rid, rsid string - var paddingOnly bool for readCount := 0; readCount <= simulcastProbeCount; readCount++ { if mid == "" || (rid == "" && rsid == "") { // skip padding only packets for probing diff --git a/sdp.go b/sdp.go index 980f9850779..9d6f0cfc86b 100644 --- a/sdp.go +++ b/sdp.go @@ -128,12 +128,6 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) ( } rtxRepairFlows[rtxRepairFlow] = baseSsrc tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before - for i := range tracksInMediaSection { - if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) { - repairSsrc := SSRC(rtxRepairFlow) - tracksInMediaSection[i].repairSsrc = &repairSsrc - } - } } } From a52d699330a112f3af0c536e4d4b8a737805d73f Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:15:43 +0300 Subject: [PATCH 2/8] Revert "Fix panic of rtx attributes" This reverts commit 987fb7abdd78b5080f8deab1006694f1ad8d5750. --- rtpreceiver.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/rtpreceiver.go b/rtpreceiver.go index 2457ba080a8..f7d83d6fae5 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -463,9 +463,6 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep continue } - if attributes == nil { - attributes = make(interceptor.Attributes) - } attributes.Set(AttributeRtxPayloadType, b[1]&0x7F) attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4])) attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12])) From 618266fc6ec22ffd6459284877a078608678b050 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:15:59 +0300 Subject: [PATCH 3/8] Revert "RTX attribute constants should be public" This reverts commit d82b0d0a8810d8d70957472cac85e11a6105b11c. --- constants.go | 10 ++++------ rtpreceiver.go | 6 +++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/constants.go b/constants.go index ea484431b77..3bcd3beb48f 100644 --- a/constants.go +++ b/constants.go @@ -34,12 +34,10 @@ const ( sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id" - // AttributeRtxPayloadType is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream payload type - AttributeRtxPayloadType = "rtx_payload_type" - // AttributeRtxSsrc is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream SSRC - AttributeRtxSsrc = "rtx_ssrc" - // AttributeRtxSequenceNumber is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream sequence number - AttributeRtxSequenceNumber = "rtx_sequence_number" + // Attributes returned when Read() returns an RTX packet from a separate RTX stream (distinct SSRC) + attributeRtxPayloadType = "rtx_payload_type" + attributeRtxSsrc = "rtx_ssrc" + attributeRtxSequenceNumber = "rtx_sequence_number" ) func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile { diff --git a/rtpreceiver.go b/rtpreceiver.go index f7d83d6fae5..b1d35e15038 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -463,9 +463,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep continue } - attributes.Set(AttributeRtxPayloadType, b[1]&0x7F) - attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4])) - attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12])) + attributes.Set(attributeRtxPayloadType, b[1]&0x7F) + attributes.Set(attributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4])) + attributes.Set(attributeRtxSsrc, binary.BigEndian.Uint32(b[8:12])) b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType()) b[2] = b[headerLength] From 67287817d9ee1d96dba046da7d814dfd37619c92 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:16:09 +0300 Subject: [PATCH 4/8] Revert "Fix data race of RTX packet" This reverts commit 219c6a35ced73c793e8d822272fbdb2f948bb535. --- rtpreceiver.go | 21 ++------------------- track_remote.go | 1 - 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/rtpreceiver.go b/rtpreceiver.go index b1d35e15038..5bccd96c8af 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -43,15 +43,6 @@ type trackStreams struct { type rtxPacketWithAttributes struct { pkt []byte attributes interceptor.Attributes - pool *sync.Pool -} - -func (p *rtxPacketWithAttributes) release() { - if p.pkt != nil { - b := p.pkt[:cap(p.pkt)] - p.pool.Put(b) // nolint:staticcheck - p.pkt = nil - } } // RTPReceiver allows an application to inspect the receipt of a TrackRemote @@ -68,8 +59,6 @@ type RTPReceiver struct { // A reference to the associated api object api *API - - rtxPool sync.Pool } // NewRTPReceiver constructs a new RTPReceiver @@ -85,9 +74,6 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT closed: make(chan interface{}), received: make(chan interface{}), tracks: []trackStreams{}, - rtxPool: sync.Pool{New: func() interface{} { - return make([]byte, api.settingEngine.getReceiveMTU()) - }}, } return r, nil @@ -434,11 +420,10 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { + b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { - b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert i, attributes, err := track.repairInterceptor.Read(b, nil) if err != nil { - r.rtxPool.Put(b) // nolint:staticcheck return } @@ -459,7 +444,6 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep if i-int(headerLength)-paddingLength < 2 { // BWE probe packet, ignore - r.rtxPool.Put(b) // nolint:staticcheck continue } @@ -475,9 +459,8 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep select { case <-r.closed: - r.rtxPool.Put(b) // nolint:staticcheck return - case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}: + case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}: } } }() diff --git a/track_remote.go b/track_remote.go index 691b488d83e..83520401e15 100644 --- a/track_remote.go +++ b/track_remote.go @@ -131,7 +131,6 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil { n = copy(b, rtxPacketReceived.pkt) attributes = rtxPacketReceived.attributes - rtxPacketReceived.release() err = nil } else { // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return From 2ccecc23e254dc367db61bd2fa85f2b32a172e38 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:16:41 +0300 Subject: [PATCH 5/8] Revert "Allocationless handling of RTX pkts" This reverts commit f68b789150eef08b72069c948b81469842e6da04. --- rtpreceiver.go | 52 +++++++++++++++++++++---------------------------- track_remote.go | 10 ++++++---- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/rtpreceiver.go b/rtpreceiver.go index 5bccd96c8af..73ab036878e 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -15,6 +15,7 @@ import ( "github.com/pion/interceptor" "github.com/pion/rtcp" + "github.com/pion/rtp" "github.com/pion/srtp/v3" "github.com/pion/webrtc/v4/internal/util" ) @@ -41,7 +42,7 @@ type trackStreams struct { } type rtxPacketWithAttributes struct { - pkt []byte + rtxPacket rtp.Packet attributes interceptor.Attributes } @@ -427,40 +428,31 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep return } - // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the - // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format - // as non-RTX RTP packets - hasExtension := b[0]&0b10000 > 0 - hasPadding := b[0]&0b100000 > 0 - csrcCount := b[0] & 0b1111 - headerLength := uint16(12 + (4 * csrcCount)) - paddingLength := 0 - if hasExtension { - headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4])) - } - if hasPadding { - paddingLength = int(b[i-1]) + pkt := &rtp.Packet{} + if err := pkt.Unmarshal(b[:i]); err != nil { + return } - if i-int(headerLength)-paddingLength < 2 { + if len(pkt.Payload) < 2 { // BWE probe packet, ignore continue } - attributes.Set(attributeRtxPayloadType, b[1]&0x7F) - attributes.Set(attributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4])) - attributes.Set(attributeRtxSsrc, binary.BigEndian.Uint32(b[8:12])) - - b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType()) - b[2] = b[headerLength] - b[3] = b[headerLength+1] - binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC())) - copy(b[headerLength:i-2], b[headerLength+2:i]) + // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the + // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format + // as non-RTX RTP packets + attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType) + attributes.Set(attributeRtxSsrc, pkt.Header.SSRC) + attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber) + pkt.Header.PayloadType = uint8(track.track.PayloadType()) + pkt.Header.SSRC = uint32(track.track.SSRC()) + pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2]) + pkt.Payload = pkt.Payload[2:] select { case <-r.closed: return - case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}: + case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}: } } }() @@ -501,23 +493,23 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } // readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil -func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes { +func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) { if !reader.HasRTX() { - return nil + return nil, interceptor.Attributes{} } select { case <-r.received: default: - return nil + return nil, interceptor.Attributes{} } if t := r.streamsForTrack(reader); t != nil { select { case rtxPacketReceived := <-t.repairStreamChannel: - return &rtxPacketReceived + return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes default: } } - return nil + return nil, interceptor.Attributes{} } diff --git a/track_remote.go b/track_remote.go index 83520401e15..897e67a8969 100644 --- a/track_remote.go +++ b/track_remote.go @@ -128,10 +128,12 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, } // If there's a separate RTX track and an RTX packet is available, return that - if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil { - n = copy(b, rtxPacketReceived.pkt) - attributes = rtxPacketReceived.attributes - err = nil + if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil { + n, err = rtxPacket.MarshalTo(b) + attributes = rtxAttributes + if err != nil { + return 0, nil, err + } } else { // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return // a packet from the main track From 2400c08e564e7c4da4aaa6c4f576c93a7b9753c7 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:17:06 +0300 Subject: [PATCH 6/8] Revert "Read() handles distinct-SSRC RTX packets" This reverts commit 5da72784c8d9425fe61bbf66de7b658eb94d6717. --- constants.go | 5 ---- rtpreceiver.go | 68 ++++--------------------------------------------- track_remote.go | 39 +++++----------------------- 3 files changed, 11 insertions(+), 101 deletions(-) diff --git a/constants.go b/constants.go index 3bcd3beb48f..dad0068f4a6 100644 --- a/constants.go +++ b/constants.go @@ -33,11 +33,6 @@ const ( generatedCertificateOrigin = "WebRTC" sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id" - - // Attributes returned when Read() returns an RTX packet from a separate RTX stream (distinct SSRC) - attributeRtxPayloadType = "rtx_payload_type" - attributeRtxSsrc = "rtx_ssrc" - attributeRtxSequenceNumber = "rtx_sequence_number" ) func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile { diff --git a/rtpreceiver.go b/rtpreceiver.go index 73ab036878e..1bdec899bcf 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -7,7 +7,6 @@ package webrtc import ( - "encoding/binary" "fmt" "io" "sync" @@ -15,7 +14,6 @@ import ( "github.com/pion/interceptor" "github.com/pion/rtcp" - "github.com/pion/rtp" "github.com/pion/srtp/v3" "github.com/pion/webrtc/v4/internal/util" ) @@ -33,19 +31,13 @@ type trackStreams struct { rtcpReadStream *srtp.ReadStreamSRTCP rtcpInterceptor interceptor.RTCPReader - repairReadStream *srtp.ReadStreamSRTP - repairInterceptor interceptor.RTPReader - repairStreamChannel chan rtxPacketWithAttributes + repairReadStream *srtp.ReadStreamSRTP + repairInterceptor interceptor.RTPReader repairRtcpReadStream *srtp.ReadStreamSRTCP repairRtcpInterceptor interceptor.RTCPReader } -type rtxPacketWithAttributes struct { - rtxPacket rtp.Packet - attributes interceptor.Attributes -} - // RTPReceiver allows an application to inspect the receipt of a TrackRemote type RTPReceiver struct { kind RTPCodecType @@ -153,7 +145,6 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) { track: newTrackRemote( r.kind, parameters.Encodings[i].SSRC, - parameters.Encodings[i].RTX.SSRC, parameters.Encodings[i].RID, r, ), @@ -397,6 +388,8 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo } // receiveForRtx starts a routine that processes the repair stream +// These packets aren't exposed to the user yet, but we need to process them for +// TWCC func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { var track *trackStreams if ssrc != 0 && len(r.tracks) == 1 { @@ -418,42 +411,13 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairInterceptor = rtpInterceptor track.repairRtcpReadStream = rtcpReadStream track.repairRtcpInterceptor = rtcpInterceptor - track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { - i, attributes, err := track.repairInterceptor.Read(b, nil) - if err != nil { - return - } - - pkt := &rtp.Packet{} - if err := pkt.Unmarshal(b[:i]); err != nil { + if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil { return } - - if len(pkt.Payload) < 2 { - // BWE probe packet, ignore - continue - } - - // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the - // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format - // as non-RTX RTP packets - attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType) - attributes.Set(attributeRtxSsrc, pkt.Header.SSRC) - attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber) - pkt.Header.PayloadType = uint8(track.track.PayloadType()) - pkt.Header.SSRC = uint32(track.track.SSRC()) - pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2]) - pkt.Payload = pkt.Payload[2:] - - select { - case <-r.closed: - return - case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}: - } } }() return nil @@ -491,25 +455,3 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) } - -// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil -func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) { - if !reader.HasRTX() { - return nil, interceptor.Attributes{} - } - - select { - case <-r.received: - default: - return nil, interceptor.Attributes{} - } - - if t := r.streamsForTrack(reader); t != nil { - select { - case rtxPacketReceived := <-t.repairStreamChannel: - return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes - default: - } - } - return nil, interceptor.Attributes{} -} diff --git a/track_remote.go b/track_remote.go index 897e67a8969..fb97179337c 100644 --- a/track_remote.go +++ b/track_remote.go @@ -24,7 +24,6 @@ type TrackRemote struct { payloadType PayloadType kind RTPCodecType ssrc SSRC - rtxSsrc SSRC codec RTPCodecParameters params RTPParameters rid string @@ -34,11 +33,10 @@ type TrackRemote struct { peekedAttributes interceptor.Attributes } -func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { +func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { return &TrackRemote{ kind: kind, ssrc: ssrc, - rtxSsrc: rtxSsrc, rid: rid, receiver: receiver, } @@ -127,24 +125,13 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, } } - // If there's a separate RTX track and an RTX packet is available, return that - if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil { - n, err = rtxPacket.MarshalTo(b) - attributes = rtxAttributes - if err != nil { - return 0, nil, err - } - } else { - // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return - // a packet from the main track - n, attributes, err = r.readRTP(b, t) - if err != nil { - return - } - err = t.checkAndUpdateTrack(b) + n, attributes, err = r.readRTP(b, t) + if err != nil { + return } - return n, attributes, err + err = t.checkAndUpdateTrack(b) + return } // checkAndUpdateTrack checks payloadType for every incoming packet @@ -211,17 +198,3 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error func (t *TrackRemote) SetReadDeadline(deadline time.Time) error { return t.receiver.setRTPReadDeadline(deadline, t) } - -// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream -func (t *TrackRemote) RtxSSRC() SSRC { - t.mu.RLock() - defer t.mu.RUnlock() - return t.rtxSsrc -} - -// HasRTX returns true if the track has a separate RTX stream -func (t *TrackRemote) HasRTX() bool { - t.mu.RLock() - defer t.mu.RUnlock() - return t.rtxSsrc != 0 -} From 83054271ab3a51f3580c5ec3a69dc54de3286893 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:17:25 +0300 Subject: [PATCH 7/8] Revert "Skip padding packet for simulcast probe" This reverts commit c7ca89028f72d3d4933fab8c6ddd5c4074febdd7. --- peerconnection.go | 9 ++------- peerconnection_media_test.go | 23 +---------------------- rtptransceiver.go | 6 +----- 3 files changed, 4 insertions(+), 34 deletions(-) diff --git a/peerconnection.go b/peerconnection.go index 5b2442f221b..2de25ba733b 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1577,7 +1577,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err } var mid, rid, rsid string - payloadType, paddingOnly, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid) + payloadType, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid) if err != nil { return err } @@ -1595,17 +1595,12 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err for readCount := 0; readCount <= simulcastProbeCount; readCount++ { if mid == "" || (rid == "" && rsid == "") { - // skip padding only packets for probing - if paddingOnly { - readCount-- - } - i, _, err := interceptor.Read(b, nil) if err != nil { return err } - if _, paddingOnly, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil { + if _, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil { return err } diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 228cfed4bc2..7868900f360 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1280,28 +1280,7 @@ func TestPeerConnection_Simulcast(t *testing.T) { assert.NoError(t, signalPair(pcOffer, pcAnswer)) - // padding only packets should not affect simulcast probe - var sequenceNumber uint16 - for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ { - time.Sleep(20 * time.Millisecond) - - for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} { - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: sequenceNumber, - PayloadType: 96, - Padding: true, - }, - Payload: []byte{0x00, 0x02}, - } - - assert.NoError(t, track.WriteRTP(pkt)) - } - } - assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets") - - for ; !ridsFullfilled(); sequenceNumber++ { + for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ { time.Sleep(20 * time.Millisecond) for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} { diff --git a/rtptransceiver.go b/rtptransceiver.go index 4ba732e8a1e..15d28f720d1 100644 --- a/rtptransceiver.go +++ b/rtptransceiver.go @@ -266,16 +266,12 @@ func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransce // handleUnknownRTPPacket consumes a single RTP Packet and returns information that is helpful // for demuxing and handling an unknown SSRC (usually for Simulcast) -func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, paddingOnly bool, err error) { +func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, err error) { rp := &rtp.Packet{} if err = rp.Unmarshal(buf); err != nil { return } - if rp.Padding && len(rp.Payload) == 0 { - paddingOnly = true - } - if !rp.Header.Extension { return } From ebb9331e8f80805fc20f517378dec653b30e36ac Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 19 Apr 2024 03:31:21 +0300 Subject: [PATCH 8/8] Revert "Add test for ssrc-group after ssrc" This reverts commit 88d8eef8ae55938c1a945cc0f2bd79fe99ac1bd5. --- sdp_test.go | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/sdp_test.go b/sdp_test.go index bda54fa6058..d3f86307193 100644 --- a/sdp_test.go +++ b/sdp_test.go @@ -271,42 +271,6 @@ func TestTrackDetailsFromSDP(t *testing.T) { } assert.Equal(t, 0, len(trackDetailsFromSDP(nil, s))) }) - - t.Run("ssrc-group after ssrc", func(t *testing.T) { - s := &sdp.SessionDescription{ - MediaDescriptions: []*sdp.MediaDescription{ - { - MediaName: sdp.MediaName{ - Media: "video", - }, - Attributes: []sdp.Attribute{ - {Key: "mid", Value: "0"}, - {Key: "sendrecv"}, - {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, - {Key: "ssrc", Value: "4000 msid:rtx_trk_label rtx_trck_guid"}, - {Key: "ssrc-group", Value: "FID 3000 4000"}, - }, - }, - { - MediaName: sdp.MediaName{ - Media: "video", - }, - Attributes: []sdp.Attribute{ - {Key: "mid", Value: "1"}, - {Key: "sendrecv"}, - {Key: "ssrc-group", Value: "FID 5000 6000"}, - {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, - {Key: "ssrc", Value: "6000 msid:rtx_trk_label rtx_trck_guid"}, - }, - }, - }, - } - - tracks := trackDetailsFromSDP(nil, s) - assert.Equal(t, 2, len(tracks)) - assert.Equal(t, SSRC(4000), *tracks[0].repairSsrc) - assert.Equal(t, SSRC(6000), *tracks[1].repairSsrc) - }) } func TestHaveApplicationMediaSection(t *testing.T) {