From e530b4777212a7b264e65201f1a947d8b3b56c13 Mon Sep 17 00:00:00 2001 From: aggresss Date: Sun, 18 Feb 2024 16:00:55 +0800 Subject: [PATCH] Support TrackLocal RTX --- pkg/nack/nack.go | 8 +++- pkg/nack/responder_interceptor.go | 77 +++++++++++++++++++++++++------ 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/pkg/nack/nack.go b/pkg/nack/nack.go index b7589ebd..4e4fc8b6 100644 --- a/pkg/nack/nack.go +++ b/pkg/nack/nack.go @@ -4,7 +4,11 @@ // Package nack provides interceptors to implement sending and receiving negative acknowledgements package nack -import "github.com/pion/interceptor" +import ( + "strings" + + "github.com/pion/interceptor" +) func streamSupportNack(info *interceptor.StreamInfo) bool { for _, fb := range info.RTCPFeedback { @@ -13,5 +17,5 @@ func streamSupportNack(info *interceptor.StreamInfo) bool { } } - return false + return strings.HasSuffix(info.MimeType, "/rtx") } diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go index 8f74952d..068ceab4 100644 --- a/pkg/nack/responder_interceptor.go +++ b/pkg/nack/responder_interceptor.go @@ -4,6 +4,8 @@ package nack import ( + "encoding/binary" + "strings" "sync" "github.com/pion/interceptor" @@ -60,6 +62,11 @@ type ResponderInterceptor struct { type localStream struct { sendBuffer *sendBuffer rtpWriter interceptor.RTPWriter + + rtxRtpWriter interceptor.RTPWriter + rtxSsrc uint32 + rtxPayloadType uint8 + rtxBaseSn uint16 } // NewResponderInterceptor returns a new ResponderInterceptorFactor @@ -103,20 +110,31 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri return writer } - // error is already checked in NewGeneratorInterceptor - sendBuffer, _ := newSendBuffer(n.size) n.streamsMu.Lock() - n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer} - n.streamsMu.Unlock() - - return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - pkt, err := n.packetFactory.NewPacket(header, payload) - if err != nil { - return 0, err + defer n.streamsMu.Unlock() + + if strings.HasSuffix(info.MimeType, "/rtx") && info.Attributes.Get("apt_ssrc") != nil { + if stream, ok := n.streams[info.Attributes.Get("apt_ssrc").(uint32)]; ok { + stream.rtxRtpWriter = writer + stream.rtxPayloadType = info.PayloadType + stream.rtxSsrc = info.SSRC + stream.rtxBaseSn = 1000 } - sendBuffer.add(pkt) - return writer.Write(header, payload, attributes) - }) + return writer + } else { + // error is already checked in NewGeneratorInterceptor + sendBuffer, _ := newSendBuffer(n.size) + n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer} + + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + pkt, err := n.packetFactory.NewPacket(header, payload) + if err != nil { + return 0, err + } + sendBuffer.add(pkt) + return writer.Write(header, payload, attributes) + }) + } } // UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. @@ -137,8 +155,39 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { for i := range nack.Nacks { nack.Nacks[i].Range(func(seq uint16) bool { if p := stream.sendBuffer.get(seq); p != nil { - if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil { - n.log.Warnf("failed resending nacked packet: %+v", err) + if stream.rtxRtpWriter != nil { + // Store the original sequence number and rewrite the sequence number. + osn := p.Header().SequenceNumber + p.Header().SequenceNumber = stream.rtxBaseSn + stream.rtxBaseSn++ + // Rewrite the SSRC. + p.Header().SSRC = stream.rtxSsrc + // Rewrite the payload type. + p.Header().PayloadType = stream.rtxPayloadType + // Remove padding if present. + paddingLength := 0 + originPayload := p.Payload() + if p.Header().Padding { + paddingLength = int(originPayload[len(originPayload)-1]) + p.Header().Padding = false + } + // Write the original sequence number at the begining of the payload. + payload := make([]byte, 2) + binary.BigEndian.PutUint16(payload, osn) + payload = append(payload, originPayload[:len(originPayload)-paddingLength]...) + // Send RTX packet. + if _, err := stream.rtxRtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed sending rtx packet: %+v", err) + } + // Resore the Padding and SSRC. + if paddingLength > 0 { + p.Header().Padding = true + } + p.Header().SequenceNumber = osn + } else { + if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil { + n.log.Warnf("failed resending nacked packet: %+v", err) + } } p.Release() }