From 179e1e6f0b0470b0a2e50b8f418215a393c2ca27 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Tue, 1 Oct 2024 09:23:24 -0400 Subject: [PATCH] Add Retransmission and FEC to TrackLocal If the MediaEngine contains support for them a SSRC will be generated appropriately Co-authored-by: aggresss Co-authored-by: Kevin Wang --- interceptor_test.go | 6 +- mediaengine.go | 48 ++++++++--- mediaengine_test.go | 16 ++-- rtpcodec.go | 13 +++ rtpcodingparameters.go | 7 ++ rtpreceiver_go_test.go | 52 +++--------- rtpsender.go | 24 +++++- rtpsender_test.go | 72 +++++++++++++++++ rtptransceiver_test.go | 4 +- sdp.go | 9 +++ sdp_test.go | 162 +++++++++++++++++++++++++++++++++++++ track_local.go | 34 +++++--- track_local_static.go | 27 ++++--- track_local_static_test.go | 23 ++++++ 14 files changed, 408 insertions(+), 89 deletions(-) diff --git a/interceptor_test.go b/interceptor_test.go index c7c3879d790..aff8b67573c 100644 --- a/interceptor_test.go +++ b/interceptor_test.go @@ -196,10 +196,10 @@ func Test_Interceptor_BindUnbind(t *testing.T) { if cnt := atomic.LoadUint32(&cntUnbindLocalStream); cnt != 1 { t.Errorf("UnbindLocalStreamFn is expected to be called once, but called %d times", cnt) } - if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 1 { + if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 2 { t.Errorf("BindRemoteStreamFn is expected to be called once, but called %d times", cnt) } - if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 1 { + if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 2 { t.Errorf("UnbindRemoteStreamFn is expected to be called once, but called %d times", cnt) } @@ -207,7 +207,7 @@ func Test_Interceptor_BindUnbind(t *testing.T) { if cnt := atomic.LoadUint32(&cntBindRTCPWriter); cnt != 2 { t.Errorf("BindRTCPWriterFn is expected to be called twice, but called %d times", cnt) } - if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 2 { + if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 3 { t.Errorf("BindRTCPReaderFn is expected to be called twice, but called %d times", cnt) } if cnt := atomic.LoadUint32(&cntClose); cnt != 2 { diff --git a/mediaengine.go b/mediaengine.go index 47a08d2f24c..a386d78a7fb 100644 --- a/mediaengine.go +++ b/mediaengine.go @@ -47,6 +47,12 @@ const ( // MimeTypePCMA PCMA MIME type // Note: Matching should be case insensitive. MimeTypePCMA = "audio/PCMA" + // MimeTypeRTX RTX MIME type + // Note: Matching should be case insensitive. + MimeTypeRTX = "video/rtx" + // MimeTypeFlexFEC FEC MIME Type + // Note: Matching should be case insensitive. + MimeTypeFlexFEC = "video/flexfec" ) type mediaEngineHeaderExtension struct { @@ -106,7 +112,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 96, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil}, PayloadType: 97, }, @@ -115,7 +121,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 102, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=102", nil}, PayloadType: 103, }, @@ -124,7 +130,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 104, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=104", nil}, PayloadType: 105, }, @@ -133,7 +139,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 106, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=106", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=106", nil}, PayloadType: 107, }, @@ -142,7 +148,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 108, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=108", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=108", nil}, PayloadType: 109, }, @@ -151,7 +157,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 127, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=127", nil}, PayloadType: 125, }, @@ -160,7 +166,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 39, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=39", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=39", nil}, PayloadType: 40, }, @@ -169,7 +175,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 45, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=45", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=45", nil}, PayloadType: 46, }, @@ -178,7 +184,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 98, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=98", nil}, PayloadType: 99, }, @@ -187,7 +193,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 100, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=100", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=100", nil}, PayloadType: 101, }, @@ -196,7 +202,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error { PayloadType: 112, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=112", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=112", nil}, PayloadType: 113, }, } { @@ -702,3 +708,23 @@ func payloaderForCodec(codec RTPCodecCapability) (rtp.Payloader, error) { return nil, ErrNoPayloaderForCodec } } + +func (m *MediaEngine) isRTXEnabled(typ RTPCodecType, directions []RTPTransceiverDirection) bool { + for _, p := range m.getRTPParametersByKind(typ, directions).Codecs { + if p.MimeType == MimeTypeRTX { + return true + } + } + + return false +} + +func (m *MediaEngine) isFECEnabled(typ RTPCodecType, directions []RTPTransceiverDirection) bool { + for _, p := range m.getRTPParametersByKind(typ, directions).Codecs { + if strings.Contains(p.MimeType, MimeTypeFlexFEC) { + return true + } + } + + return false +} diff --git a/mediaengine_test.go b/mediaengine_test.go index 786cbcac0af..d598c03cb89 100644 --- a/mediaengine_test.go +++ b/mediaengine_test.go @@ -364,7 +364,7 @@ a=fmtp:97 apt=96 PayloadType: 96, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil}, PayloadType: 97, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ @@ -372,7 +372,7 @@ a=fmtp:97 apt=96 PayloadType: 102, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=102", nil}, PayloadType: 103, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ @@ -380,7 +380,7 @@ a=fmtp:97 apt=96 PayloadType: 104, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=104", nil}, PayloadType: 105, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ @@ -388,7 +388,7 @@ a=fmtp:97 apt=96 PayloadType: 98, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=98", nil}, PayloadType: 99, }, RTPCodecTypeVideo)) assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels))) @@ -400,7 +400,7 @@ a=fmtp:97 apt=96 assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9) vp9RTX, _, err := m.getCodecByPayload(97) assert.NoError(t, err) - assert.Equal(t, vp9RTX.MimeType, "video/rtx") + assert.Equal(t, vp9RTX.MimeType, MimeTypeRTX) h264P1Codec, _, err := m.getCodecByPayload(106) assert.NoError(t, err) @@ -408,7 +408,7 @@ a=fmtp:97 apt=96 assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f") h264P1RTX, _, err := m.getCodecByPayload(107) assert.NoError(t, err) - assert.Equal(t, h264P1RTX.MimeType, "video/rtx") + assert.Equal(t, h264P1RTX.MimeType, MimeTypeRTX) assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106") h264P0Codec, _, err := m.getCodecByPayload(108) @@ -417,7 +417,7 @@ a=fmtp:97 apt=96 assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f") h264P0RTX, _, err := m.getCodecByPayload(109) assert.NoError(t, err) - assert.Equal(t, h264P0RTX.MimeType, "video/rtx") + assert.Equal(t, h264P0RTX.MimeType, MimeTypeRTX) assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108") }) @@ -443,7 +443,7 @@ a=fmtp:97 apt=96 PayloadType: 96, }, RTPCodecTypeVideo)) assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil}, PayloadType: 97, }, RTPCodecTypeVideo)) assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels))) diff --git a/rtpcodec.go b/rtpcodec.go index 40463dcb0ce..ca91ab22069 100644 --- a/rtpcodec.go +++ b/rtpcodec.go @@ -4,6 +4,7 @@ package webrtc import ( + "fmt" "strings" "github.com/pion/webrtc/v4/internal/fmtp" @@ -123,3 +124,15 @@ func codecParametersFuzzySearch(needle RTPCodecParameters, haystack []RTPCodecPa return RTPCodecParameters{}, codecMatchNone } + +// Given a CodecParameters find the RTX CodecParameters if one exists +func findRTXCodecParameters(needle RTPCodecParameters, haystack []RTPCodecParameters) (RTPCodecParameters, bool) { + aptStr := fmt.Sprintf("apt=%d", needle.PayloadType) + for _, c := range haystack { + if aptStr == c.SDPFmtpLine { + return c, true + } + } + + return RTPCodecParameters{}, false +} diff --git a/rtpcodingparameters.go b/rtpcodingparameters.go index f03d8c35f45..fa39a631d41 100644 --- a/rtpcodingparameters.go +++ b/rtpcodingparameters.go @@ -9,6 +9,12 @@ type RTPRtxParameters struct { SSRC SSRC `json:"ssrc"` } +// RTPFecParameters dictionary contains information relating to forward error correction (FEC) settings. +// https://draft.ortc.org/#dom-rtcrtpfecparameters +type RTPFecParameters struct { + SSRC SSRC `json:"ssrc"` +} + // RTPCodingParameters provides information relating to both encoding and decoding. // This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself // http://draft.ortc.org/#dom-rtcrtpcodingparameters @@ -17,4 +23,5 @@ type RTPCodingParameters struct { SSRC SSRC `json:"ssrc"` PayloadType PayloadType `json:"payloadType"` RTX RTPRtxParameters `json:"rtx"` + FEC RTPFecParameters `json:"fec"` } diff --git a/rtpreceiver_go_test.go b/rtpreceiver_go_test.go index 6a8b2b23c2d..60c3fd8e95b 100644 --- a/rtpreceiver_go_test.go +++ b/rtpreceiver_go_test.go @@ -7,18 +7,13 @@ package webrtc import ( - "bufio" "context" "encoding/binary" "errors" - "fmt" "io" - "strconv" - "strings" "testing" "time" - "github.com/pion/randutil" "github.com/pion/rtp" "github.com/pion/sdp/v3" "github.com/pion/transport/v3/test" @@ -86,19 +81,18 @@ func TestSetRTPParameters(t *testing.T) { func Test_RTX_Read(t *testing.T) { defer test.TimeOut(time.Second * 30).Stop() - var ssrc *uint32 - ssrcLines := "" - rtxSsrc := randutil.NewMathRandomGenerator().Uint32() - pcOffer, pcAnswer, err := newPair() assert.NoError(t, err) track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id") assert.NoError(t, err) - _, err = pcOffer.AddTrack(track) + rtpSender, err := pcOffer.AddTrack(track) assert.NoError(t, err) + rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC + ssrc := rtpSender.GetParameters().Encodings[0].SSRC + rtxRead, rtxReadCancel := context.WithCancel(context.Background()) pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { for { @@ -111,7 +105,7 @@ func Test_RTX_Read(t *testing.T) { assert.NoError(t, readRTPErr) assert.NotNil(t, pkt) - assert.Equal(t, pkt.SSRC, *ssrc) + assert.Equal(t, pkt.SSRC, uint32(ssrc)) assert.Equal(t, pkt.PayloadType, uint8(96)) assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD}) @@ -120,7 +114,7 @@ func Test_RTX_Read(t *testing.T) { rtxSSRC := attributes.Get(AttributeRtxSsrc) if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil { assert.Equal(t, rtxPayloadType, uint8(97)) - assert.Equal(t, rtxSSRC, rtxSsrc) + assert.Equal(t, rtxSSRC, uint32(rtxSsrc)) assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500) rtxReadCancel() @@ -128,42 +122,14 @@ func Test_RTX_Read(t *testing.T) { } }) - assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(offer string) (modified string) { - scanner := bufio.NewScanner(strings.NewReader(offer)) - for scanner.Scan() { - l := scanner.Text() - - if strings.HasPrefix(l, "a=ssrc") { - if ssrc == nil { - lineSplit := strings.Split(l, " ")[0] - parsed, atoiErr := strconv.ParseUint(strings.TrimPrefix(lineSplit, "a=ssrc:"), 10, 32) - assert.NoError(t, atoiErr) - - parsedSsrc := uint32(parsed) - ssrc = &parsedSsrc - - modified += fmt.Sprintf("a=ssrc-group:FID %d %d\r\n", *ssrc, rtxSsrc) - } - - ssrcLines += l + "\n" - } else if ssrcLines != "" { - ssrcLines = strings.ReplaceAll(ssrcLines, fmt.Sprintf("%d", *ssrc), fmt.Sprintf("%d", rtxSsrc)) - modified += ssrcLines - ssrcLines = "" - } - - modified += l + "\n" - } - - return modified - })) + assert.NoError(t, signalPair(pcOffer, pcAnswer)) func() { for i := uint16(0); ; i++ { pkt := rtp.Packet{ Header: rtp.Header{ Version: 2, - SSRC: *ssrc, + SSRC: uint32(ssrc), PayloadType: 96, SequenceNumber: i, }, @@ -182,7 +148,7 @@ func Test_RTX_Read(t *testing.T) { // Send the RTX _, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{ Version: 2, - SSRC: rtxSsrc, + SSRC: uint32(rtxSsrc), PayloadType: 97, SequenceNumber: i + 500, }, rtxPayload) diff --git a/rtpsender.go b/rtpsender.go index 79d58d6371c..e9137ebd733 100644 --- a/rtpsender.go +++ b/rtpsender.go @@ -29,7 +29,7 @@ type trackEncoding struct { context *baseTrackLocalContext - ssrc SSRC + ssrc, ssrcRTX, ssrcFEC SSRC } // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer @@ -124,6 +124,8 @@ func (r *RTPSender) getParameters() RTPSendParameters { RTPCodingParameters: RTPCodingParameters{ RID: rid, SSRC: trackEncoding.ssrc, + RTX: RTPRtxParameters{SSRC: trackEncoding.ssrcRTX}, + FEC: RTPFecParameters{SSRC: trackEncoding.ssrcFEC}, PayloadType: r.payloadType, }, }) @@ -199,9 +201,21 @@ func (r *RTPSender) AddEncoding(track TrackLocal) error { } func (r *RTPSender) addEncoding(track TrackLocal) { + ssrcFEC, ssrcRTX := SSRC(0), SSRC(0) + + if r.api.mediaEngine.isRTXEnabled(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}) { + ssrcRTX = SSRC(randutil.NewMathRandomGenerator().Uint32()) + } + + if r.api.mediaEngine.isFECEnabled(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}) { + ssrcFEC = SSRC(randutil.NewMathRandomGenerator().Uint32()) + } + trackEncoding := &trackEncoding{ - track: track, - ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()), + track: track, + ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()), + ssrcFEC: ssrcFEC, + ssrcRTX: ssrcRTX, } r.trackEncodings = append(r.trackEncodings, trackEncoding) @@ -261,6 +275,8 @@ func (r *RTPSender) ReplaceTrack(track TrackLocal) error { id: context.ID(), params: r.api.mediaEngine.getRTPParametersByKind(track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}), ssrc: context.SSRC(), + ssrcRTX: context.SSRCRetransmission(), + ssrcFEC: context.SSRCForwardErrorCorrection(), writeStream: context.WriteStream(), rtcpInterceptor: context.RTCPReader(), }) @@ -306,6 +322,8 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { id: r.id, params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}), ssrc: parameters.Encodings[idx].SSRC, + ssrcFEC: parameters.Encodings[idx].FEC.SSRC, + ssrcRTX: parameters.Encodings[idx].RTX.SSRC, writeStream: writeStream, rtcpInterceptor: trackEncoding.rtcpInterceptor, } diff --git a/rtpsender_test.go b/rtpsender_test.go index 31807708c8a..0d7d337247a 100644 --- a/rtpsender_test.go +++ b/rtpsender_test.go @@ -401,3 +401,75 @@ func Test_RTPSender_Add_Encoding(t *testing.T) { assert.NoError(t, peerConnection.Close()) } + +// nolint: dupl +func Test_RTPSender_FEC_Support(t *testing.T) { + t.Run("FEC SSRC by Default", func(t *testing.T) { + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") + assert.NoError(t, err) + + peerConnection, err := NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + rtpSender, err := peerConnection.AddTrack(track) + assert.NoError(t, err) + + assert.NotZero(t, rtpSender.GetParameters().Encodings[0].FEC.SSRC) + }) + + t.Run("FEC can be disabled", func(t *testing.T) { + m := MediaEngine{} + assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil}, + PayloadType: 94, + }, RTPCodecTypeVideo)) + api := NewAPI(WithMediaEngine(&m)) + + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") + assert.NoError(t, err) + + peerConnection, err := api.NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + rtpSender, err := peerConnection.AddTrack(track) + assert.NoError(t, err) + + assert.Zero(t, rtpSender.GetParameters().Encodings[0].FEC.SSRC) + }) +} + +// nolint: dupl +func Test_RTPSender_RTX_Support(t *testing.T) { + t.Run("RTX SSRC by Default", func(t *testing.T) { + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") + assert.NoError(t, err) + + peerConnection, err := NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + rtpSender, err := peerConnection.AddTrack(track) + assert.NoError(t, err) + + assert.NotZero(t, rtpSender.GetParameters().Encodings[0].RTX.SSRC) + }) + + t.Run("RTX can be disabled", func(t *testing.T) { + m := MediaEngine{} + assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil}, + PayloadType: 94, + }, RTPCodecTypeVideo)) + api := NewAPI(WithMediaEngine(&m)) + + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") + assert.NoError(t, err) + + peerConnection, err := api.NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + rtpSender, err := peerConnection.AddTrack(track) + assert.NoError(t, err) + + assert.Zero(t, rtpSender.GetParameters().Encodings[0].RTX.SSRC) + }) +} diff --git a/rtptransceiver_test.go b/rtptransceiver_test.go index 3d7096ee726..8c64fe475aa 100644 --- a/rtptransceiver_test.go +++ b/rtptransceiver_test.go @@ -60,7 +60,7 @@ func Test_RTPTransceiver_SetCodecPreferences(t *testing.T) { PayloadType: 96, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil}, PayloadType: 97, }, @@ -69,7 +69,7 @@ func Test_RTPTransceiver_SetCodecPreferences(t *testing.T) { PayloadType: 98, }, { - RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, + RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=98", nil}, PayloadType: 99, }, }, diff --git a/sdp.go b/sdp.go index a39ba7e84c4..3cfd43fd853 100644 --- a/sdp.go +++ b/sdp.go @@ -392,7 +392,16 @@ func addSenderSDP( sendParameters := sender.GetParameters() for _, encoding := range sendParameters.Encodings { + if encoding.RTX.SSRC != 0 { + media = media.WithValueAttribute("ssrc-group", fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC)) + } + + if encoding.FEC.SSRC != 0 { + media = media.WithValueAttribute("ssrc-group", fmt.Sprintf("FEC-FR %d %d", encoding.SSRC, encoding.FEC.SSRC)) + } + media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID()) + if !isPlanB { media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID()) } diff --git a/sdp_test.go b/sdp_test.go index c448ac6e0d2..e8d13b05f72 100644 --- a/sdp_test.go +++ b/sdp_test.go @@ -746,3 +746,165 @@ func TestRtpExtensionsFromMediaDescription(t *testing.T) { assert.Equal(t, extensions[sdp.ABSSendTimeURI], 1) assert.Equal(t, extensions[sdp.SDESMidURI], 3) } + +// Assert that FEC and RTX SSRCes are present if they are enabled in the MediaEngine +func Test_SSRC_Groups(t *testing.T) { + const offerWithRTX = `v=0 +o=- 930222930247584370 1727933945 IN IP4 0.0.0.0 +s=- +t=0 0 +a=msid-semantic:WMS* +a=fingerprint:sha-256 11:3F:1C:8D:D4:1D:8D:E7:E1:3E:AF:38:06:0D:1D:40:22:DC:FE:C9:93:E4:80:D8:0B:17:9F:2E:C1:CA:C8:3D +a=extmap-allow-mixed +a=group:BUNDLE 0 1 +m=audio 9 UDP/TLS/RTP/SAVPF 101 +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=ice-ufrag:yIgpPUMarFReduuM +a=ice-pwd:VmnVaqCByWiOTatFoDBbMGhSFGlsxviz +a=rtcp-mux +a=rtcp-rsize +a=rtpmap:101 opus/90000 +a=rtcp-fb:101 transport-cc +a=extmap:4 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01 +a=ssrc:3566446228 cname:stream-id +a=ssrc:3566446228 msid:stream-id audio-id +a=ssrc:3566446228 mslabel:stream-id +a=ssrc:3566446228 label:audio-id +a=msid:stream-id audio-id +a=sendrecv +m=video 9 UDP/TLS/RTP/SAVPF 96 97 +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:1 +a=ice-ufrag:yIgpPUMarFReduuM +a=ice-pwd:VmnVaqCByWiOTatFoDBbMGhSFGlsxviz +a=rtpmap:96 VP8/90000 +a=rtcp-fb:96 nack +a=rtcp-fb:96 nack pli +a=rtcp-fb:96 transport-cc +a=rtpmap:97 rtx/90000 +a=fmtp:97 apt=96 +a=ssrc-group:FID 1701050765 2578535262 +a=ssrc:1701050765 cname:stream-id +a=ssrc:1701050765 msid:stream-id track-id +a=ssrc:1701050765 mslabel:stream-id +a=ssrc:1701050765 label:track-id +a=msid:stream-id track-id +a=sendrecv +` + + const offerNoRTX = `v=0 +o=- 930222930247584370 1727933945 IN IP4 0.0.0.0 +s=- +t=0 0 +a=msid-semantic:WMS* +a=fingerprint:sha-256 11:3F:1C:8D:D4:1D:8D:E7:E1:3E:AF:38:06:0D:1D:40:22:DC:FE:C9:93:E4:80:D8:0B:17:9F:2E:C1:CA:C8:3D +a=extmap-allow-mixed +a=group:BUNDLE 0 1 +m=audio 9 UDP/TLS/RTP/SAVPF 101 +a=mid:0 +a=ice-ufrag:yIgpPUMarFReduuM +a=ice-pwd:VmnVaqCByWiOTatFoDBbMGhSFGlsxviz +a=rtcp-mux +a=rtcp-rsize +a=rtpmap:101 opus/90000 +a=rtcp-fb:101 transport-cc +a=extmap:4 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01 +a=ssrc:3566446228 cname:stream-id +a=ssrc:3566446228 msid:stream-id audio-id +a=ssrc:3566446228 mslabel:stream-id +a=ssrc:3566446228 label:audio-id +a=msid:stream-id audio-id +a=sendrecv +m=video 9 UDP/TLS/RTP/SAVPF 96 +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:1 +a=ice-ufrag:yIgpPUMarFReduuM +a=ice-pwd:VmnVaqCByWiOTatFoDBbMGhSFGlsxviz +a=rtpmap:96 VP8/90000 +a=rtcp-fb:96 nack +a=rtcp-fb:96 nack pli +a=rtcp-fb:96 transport-cc +a=ssrc-group:FID 1701050765 2578535262 +a=ssrc:1701050765 cname:stream-id +a=ssrc:1701050765 msid:stream-id track-id +a=ssrc:1701050765 mslabel:stream-id +a=ssrc:1701050765 label:track-id +a=msid:stream-id track-id +a=sendrecv +` + + for _, testCase := range []struct { + name string + enableRTXInMediaEngine bool + rtxExpected bool + remoteOffer string + }{ + {"Offer", true, true, ""}, + {"Offer no Local Groups", false, false, ""}, + {"Answer", true, true, offerWithRTX}, + {"Answer No Local Groups", false, false, offerWithRTX}, + {"Answer No Remote Groups", true, true, offerNoRTX}, + } { + t.Run(testCase.name, func(t *testing.T) { + checkRTXSupport := func(s *sdp.SessionDescription) { + // RTX is never enabled for audio + assert.Nil(t, trackDetailsFromSDP(nil, s)[0].repairSsrc) + + // RTX is conditionally enabled for video + if testCase.rtxExpected { + assert.NotNil(t, trackDetailsFromSDP(nil, s)[1].repairSsrc) + } else { + assert.Nil(t, trackDetailsFromSDP(nil, s)[1].repairSsrc) + } + } + + m := &MediaEngine{} + assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeType: MimeTypeOpus, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, + PayloadType: 101, + }, RTPCodecTypeAudio)) + assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeType: MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, + PayloadType: 96, + }, RTPCodecTypeVideo)) + if testCase.enableRTXInMediaEngine { + assert.NoError(t, m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeType: MimeTypeRTX, ClockRate: 90000, Channels: 0, SDPFmtpLine: "apt=96", RTCPFeedback: nil}, + PayloadType: 97, + }, RTPCodecTypeVideo)) + } + + peerConnection, err := NewAPI(WithMediaEngine(m)).NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + audioTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeOpus}, "audio-id", "stream-id") + assert.NoError(t, err) + + _, err = peerConnection.AddTrack(audioTrack) + assert.NoError(t, err) + + videoTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video-id", "stream-id") + assert.NoError(t, err) + + _, err = peerConnection.AddTrack(videoTrack) + assert.NoError(t, err) + + if testCase.remoteOffer == "" { + offer, err := peerConnection.CreateOffer(nil) + assert.NoError(t, err) + checkRTXSupport(offer.parsed) + } else { + assert.NoError(t, peerConnection.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: testCase.remoteOffer})) + answer, err := peerConnection.CreateAnswer(nil) + assert.NoError(t, err) + checkRTXSupport(answer.parsed) + } + + assert.NoError(t, peerConnection.Close()) + }) + } +} diff --git a/track_local.go b/track_local.go index 21131c81119..86a6b7f8371 100644 --- a/track_local.go +++ b/track_local.go @@ -21,17 +21,22 @@ type TrackLocalWriter interface { // in Interceptors. type TrackLocalContext interface { // CodecParameters returns the negotiated RTPCodecParameters. These are the codecs supported by both - // PeerConnections and the SSRC/PayloadTypes + // PeerConnections and the PayloadTypes CodecParameters() []RTPCodecParameters // HeaderExtensions returns the negotiated RTPHeaderExtensionParameters. These are the header extensions supported by - // both PeerConnections and the SSRC/PayloadTypes + // both PeerConnections and the URI/IDs HeaderExtensions() []RTPHeaderExtensionParameter - // SSRC requires the negotiated SSRC of this track - // This track may have multiple if RTX is enabled + // SSRC returns the negotiated SSRC of this track SSRC() SSRC + // SSRCRetransmission returns the negotiated SSRC used to send retransmissions for this track + SSRCRetransmission() SSRC + + // SSRCForwardErrorCorrection returns the negotiated SSRC to send forward error correction for this track + SSRCForwardErrorCorrection() SSRC + // WriteStream returns the WriteStream for this TrackLocal. The implementer writes the outbound // media packets to it WriteStream() TrackLocalWriter @@ -44,11 +49,11 @@ type TrackLocalContext interface { } type baseTrackLocalContext struct { - id string - params RTPParameters - ssrc SSRC - writeStream TrackLocalWriter - rtcpInterceptor interceptor.RTCPReader + id string + params RTPParameters + ssrc, ssrcRTX, ssrcFEC SSRC + writeStream TrackLocalWriter + rtcpInterceptor interceptor.RTCPReader } // CodecParameters returns the negotiated RTPCodecParameters. These are the codecs supported by both @@ -64,11 +69,20 @@ func (t *baseTrackLocalContext) HeaderExtensions() []RTPHeaderExtensionParameter } // SSRC requires the negotiated SSRC of this track -// This track may have multiple if RTX is enabled func (t *baseTrackLocalContext) SSRC() SSRC { return t.ssrc } +// SSRCRetransmission returns the negotiated SSRC used to send retransmissions for this track +func (t *baseTrackLocalContext) SSRCRetransmission() SSRC { + return t.ssrcRTX +} + +// SSRCForwardErrorCorrection returns the negotiated SSRC to send forward error correction for this track +func (t *baseTrackLocalContext) SSRCForwardErrorCorrection() SSRC { + return t.ssrcFEC +} + // WriteStream returns the WriteStream for this TrackLocal. The implementer writes the outbound // media packets to it func (t *baseTrackLocalContext) WriteStream() TrackLocalWriter { diff --git a/track_local_static.go b/track_local_static.go index ef26e006aec..19dbbb75a40 100644 --- a/track_local_static.go +++ b/track_local_static.go @@ -19,10 +19,10 @@ import ( // Bind can be called multiple times, this stores the // result for a single bind call so that it can be used when writing type trackBinding struct { - id string - ssrc SSRC - payloadType PayloadType - writeStream TrackLocalWriter + id string + ssrc, ssrcRTX, ssrcFEC SSRC + payloadType, payloadTypeRTX PayloadType + writeStream TrackLocalWriter } // TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets. @@ -59,19 +59,28 @@ func WithRTPStreamID(rid string) func(*TrackLocalStaticRTP) { // Bind is called by the PeerConnection after negotiation is complete // This asserts that the code requested is supported by the remote peer. -// If so it setups all the state (SSRC and PayloadType) to have a call +// If so it sets up all the state (SSRC and PayloadType) to have a call func (s *TrackLocalStaticRTP) Bind(t TrackLocalContext) (RTPCodecParameters, error) { s.mu.Lock() defer s.mu.Unlock() parameters := RTPCodecParameters{RTPCodecCapability: s.codec} if codec, matchType := codecParametersFuzzySearch(parameters, t.CodecParameters()); matchType != codecMatchNone { + payloadTypeRTX := PayloadType(0) + if rtxParameters, ok := findRTXCodecParameters(codec, t.CodecParameters()); ok { + payloadTypeRTX = rtxParameters.PayloadType + } + s.bindings = append(s.bindings, trackBinding{ - ssrc: t.SSRC(), - payloadType: codec.PayloadType, - writeStream: t.WriteStream(), - id: t.ID(), + ssrc: t.SSRC(), + ssrcRTX: t.SSRCRetransmission(), + ssrcFEC: t.SSRCForwardErrorCorrection(), + payloadType: codec.PayloadType, + payloadTypeRTX: payloadTypeRTX, + writeStream: t.WriteStream(), + id: t.ID(), }) + return codec, nil } diff --git a/track_local_static_test.go b/track_local_static_test.go index f4d9dfd92fd..d3e8e7dd56a 100644 --- a/track_local_static_test.go +++ b/track_local_static_test.go @@ -313,3 +313,26 @@ func Test_TrackLocalStatic_Padding(t *testing.T) { closePairNow(t, offerer, answerer) } + +func Test_TrackLocalStatic_RTX(t *testing.T) { + // defer test.TimeOut(time.Second * 30).Stop() + // defer test.CheckRoutines(t)() + + offerer, answerer, err := newPair() + assert.NoError(t, err) + + track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") + assert.NoError(t, err) + + _, err = offerer.AddTrack(track) + assert.NoError(t, err) + + assert.NoError(t, signalPair(offerer, answerer)) + + track.mu.Lock() + assert.NotZero(t, track.bindings[0].ssrcRTX) + assert.NotZero(t, track.bindings[0].payloadTypeRTX) + track.mu.Unlock() + + closePairNow(t, offerer, answerer) +}