Skip to content

Commit

Permalink
Add support for RFC 4588
Browse files Browse the repository at this point in the history
Respect in RTX with distinict SSRC + PayloadType
  • Loading branch information
aggresss authored and Sean-Der committed Oct 5, 2024
1 parent 5ce4343 commit d98a15c
Showing 1 changed file with 50 additions and 3 deletions.
53 changes: 50 additions & 3 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nack

import (
"encoding/binary"
"sync"

"github.com/pion/interceptor"
Expand Down Expand Up @@ -62,6 +63,11 @@ type ResponderInterceptor struct {
type localStream struct {
sendBuffer *sendBuffer
rtpWriter interceptor.RTPWriter

// Non-zero if Retransmissions should be sent on a distinct stream
rtxSsrc uint32
rtxPayloadType uint8
rtxSequencer rtp.Sequencer
}

// NewResponderInterceptor returns a new ResponderInterceptorFactor
Expand Down Expand Up @@ -108,7 +114,13 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streamsMu.Lock()
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
n.streams[info.SSRC] = &localStream{
sendBuffer: sendBuffer,
rtpWriter: writer,
rtxSsrc: info.SSRCRetransmission,
rtxPayloadType: info.PayloadTypeRetransmission,
rtxSequencer: rtp.NewRandomSequencer(),
}
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
Expand Down Expand Up @@ -139,8 +151,43 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
for i := range nack.Nacks {
nack.Nacks[i].Range(func(seq uint16) bool {
if p := stream.sendBuffer.get(seq); p != nil {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)
if stream.rtxSsrc != 0 {
// Store the original sequence number and rewrite the sequence number.
originalSequenceNumber := p.Header().SequenceNumber
p.Header().SequenceNumber = stream.rtxSequencer.NextSequenceNumber()

Check warning on line 157 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L156-L157

Added lines #L156 - L157 were not covered by tests

// Rewrite the SSRC.
p.Header().SSRC = stream.rtxSsrc

Check warning on line 160 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L160

Added line #L160 was not covered by tests
// Rewrite the payload type.
p.Header().PayloadType = stream.rtxPayloadType

Check warning on line 162 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L162

Added line #L162 was not covered by tests

// Remove padding if present.
paddingLength := 0
originPayload := p.Payload()
if p.Header().Padding {
paddingLength = int(originPayload[len(originPayload)-1])
p.Header().Padding = false

Check warning on line 169 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L165-L169

Added lines #L165 - L169 were not covered by tests
}

// Write the original sequence number at the beginning of the payload.
payload := make([]byte, 2)
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)

Check warning on line 175 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L173-L175

Added lines #L173 - L175 were not covered by tests

// Send RTX packet.
if _, err := stream.rtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending rtx packet: %+v", err)

Check warning on line 179 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}

// Resore the Padding and SSRC.
if paddingLength > 0 {
p.Header().Padding = true

Check warning on line 184 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L183-L184

Added lines #L183 - L184 were not covered by tests
}
p.Header().SequenceNumber = originalSequenceNumber

Check warning on line 186 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L186

Added line #L186 was not covered by tests
} else {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)

Check warning on line 189 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L189

Added line #L189 was not covered by tests
}
}
p.Release()
}
Expand Down

0 comments on commit d98a15c

Please sign in to comment.