diff --git a/constants.go b/constants.go index ea484431b77..dad0068f4a6 100644 --- a/constants.go +++ b/constants.go @@ -33,13 +33,6 @@ const ( generatedCertificateOrigin = "WebRTC" sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id" - - // AttributeRtxPayloadType is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream payload type - AttributeRtxPayloadType = "rtx_payload_type" - // AttributeRtxSsrc is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream SSRC - AttributeRtxSsrc = "rtx_ssrc" - // AttributeRtxSequenceNumber is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream sequence number - AttributeRtxSequenceNumber = "rtx_sequence_number" ) func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile { diff --git a/mediaengine.go b/mediaengine.go index cbdcfb6d7d5..f0dd0eda7f6 100644 --- a/mediaengine.go +++ b/mediaengine.go @@ -414,11 +414,9 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo } aptMatch := codecMatchNone - var aptCodec RTPCodecParameters for _, codec := range exactMatches { if codec.PayloadType == PayloadType(payloadType) { aptMatch = codecMatchExact - aptCodec = codec break } } @@ -427,7 +425,6 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo for _, codec := range partialMatches { if codec.PayloadType == PayloadType(payloadType) { aptMatch = codecMatchPartial - aptCodec = codec break } } @@ -437,14 +434,8 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo return codecMatchNone, nil // not an error, we just ignore this codec we don't support } - // replace the apt value with the original codec's payload type - toMatchCodec := remoteCodec - if aptMatched, mt := codecParametersFuzzySearch(aptCodec, codecs); mt == aptMatch { - toMatchCodec.SDPFmtpLine = strings.Replace(toMatchCodec.SDPFmtpLine, fmt.Sprintf("apt=%d", payloadType), fmt.Sprintf("apt=%d", aptMatched.PayloadType), 1) - } - // if apt's media codec is partial match, then apt codec must be partial match too - _, matchType := codecParametersFuzzySearch(toMatchCodec, codecs) + _, matchType := codecParametersFuzzySearch(remoteCodec, codecs) if matchType == codecMatchExact && aptMatch == codecMatchPartial { matchType = codecMatchPartial } diff --git a/mediaengine_test.go b/mediaengine_test.go index d9a9e7b1ff4..00022da1759 100644 --- a/mediaengine_test.go +++ b/mediaengine_test.go @@ -308,18 +308,8 @@ a=rtpmap:96 VP8/90000 o=- 4596489990601351948 2 IN IP4 127.0.0.1 s=- t=0 0 -m=video 60323 UDP/TLS/RTP/SAVPF 94 95 106 107 108 109 96 97 +m=video 60323 UDP/TLS/RTP/SAVPF 94 96 97 a=rtpmap:94 VP8/90000 -a=rtpmap:95 rtx/90000 -a=fmtp:95 apt=94 -a=rtpmap:106 H264/90000 -a=fmtp:106 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f -a=rtpmap:107 rtx/90000 -a=fmtp:107 apt=106 -a=rtpmap:108 H264/90000 -a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f -a=rtpmap:109 rtx/90000 -a=fmtp:109 apt=108 a=rtpmap:96 VP9/90000 a=fmtp:96 profile-id=2 a=rtpmap:97 rtx/90000 @@ -328,64 +318,22 @@ a=fmtp:97 apt=96 m := MediaEngine{} assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil}, - PayloadType: 96, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, - PayloadType: 97, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", nil}, - PayloadType: 102, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil}, - PayloadType: 103, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", nil}, - PayloadType: 104, - }, RTPCodecTypeVideo)) - assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil}, - PayloadType: 105, + PayloadType: 94, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil}, - PayloadType: 98, + PayloadType: 96, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, - PayloadType: 99, + RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + PayloadType: 97, }, RTPCodecTypeVideo)) assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels))) assert.True(t, m.negotiatedVideo) - vp9Codec, _, err := m.getCodecByPayload(96) - assert.NoError(t, err) - assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9) - vp9RTX, _, err := m.getCodecByPayload(97) - assert.NoError(t, err) - assert.Equal(t, vp9RTX.MimeType, "video/rtx") - - h264P1Codec, _, err := m.getCodecByPayload(106) - assert.NoError(t, err) - assert.Equal(t, h264P1Codec.MimeType, MimeTypeH264) - assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f") - h264P1RTX, _, err := m.getCodecByPayload(107) - assert.NoError(t, err) - assert.Equal(t, h264P1RTX.MimeType, "video/rtx") - assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106") - - h264P0Codec, _, err := m.getCodecByPayload(108) - assert.NoError(t, err) - assert.Equal(t, h264P0Codec.MimeType, MimeTypeH264) - assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f") - h264P0RTX, _, err := m.getCodecByPayload(109) + _, _, err := m.getCodecByPayload(97) assert.NoError(t, err) - assert.Equal(t, h264P0RTX.MimeType, "video/rtx") - assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108") }) t.Run("Matches when rtx apt for partial match codec", func(t *testing.T) { diff --git a/peerconnection.go b/peerconnection.go index 5e2fba24cf1..2de25ba733b 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1576,11 +1576,12 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } - if i < 4 { - return errRTPTooShort + var mid, rid, rsid string + payloadType, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid) + if err != nil { + return err } - payloadType := PayloadType(b[1] & 0x7f) params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(payloadType) if err != nil { return err @@ -1592,21 +1593,14 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } - var mid, rid, rsid string - var paddingOnly bool for readCount := 0; readCount <= simulcastProbeCount; readCount++ { if mid == "" || (rid == "" && rsid == "") { - // skip padding only packets for probing - if paddingOnly { - readCount-- - } - i, _, err := interceptor.Read(b, nil) if err != nil { return err } - if _, paddingOnly, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil { + if _, err = handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid); err != nil { return err } diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 228cfed4bc2..7868900f360 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1280,28 +1280,7 @@ func TestPeerConnection_Simulcast(t *testing.T) { assert.NoError(t, signalPair(pcOffer, pcAnswer)) - // padding only packets should not affect simulcast probe - var sequenceNumber uint16 - for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ { - time.Sleep(20 * time.Millisecond) - - for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} { - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: sequenceNumber, - PayloadType: 96, - Padding: true, - }, - Payload: []byte{0x00, 0x02}, - } - - assert.NoError(t, track.WriteRTP(pkt)) - } - } - assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets") - - for ; !ridsFullfilled(); sequenceNumber++ { + for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ { time.Sleep(20 * time.Millisecond) for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} { diff --git a/rtpreceiver.go b/rtpreceiver.go index 2457ba080a8..1bdec899bcf 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -7,7 +7,6 @@ package webrtc import ( - "encoding/binary" "fmt" "io" "sync" @@ -32,28 +31,13 @@ type trackStreams struct { rtcpReadStream *srtp.ReadStreamSRTCP rtcpInterceptor interceptor.RTCPReader - repairReadStream *srtp.ReadStreamSRTP - repairInterceptor interceptor.RTPReader - repairStreamChannel chan rtxPacketWithAttributes + repairReadStream *srtp.ReadStreamSRTP + repairInterceptor interceptor.RTPReader repairRtcpReadStream *srtp.ReadStreamSRTCP repairRtcpInterceptor interceptor.RTCPReader } -type rtxPacketWithAttributes struct { - pkt []byte - attributes interceptor.Attributes - pool *sync.Pool -} - -func (p *rtxPacketWithAttributes) release() { - if p.pkt != nil { - b := p.pkt[:cap(p.pkt)] - p.pool.Put(b) // nolint:staticcheck - p.pkt = nil - } -} - // RTPReceiver allows an application to inspect the receipt of a TrackRemote type RTPReceiver struct { kind RTPCodecType @@ -68,8 +52,6 @@ type RTPReceiver struct { // A reference to the associated api object api *API - - rtxPool sync.Pool } // NewRTPReceiver constructs a new RTPReceiver @@ -85,9 +67,6 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT closed: make(chan interface{}), received: make(chan interface{}), tracks: []trackStreams{}, - rtxPool: sync.Pool{New: func() interface{} { - return make([]byte, api.settingEngine.getReceiveMTU()) - }}, } return r, nil @@ -166,7 +145,6 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) { track: newTrackRemote( r.kind, parameters.Encodings[i].SSRC, - parameters.Encodings[i].RTX.SSRC, parameters.Encodings[i].RID, r, ), @@ -410,6 +388,8 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo } // receiveForRtx starts a routine that processes the repair stream +// These packets aren't exposed to the user yet, but we need to process them for +// TWCC func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { var track *trackStreams if ssrc != 0 && len(r.tracks) == 1 { @@ -431,56 +411,12 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep track.repairInterceptor = rtpInterceptor track.repairRtcpReadStream = rtcpReadStream track.repairRtcpInterceptor = rtcpInterceptor - track.repairStreamChannel = make(chan rtxPacketWithAttributes) go func() { + b := make([]byte, r.api.settingEngine.getReceiveMTU()) for { - b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert - i, attributes, err := track.repairInterceptor.Read(b, nil) - if err != nil { - r.rtxPool.Put(b) // nolint:staticcheck - return - } - - // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the - // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format - // as non-RTX RTP packets - hasExtension := b[0]&0b10000 > 0 - hasPadding := b[0]&0b100000 > 0 - csrcCount := b[0] & 0b1111 - headerLength := uint16(12 + (4 * csrcCount)) - paddingLength := 0 - if hasExtension { - headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4])) - } - if hasPadding { - paddingLength = int(b[i-1]) - } - - if i-int(headerLength)-paddingLength < 2 { - // BWE probe packet, ignore - r.rtxPool.Put(b) // nolint:staticcheck - continue - } - - if attributes == nil { - attributes = make(interceptor.Attributes) - } - attributes.Set(AttributeRtxPayloadType, b[1]&0x7F) - attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4])) - attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12])) - - b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType()) - b[2] = b[headerLength] - b[3] = b[headerLength+1] - binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC())) - copy(b[headerLength:i-2], b[headerLength+2:i]) - - select { - case <-r.closed: - r.rtxPool.Put(b) // nolint:staticcheck + if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil { return - case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}: } } }() @@ -519,25 +455,3 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote } return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) } - -// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil -func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes { - if !reader.HasRTX() { - return nil - } - - select { - case <-r.received: - default: - return nil - } - - if t := r.streamsForTrack(reader); t != nil { - select { - case rtxPacketReceived := <-t.repairStreamChannel: - return &rtxPacketReceived - default: - } - } - return nil -} diff --git a/rtptransceiver.go b/rtptransceiver.go index 4ba732e8a1e..15d28f720d1 100644 --- a/rtptransceiver.go +++ b/rtptransceiver.go @@ -266,16 +266,12 @@ func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransce // handleUnknownRTPPacket consumes a single RTP Packet and returns information that is helpful // for demuxing and handling an unknown SSRC (usually for Simulcast) -func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, paddingOnly bool, err error) { +func handleUnknownRTPPacket(buf []byte, midExtensionID, streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string) (payloadType PayloadType, err error) { rp := &rtp.Packet{} if err = rp.Unmarshal(buf); err != nil { return } - if rp.Padding && len(rp.Payload) == 0 { - paddingOnly = true - } - if !rp.Header.Extension { return } diff --git a/sdp.go b/sdp.go index 980f9850779..9d6f0cfc86b 100644 --- a/sdp.go +++ b/sdp.go @@ -128,12 +128,6 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) ( } rtxRepairFlows[rtxRepairFlow] = baseSsrc tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before - for i := range tracksInMediaSection { - if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) { - repairSsrc := SSRC(rtxRepairFlow) - tracksInMediaSection[i].repairSsrc = &repairSsrc - } - } } } diff --git a/sdp_test.go b/sdp_test.go index bda54fa6058..d3f86307193 100644 --- a/sdp_test.go +++ b/sdp_test.go @@ -271,42 +271,6 @@ func TestTrackDetailsFromSDP(t *testing.T) { } assert.Equal(t, 0, len(trackDetailsFromSDP(nil, s))) }) - - t.Run("ssrc-group after ssrc", func(t *testing.T) { - s := &sdp.SessionDescription{ - MediaDescriptions: []*sdp.MediaDescription{ - { - MediaName: sdp.MediaName{ - Media: "video", - }, - Attributes: []sdp.Attribute{ - {Key: "mid", Value: "0"}, - {Key: "sendrecv"}, - {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, - {Key: "ssrc", Value: "4000 msid:rtx_trk_label rtx_trck_guid"}, - {Key: "ssrc-group", Value: "FID 3000 4000"}, - }, - }, - { - MediaName: sdp.MediaName{ - Media: "video", - }, - Attributes: []sdp.Attribute{ - {Key: "mid", Value: "1"}, - {Key: "sendrecv"}, - {Key: "ssrc-group", Value: "FID 5000 6000"}, - {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, - {Key: "ssrc", Value: "6000 msid:rtx_trk_label rtx_trck_guid"}, - }, - }, - }, - } - - tracks := trackDetailsFromSDP(nil, s) - assert.Equal(t, 2, len(tracks)) - assert.Equal(t, SSRC(4000), *tracks[0].repairSsrc) - assert.Equal(t, SSRC(6000), *tracks[1].repairSsrc) - }) } func TestHaveApplicationMediaSection(t *testing.T) { diff --git a/track_remote.go b/track_remote.go index 691b488d83e..fb97179337c 100644 --- a/track_remote.go +++ b/track_remote.go @@ -24,7 +24,6 @@ type TrackRemote struct { payloadType PayloadType kind RTPCodecType ssrc SSRC - rtxSsrc SSRC codec RTPCodecParameters params RTPParameters rid string @@ -34,11 +33,10 @@ type TrackRemote struct { peekedAttributes interceptor.Attributes } -func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { +func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { return &TrackRemote{ kind: kind, ssrc: ssrc, - rtxSsrc: rtxSsrc, rid: rid, receiver: receiver, } @@ -127,23 +125,13 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, } } - // If there's a separate RTX track and an RTX packet is available, return that - if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil { - n = copy(b, rtxPacketReceived.pkt) - attributes = rtxPacketReceived.attributes - rtxPacketReceived.release() - err = nil - } else { - // If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return - // a packet from the main track - n, attributes, err = r.readRTP(b, t) - if err != nil { - return - } - err = t.checkAndUpdateTrack(b) + n, attributes, err = r.readRTP(b, t) + if err != nil { + return } - return n, attributes, err + err = t.checkAndUpdateTrack(b) + return } // checkAndUpdateTrack checks payloadType for every incoming packet @@ -210,17 +198,3 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error func (t *TrackRemote) SetReadDeadline(deadline time.Time) error { return t.receiver.setRTPReadDeadline(deadline, t) } - -// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream -func (t *TrackRemote) RtxSSRC() SSRC { - t.mu.RLock() - defer t.mu.RUnlock() - return t.rtxSsrc -} - -// HasRTX returns true if the track has a separate RTX stream -func (t *TrackRemote) HasRTX() bool { - t.mu.RLock() - defer t.mu.RUnlock() - return t.rtxSsrc != 0 -}