Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an RTX-associated TrackLocal #2852

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions rtpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type trackEncoding struct {
context *baseTrackLocalContext

ssrc SSRC

rtx *trackEncoding
}

// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
Expand Down Expand Up @@ -199,12 +201,19 @@ func (r *RTPSender) AddEncoding(track TrackLocal) error {
}

func (r *RTPSender) addEncoding(track TrackLocal) {
trackEncoding := &trackEncoding{
baseTrackEncoding := &trackEncoding{
track: track,
ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()),
}

r.trackEncodings = append(r.trackEncodings, trackEncoding)
if r.api.settingEngine.distinctSSRCRTX {
baseTrackEncoding.rtx = &trackEncoding{
track: track,
ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()),
}
}

r.trackEncodings = append(r.trackEncodings, baseTrackEncoding)
}

// Track returns the RTCRtpTransceiver track, or nil
Expand Down Expand Up @@ -339,6 +348,63 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
)

writeStream.interceptor.Store(rtpInterceptor)

if trackEncoding.rtx != nil {
// also handle the rtx track. check the sdp for the associated
// a=fmtp:<pt> apt=<rtx-pt> line (see https://datatracker.ietf.org/doc/html/rfc4588#section-8.1)

codecs := r.api.mediaEngine.getCodecsByKind(trackEncoding.track.Kind())
var rtxCodec *RTPCodecParameters
for _, c := range codecs {
if fmt.Sprintf("apt=%d", c.PayloadType) == codec.SDPFmtpLine {
rtxCodec = &c
break
}
}

if rtxCodec == nil {
return fmt.Errorf("no RTX codec found for %s", codec.SDPFmtpLine)
}

rtxSRTPStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].RTX.SSRC, rtpSender: r}
rtxWriteStream := &interceptorToTrackLocalWriter{}

trackEncoding.rtx.srtpStream = rtxSRTPStream
trackEncoding.rtx.ssrc = parameters.Encodings[idx].RTX.SSRC
trackEncoding.rtx.context = &baseTrackLocalContext{
id: r.id,
params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
ssrc: parameters.Encodings[idx].RTX.SSRC,
writeStream: rtxWriteStream,
rtcpInterceptor: trackEncoding.rtcpInterceptor,
}

trackEncoding.rtx.context.params.Codecs = []RTPCodecParameters{*rtxCodec}

trackEncoding.rtx.streamInfo = *createStreamInfo(
fmt.Sprintf("%s-rtx", r.id),
parameters.Encodings[idx].RTX.SSRC,
rtxCodec.PayloadType,
rtxCodec.RTPCodecCapability,
parameters.HeaderExtensions,
)

trackEncoding.rtx.rtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = trackEncoding.rtx.srtpStream.Read(in)
return n, a, err
}),
)

rtxRTPInterceptor := r.api.interceptor.BindLocalStream(
&trackEncoding.rtx.streamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
return rtxSRTPStream.WriteRTP(header, payload)
}),
)

rtxWriteStream.interceptor.Store(rtxRTPInterceptor)
}
}

close(r.sendCalled)
Expand Down
8 changes: 7 additions & 1 deletion sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,13 @@ func addSenderSDP(

sendParameters := sender.GetParameters()
for _, encoding := range sendParameters.Encodings {
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if encoding.RTX.SSRC == 0 {
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
} else {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC)).
WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID()).
WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
}
Expand Down
6 changes: 6 additions & 0 deletions settingengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type SettingEngine struct {
srtpProtectionProfiles []dtls.SRTPProtectionProfile
receiveMTU uint
iceMaxBindingRequests *uint16
distinctSSRCRTX bool
}

// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
Expand Down Expand Up @@ -491,3 +492,8 @@ func (e *SettingEngine) SetSCTPRTOMax(rtoMax time.Duration) {
func (e *SettingEngine) SetICEBindingRequestHandler(bindingRequestHandler func(m *stun.Message, local, remote ice.Candidate, pair *ice.CandidatePair) bool) {
e.iceBindingRequestHandler = bindingRequestHandler
}

// SetDistinctSSRCRTX configures if RTX should be sent with distinct SSRC instead of in-band.
func (e *SettingEngine) SetDistinctSSRCRTX(distinctSSRCRTX bool) {
e.distinctSSRCRTX = distinctSSRCRTX
}
11 changes: 7 additions & 4 deletions track_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type TrackLocalContext interface {
// both PeerConnections and the SSRC/PayloadTypes
HeaderExtensions() []RTPHeaderExtensionParameter

// SSRC requires the negotiated SSRC of this track
// This track may have multiple if RTX is enabled
// SSRC requires the negotiated SSRC of this track.
SSRC() SSRC

// WriteStream returns the WriteStream for this TrackLocal. The implementer writes the outbound
Expand Down Expand Up @@ -63,8 +62,7 @@ func (t *baseTrackLocalContext) HeaderExtensions() []RTPHeaderExtensionParameter
return t.params.HeaderExtensions
}

// SSRC requires the negotiated SSRC of this track
// This track may have multiple if RTX is enabled
// SSRC requires the negotiated SSRC of this track.
func (t *baseTrackLocalContext) SSRC() SSRC {
return t.ssrc
}
Expand Down Expand Up @@ -111,4 +109,9 @@ type TrackLocal interface {

// Kind controls if this TrackLocal is audio or video
Kind() RTPCodecType

// RTXTrackLocal returns the retransmission TrackLocal for this TrackLocal.
// If this TrackLocal does not have an RTX TrackLocal or is an RTX TrackLocal itself,
// this will return nil.
RTXTrackLocal() TrackLocal
}
Loading