Skip to content

Commit

Permalink
Support TrackLocal RTX
Browse files Browse the repository at this point in the history
  • Loading branch information
aggresss authored and Sean-Der committed Oct 5, 2024
1 parent 5ce4343 commit 859560c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
8 changes: 6 additions & 2 deletions pkg/nack/nack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
package nack

import "github.com/pion/interceptor"
import (
"strings"

"github.com/pion/interceptor"
)

func streamSupportNack(info *interceptor.StreamInfo) bool {
for _, fb := range info.RTCPFeedback {
Expand All @@ -13,5 +17,5 @@ func streamSupportNack(info *interceptor.StreamInfo) bool {
}
}

return false
return strings.HasSuffix(info.MimeType, "/rtx")

Check warning on line 20 in pkg/nack/nack.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/nack.go#L20

Added line #L20 was not covered by tests
}
77 changes: 63 additions & 14 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package nack

import (
"encoding/binary"
"strings"
"sync"

"github.com/pion/interceptor"
Expand Down Expand Up @@ -62,6 +64,11 @@ type ResponderInterceptor struct {
type localStream struct {
sendBuffer *sendBuffer
rtpWriter interceptor.RTPWriter

rtxRtpWriter interceptor.RTPWriter

Check failure on line 68 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

ST1003: struct field rtxRtpWriter should be rtxRTPWriter (stylecheck)
rtxSsrc uint32
rtxPayloadType uint8
rtxBaseSn uint16
}

// NewResponderInterceptor returns a new ResponderInterceptorFactor
Expand Down Expand Up @@ -105,20 +112,31 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
return writer
}

// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streamsMu.Lock()
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
pkt, err := n.packetFactory.NewPacket(header, payload)
if err != nil {
return 0, err
defer n.streamsMu.Unlock()

if strings.HasSuffix(info.MimeType, "/rtx") && info.Attributes.Get("apt_ssrc") != nil {
if stream, ok := n.streams[info.Attributes.Get("apt_ssrc").(uint32)]; ok {
stream.rtxRtpWriter = writer
stream.rtxPayloadType = info.PayloadType
stream.rtxSsrc = info.SSRC
stream.rtxBaseSn = 1000

Check warning on line 123 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L119-L123

Added lines #L119 - L123 were not covered by tests
}
sendBuffer.add(pkt)
return writer.Write(header, payload, attributes)
})
return writer

Check warning on line 125 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L125

Added line #L125 was not covered by tests
} else {

Check failure on line 126 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
pkt, err := n.packetFactory.NewPacket(header, payload)
if err != nil {
return 0, err

Check warning on line 134 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L134

Added line #L134 was not covered by tests
}
sendBuffer.add(pkt)
return writer.Write(header, payload, attributes)
})
}
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
Expand All @@ -139,8 +157,39 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
for i := range nack.Nacks {
nack.Nacks[i].Range(func(seq uint16) bool {
if p := stream.sendBuffer.get(seq); p != nil {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)
if stream.rtxRtpWriter != nil {
// Store the original sequence number and rewrite the sequence number.
osn := p.Header().SequenceNumber
p.Header().SequenceNumber = stream.rtxBaseSn
stream.rtxBaseSn++

Check warning on line 164 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L162-L164

Added lines #L162 - L164 were not covered by tests
// Rewrite the SSRC.
p.Header().SSRC = stream.rtxSsrc

Check warning on line 166 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L166

Added line #L166 was not covered by tests
// Rewrite the payload type.
p.Header().PayloadType = stream.rtxPayloadType

Check warning on line 168 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L168

Added line #L168 was not covered by tests
// Remove padding if present.
paddingLength := 0
originPayload := p.Payload()
if p.Header().Padding {
paddingLength = int(originPayload[len(originPayload)-1])
p.Header().Padding = false

Check warning on line 174 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L170-L174

Added lines #L170 - L174 were not covered by tests
}
// Write the original sequence number at the begining of the payload.

Check failure on line 176 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

`begining` is a misspelling of `beginning` (misspell)
payload := make([]byte, 2)
binary.BigEndian.PutUint16(payload, osn)
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)

Check warning on line 179 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L177-L179

Added lines #L177 - L179 were not covered by tests
// Send RTX packet.
if _, err := stream.rtxRtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending rtx packet: %+v", err)

Check warning on line 182 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L181-L182

Added lines #L181 - L182 were not covered by tests
}
// Resore the Padding and SSRC.
if paddingLength > 0 {
p.Header().Padding = true

Check warning on line 186 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L185-L186

Added lines #L185 - L186 were not covered by tests
}
p.Header().SequenceNumber = osn

Check warning on line 188 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L188

Added line #L188 was not covered by tests
} else {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)

Check warning on line 191 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L191

Added line #L191 was not covered by tests
}
}
p.Release()
}
Expand Down

0 comments on commit 859560c

Please sign in to comment.