Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TrackLocal RTX #2676

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions rtpcodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package webrtc

import (
"fmt"
"regexp"
"strings"

"github.com/pion/webrtc/v4/internal/fmtp"
Expand Down Expand Up @@ -123,3 +125,27 @@

return RTPCodecParameters{}, codecMatchNone
}

// Do a fuzzy find for a associated codec in the list of codecs
// Used for lookup up a associated codec in an existing list to find a match
// Returns codecMatchExact, codecMatchPartial, or codecMatchNone
func codecParametersAssociatedSearch(needle RTPCodecParameters, haystack []RTPCodecParameters) (RTPCodecParameters, codecMatchType) {

Check failure on line 132 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

unnecessary leading newline (whitespace)

Check failure on line 133 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)
// First attempt to match Exact
for _, c := range haystack {
if c.SDPFmtpLine == fmt.Sprintf("apt=%d", needle.PayloadType) {
return c, codecMatchExact
}
}

// Fallback to just has apt codec
if re, err := regexp.Compile(`^apt=\d+$`); err == nil {

Check failure on line 142 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

regexpMust: for const patterns like `^apt=\d+$`, use regexp.MustCompile (gocritic)
for _, c := range haystack {
if re.MatchString(c.SDPFmtpLine) {
return c, codecMatchPartial
}

Check warning on line 146 in rtpcodec.go

View check run for this annotation

Codecov / codecov/patch

rtpcodec.go#L145-L146

Added lines #L145 - L146 were not covered by tests
}
}

return RTPCodecParameters{}, codecMatchNone
}
91 changes: 85 additions & 6 deletions rtpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
)

type trackEncoding struct {
track TrackLocal

srtpStream *srtpWriterFuture
track TrackLocal
context *baseTrackLocalContext

ssrc SSRC
srtpStream *srtpWriterFuture
rtcpInterceptor interceptor.RTCPReader
streamInfo interceptor.StreamInfo

context *baseTrackLocalContext

ssrc SSRC
rtxSsrc SSRC
rtxSrtpStream *srtpWriterFuture
rtxRtcpInterceptor interceptor.RTCPReader
rtxStreamInfo interceptor.StreamInfo
}

// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
Expand Down Expand Up @@ -125,6 +127,7 @@
RID: rid,
SSRC: trackEncoding.ssrc,
PayloadType: r.payloadType,
RTX: RTPRtxParameters{SSRC: trackEncoding.rtxSsrc},
},
})
}
Expand Down Expand Up @@ -204,6 +207,16 @@
ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()),
}

if r.api.settingEngine.trackLocalRtx {
codecs := r.api.mediaEngine.getCodecsByKind(track.Kind())
for _, c := range codecs {
if _, matchType := codecParametersAssociatedSearch(c, codecs); matchType != codecMatchNone {
trackEncoding.rtxSsrc = SSRC(randutil.NewMathRandomGenerator().Uint32())
break

Check warning on line 215 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L211-L215

Added lines #L211 - L215 were not covered by tests
}
}
}

r.trackEncodings = append(r.trackEncodings, trackEncoding)
}

Expand Down Expand Up @@ -339,6 +352,38 @@
)

writeStream.interceptor.Store(rtpInterceptor)

if rtxCodec, matchType := codecParametersAssociatedSearch(codec, r.api.mediaEngine.getCodecsByKind(r.kind)); matchType == codecMatchExact &&
parameters.Encodings[idx].RTX.SSRC != 0 {

Check failure on line 357 in rtpsender.go

View workflow job for this annotation

GitHub Actions / lint / Go

unnecessary leading newline (whitespace)

rtxSrtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].RTX.SSRC, rtpSender: r}

trackEncoding.rtxSrtpStream = rtxSrtpStream
trackEncoding.rtxSsrc = parameters.Encodings[idx].RTX.SSRC

trackEncoding.rtxStreamInfo = *createStreamInfo(
r.id+"_rtx",
parameters.Encodings[idx].RTX.SSRC,
rtxCodec.PayloadType,
rtxCodec.RTPCodecCapability,
parameters.HeaderExtensions,
)
trackEncoding.rtxStreamInfo.Attributes.Set("apt_ssrc", uint32(parameters.Encodings[idx].SSRC))

trackEncoding.rtxRtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = trackEncoding.rtxSrtpStream.Read(in)
return n, a, err
}),

Check warning on line 377 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L358-L377

Added lines #L358 - L377 were not covered by tests
)

r.api.interceptor.BindLocalStream(
&trackEncoding.rtxStreamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
return rtxSrtpStream.WriteRTP(header, payload)
}),

Check warning on line 384 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L380-L384

Added lines #L380 - L384 were not covered by tests
)
}
}

close(r.sendCalled)
Expand Down Expand Up @@ -371,6 +416,10 @@
if trackEncoding.srtpStream != nil {
errs = append(errs, trackEncoding.srtpStream.Close())
}
if trackEncoding.rtxSrtpStream != nil {
r.api.interceptor.UnbindLocalStream(&trackEncoding.rtxStreamInfo)
errs = append(errs, trackEncoding.rtxSrtpStream.Close())
}

Check warning on line 422 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L420-L422

Added lines #L420 - L422 were not covered by tests
}

return util.FlattenErrs(errs)
Expand Down Expand Up @@ -402,6 +451,36 @@
return pkts, attributes, nil
}

// ReadRtx reads incoming RTX Stream RTCP for this RTPSender
func (r *RTPSender) ReadRtx(b []byte) (n int, a interceptor.Attributes, err error) {
if r.trackEncodings[0].rtxRtcpInterceptor == nil {
return 0, nil, io.ErrNoProgress
}

Check warning on line 458 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L455-L458

Added lines #L455 - L458 were not covered by tests

select {
case <-r.sendCalled:
return r.trackEncodings[0].rtxRtcpInterceptor.Read(b, a)
case <-r.stopCalled:
return 0, nil, io.ErrClosedPipe

Check warning on line 464 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L460-L464

Added lines #L460 - L464 were not covered by tests
}
}

// ReadRtxRTCP is a convenience method that wraps ReadRtx and unmarshals for you.
func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
i, attributes, err := r.ReadRtx(b)
if err != nil {
return nil, nil, err
}

Check warning on line 474 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L469-L474

Added lines #L469 - L474 were not covered by tests

pkts, err := rtcp.Unmarshal(b[:i])
if err != nil {
return nil, nil, err
}

Check warning on line 479 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L476-L479

Added lines #L476 - L479 were not covered by tests

return pkts, attributes, nil

Check warning on line 481 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L481

Added line #L481 was not covered by tests
}

// ReadSimulcast reads incoming RTCP for this RTPSender for given rid
func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
select {
Expand Down
6 changes: 6 additions & 0 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,13 @@

sendParameters := sender.GetParameters()
for _, encoding := range sendParameters.Encodings {
if encoding.RTX.SSRC != 0 {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC))
}

Check warning on line 394 in sdp.go

View check run for this annotation

Codecov / codecov/patch

sdp.go#L393-L394

Added lines #L393 - L394 were not covered by tests
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if encoding.RTX.SSRC != 0 {
media = media.WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}

Check warning on line 398 in sdp.go

View check run for this annotation

Codecov / codecov/patch

sdp.go#L397-L398

Added lines #L397 - L398 were not covered by tests
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
}
Expand Down
6 changes: 6 additions & 0 deletions settingengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
srtpProtectionProfiles []dtls.SRTPProtectionProfile
receiveMTU uint
iceMaxBindingRequests *uint16
trackLocalRtx bool
}

// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
Expand Down Expand Up @@ -437,3 +438,8 @@
func (e *SettingEngine) SetDTLSCustomerCipherSuites(customCipherSuites func() []dtls.CipherSuite) {
e.dtls.customCipherSuites = customCipherSuites
}

// SetTrackLocalRtx allows track local use RTX.
func (e *SettingEngine) SetTrackLocalRtx(enable bool) {
e.trackLocalRtx = enable

Check warning on line 444 in settingengine.go

View check run for this annotation

Codecov / codecov/patch

settingengine.go#L443-L444

Added lines #L443 - L444 were not covered by tests
}
5 changes: 3 additions & 2 deletions track_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type TrackLocalContext interface {
}

type baseTrackLocalContext struct {
id string
params RTPParameters
id string
params RTPParameters

ssrc SSRC
writeStream TrackLocalWriter
rtcpInterceptor interceptor.RTCPReader
Expand Down
Loading