Skip to content

Commit

Permalink
Support TrackLocal RTX
Browse files Browse the repository at this point in the history
  • Loading branch information
aggresss committed Feb 19, 2024
1 parent 5574fda commit e530b47
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
8 changes: 6 additions & 2 deletions pkg/nack/nack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
package nack

import "github.com/pion/interceptor"
import (
"strings"

"github.com/pion/interceptor"
)

func streamSupportNack(info *interceptor.StreamInfo) bool {
for _, fb := range info.RTCPFeedback {
Expand All @@ -13,5 +17,5 @@ func streamSupportNack(info *interceptor.StreamInfo) bool {
}
}

return false
return strings.HasSuffix(info.MimeType, "/rtx")

Check warning on line 20 in pkg/nack/nack.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/nack.go#L20

Added line #L20 was not covered by tests
}
77 changes: 63 additions & 14 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package nack

import (
"encoding/binary"
"strings"
"sync"

"github.com/pion/interceptor"
Expand Down Expand Up @@ -60,6 +62,11 @@ type ResponderInterceptor struct {
type localStream struct {
sendBuffer *sendBuffer
rtpWriter interceptor.RTPWriter

rtxRtpWriter interceptor.RTPWriter

Check failure on line 66 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

ST1003: struct field rtxRtpWriter should be rtxRTPWriter (stylecheck)
rtxSsrc uint32
rtxPayloadType uint8
rtxBaseSn uint16
}

// NewResponderInterceptor returns a new ResponderInterceptorFactor
Expand Down Expand Up @@ -103,20 +110,31 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
return writer
}

// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streamsMu.Lock()
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
pkt, err := n.packetFactory.NewPacket(header, payload)
if err != nil {
return 0, err
defer n.streamsMu.Unlock()

if strings.HasSuffix(info.MimeType, "/rtx") && info.Attributes.Get("apt_ssrc") != nil {
if stream, ok := n.streams[info.Attributes.Get("apt_ssrc").(uint32)]; ok {
stream.rtxRtpWriter = writer
stream.rtxPayloadType = info.PayloadType
stream.rtxSsrc = info.SSRC
stream.rtxBaseSn = 1000

Check warning on line 121 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L117-L121

Added lines #L117 - L121 were not covered by tests
}
sendBuffer.add(pkt)
return writer.Write(header, payload, attributes)
})
return writer

Check warning on line 123 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L123

Added line #L123 was not covered by tests
} else {

Check warning on line 124 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
pkt, err := n.packetFactory.NewPacket(header, payload)
if err != nil {
return 0, err
}

Check warning on line 133 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L132-L133

Added lines #L132 - L133 were not covered by tests
sendBuffer.add(pkt)
return writer.Write(header, payload, attributes)
})
}
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
Expand All @@ -137,8 +155,39 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
for i := range nack.Nacks {
nack.Nacks[i].Range(func(seq uint16) bool {
if p := stream.sendBuffer.get(seq); p != nil {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)
if stream.rtxRtpWriter != nil {
// Store the original sequence number and rewrite the sequence number.
osn := p.Header().SequenceNumber
p.Header().SequenceNumber = stream.rtxBaseSn
stream.rtxBaseSn++
// Rewrite the SSRC.
p.Header().SSRC = stream.rtxSsrc
// Rewrite the payload type.
p.Header().PayloadType = stream.rtxPayloadType
// Remove padding if present.
paddingLength := 0
originPayload := p.Payload()
if p.Header().Padding {
paddingLength = int(originPayload[len(originPayload)-1])
p.Header().Padding = false
}

Check warning on line 173 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L159-L173

Added lines #L159 - L173 were not covered by tests
// Write the original sequence number at the begining of the payload.

Check failure on line 174 in pkg/nack/responder_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

`begining` is a misspelling of `beginning` (misspell)
payload := make([]byte, 2)
binary.BigEndian.PutUint16(payload, osn)
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)
// Send RTX packet.
if _, err := stream.rtxRtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending rtx packet: %+v", err)
}

Check warning on line 181 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L175-L181

Added lines #L175 - L181 were not covered by tests
// Resore the Padding and SSRC.
if paddingLength > 0 {
p.Header().Padding = true
}
p.Header().SequenceNumber = osn

Check warning on line 186 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L183-L186

Added lines #L183 - L186 were not covered by tests
} else {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)
}

Check warning on line 190 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L189-L190

Added lines #L189 - L190 were not covered by tests
}
p.Release()
}
Expand Down

0 comments on commit e530b47

Please sign in to comment.