Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix receiveForRtx usage for simulcast #2751

Closed
wants to merge 8 commits into from
Closed
7 changes: 0 additions & 7 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ const (
generatedCertificateOrigin = "WebRTC"

sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id"

// AttributeRtxPayloadType is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream payload type
AttributeRtxPayloadType = "rtx_payload_type"
// AttributeRtxSsrc is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream SSRC
AttributeRtxSsrc = "rtx_ssrc"
// AttributeRtxSequenceNumber is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream sequence number
AttributeRtxSequenceNumber = "rtx_sequence_number"
)

func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile {
Expand Down
11 changes: 1 addition & 10 deletions mediaengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,9 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
}

aptMatch := codecMatchNone
var aptCodec RTPCodecParameters
for _, codec := range exactMatches {
if codec.PayloadType == PayloadType(payloadType) {
aptMatch = codecMatchExact
aptCodec = codec
break
}
}
Expand All @@ -427,7 +425,6 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
for _, codec := range partialMatches {
if codec.PayloadType == PayloadType(payloadType) {
aptMatch = codecMatchPartial
aptCodec = codec
break
}
}
Expand All @@ -437,14 +434,8 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
return codecMatchNone, nil // not an error, we just ignore this codec we don't support
}

// replace the apt value with the original codec's payload type
toMatchCodec := remoteCodec
if aptMatched, mt := codecParametersFuzzySearch(aptCodec, codecs); mt == aptMatch {
toMatchCodec.SDPFmtpLine = strings.Replace(toMatchCodec.SDPFmtpLine, fmt.Sprintf("apt=%d", payloadType), fmt.Sprintf("apt=%d", aptMatched.PayloadType), 1)
}

// if apt's media codec is partial match, then apt codec must be partial match too
_, matchType := codecParametersFuzzySearch(toMatchCodec, codecs)
_, matchType := codecParametersFuzzySearch(remoteCodec, codecs)
if matchType == codecMatchExact && aptMatch == codecMatchPartial {
matchType = codecMatchPartial
}
Expand Down
64 changes: 6 additions & 58 deletions mediaengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,8 @@ a=rtpmap:96 VP8/90000
o=- 4596489990601351948 2 IN IP4 127.0.0.1
s=-
t=0 0
m=video 60323 UDP/TLS/RTP/SAVPF 94 95 106 107 108 109 96 97
m=video 60323 UDP/TLS/RTP/SAVPF 94 96 97
a=rtpmap:94 VP8/90000
a=rtpmap:95 rtx/90000
a=fmtp:95 apt=94
a=rtpmap:106 H264/90000
a=fmtp:106 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f
a=rtpmap:107 rtx/90000
a=fmtp:107 apt=106
a=rtpmap:108 H264/90000
a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f
a=rtpmap:109 rtx/90000
a=fmtp:109 apt=108
a=rtpmap:96 VP9/90000
a=fmtp:96 profile-id=2
a=rtpmap:97 rtx/90000
Expand All @@ -328,64 +318,22 @@ a=fmtp:97 apt=96
m := MediaEngine{}
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil},
PayloadType: 96,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
PayloadType: 97,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", nil},
PayloadType: 102,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil},
PayloadType: 103,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", nil},
PayloadType: 104,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil},
PayloadType: 105,
PayloadType: 94,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil},
PayloadType: 98,
PayloadType: 96,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil},
PayloadType: 99,
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
PayloadType: 97,
}, RTPCodecTypeVideo))
assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels)))

assert.True(t, m.negotiatedVideo)

vp9Codec, _, err := m.getCodecByPayload(96)
assert.NoError(t, err)
assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9)
vp9RTX, _, err := m.getCodecByPayload(97)
assert.NoError(t, err)
assert.Equal(t, vp9RTX.MimeType, "video/rtx")

h264P1Codec, _, err := m.getCodecByPayload(106)
assert.NoError(t, err)
assert.Equal(t, h264P1Codec.MimeType, MimeTypeH264)
assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f")
h264P1RTX, _, err := m.getCodecByPayload(107)
assert.NoError(t, err)
assert.Equal(t, h264P1RTX.MimeType, "video/rtx")
assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106")

h264P0Codec, _, err := m.getCodecByPayload(108)
assert.NoError(t, err)
assert.Equal(t, h264P0Codec.MimeType, MimeTypeH264)
assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f")
h264P0RTX, _, err := m.getCodecByPayload(109)
_, _, err := m.getCodecByPayload(97)
assert.NoError(t, err)
assert.Equal(t, h264P0RTX.MimeType, "video/rtx")
assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108")
})

t.Run("Matches when rtx apt for partial match codec", func(t *testing.T) {
Expand Down
16 changes: 5 additions & 11 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,11 +1576,12 @@
return err
}

if i < 4 {
return errRTPTooShort
var mid, rid, rsid string
payloadType, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid)
if err != nil {
return err

Check warning on line 1582 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1582

Added line #L1582 was not covered by tests
}

payloadType := PayloadType(b[1] & 0x7f)
params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(payloadType)
if err != nil {
return err
Expand All @@ -1592,21 +1593,14 @@
return err
}

var mid, rid, rsid string
var paddingOnly bool
for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
if mid == "" || (rid == "" && rsid == "") {
// skip padding only packets for probing
if paddingOnly {
readCount--
}

i, _, err := interceptor.Read(b, nil)
if err != nil {
return err
}

if _, paddingOnly, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil {
if _, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil {
return err
}

Expand Down
23 changes: 1 addition & 22 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,28 +1280,7 @@ func TestPeerConnection_Simulcast(t *testing.T) {

assert.NoError(t, signalPair(pcOffer, pcAnswer))

// padding only packets should not affect simulcast probe
var sequenceNumber uint16
for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
Padding: true,
},
Payload: []byte{0x00, 0x02},
}

assert.NoError(t, track.WriteRTP(pkt))
}
}
assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")

for ; !ridsFullfilled(); sequenceNumber++ {
for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
Expand Down
98 changes: 6 additions & 92 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package webrtc

import (
"encoding/binary"
"fmt"
"io"
"sync"
Expand All @@ -32,28 +31,13 @@
rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairStreamChannel chan rtxPacketWithAttributes
repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader

repairRtcpReadStream *srtp.ReadStreamSRTCP
repairRtcpInterceptor interceptor.RTCPReader
}

type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor.Attributes
pool *sync.Pool
}

func (p *rtxPacketWithAttributes) release() {
if p.pkt != nil {
b := p.pkt[:cap(p.pkt)]
p.pool.Put(b) // nolint:staticcheck
p.pkt = nil
}
}

// RTPReceiver allows an application to inspect the receipt of a TrackRemote
type RTPReceiver struct {
kind RTPCodecType
Expand All @@ -68,8 +52,6 @@

// A reference to the associated api object
api *API

rtxPool sync.Pool
}

// NewRTPReceiver constructs a new RTPReceiver
Expand All @@ -85,9 +67,6 @@
closed: make(chan interface{}),
received: make(chan interface{}),
tracks: []trackStreams{},
rtxPool: sync.Pool{New: func() interface{} {
return make([]byte, api.settingEngine.getReceiveMTU())
}},
}

return r, nil
Expand Down Expand Up @@ -166,7 +145,6 @@
track: newTrackRemote(
r.kind,
parameters.Encodings[i].SSRC,
parameters.Encodings[i].RTX.SSRC,
parameters.Encodings[i].RID,
r,
),
Expand Down Expand Up @@ -410,6 +388,8 @@
}

// receiveForRtx starts a routine that processes the repair stream
// These packets aren't exposed to the user yet, but we need to process them for
// TWCC
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
var track *trackStreams
if ssrc != 0 && len(r.tracks) == 1 {
Expand All @@ -431,56 +411,12 @@
track.repairInterceptor = rtpInterceptor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes)

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())

Check warning on line 416 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L416

Added line #L416 was not covered by tests
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}

// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
hasExtension := b[0]&0b10000 > 0
hasPadding := b[0]&0b100000 > 0
csrcCount := b[0] & 0b1111
headerLength := uint16(12 + (4 * csrcCount))
paddingLength := 0
if hasExtension {
headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4]))
}
if hasPadding {
paddingLength = int(b[i-1])
}

if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
r.rtxPool.Put(b) // nolint:staticcheck
continue
}

if attributes == nil {
attributes = make(interceptor.Attributes)
}
attributes.Set(AttributeRtxPayloadType, b[1]&0x7F)
attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4]))
attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12]))

b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType())
b[2] = b[headerLength]
b[3] = b[headerLength+1]
binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
copy(b[headerLength:i-2], b[headerLength+2:i])

select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {

Check warning on line 418 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L418

Added line #L418 was not covered by tests
return
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
}
}
}()
Expand Down Expand Up @@ -519,25 +455,3 @@
}
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil
}

select {
case <-r.received:
default:
return nil
}

if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived
default:
}
}
return nil
}
6 changes: 1 addition & 5 deletions rtptransceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,12 @@ func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransce

// handleUnknownRTPPacket consumes a single RTP Packet and returns information that is helpful
// for demuxing and handling an unknown SSRC (usually for Simulcast)
func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, paddingOnly bool, err error) {
func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, err error) {
rp := &rtp.Packet{}
if err = rp.Unmarshal(buf); err != nil {
return
}

if rp.Padding && len(rp.Payload) == 0 {
paddingOnly = true
}

if !rp.Header.Extension {
return
}
Expand Down
6 changes: 0 additions & 6 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
}
rtxRepairFlows[rtxRepairFlow] = baseSsrc
tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before
for i := range tracksInMediaSection {
if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) {
repairSsrc := SSRC(rtxRepairFlow)
tracksInMediaSection[i].repairSsrc = &repairSsrc
}
}
}
}

Expand Down
Loading
Loading