From 0191d545721f26a89048c5f8baa910a5f4c08b2d Mon Sep 17 00:00:00 2001 From: Adrian Cable <6544927+adriancable@users.noreply.github.com> Date: Sun, 1 Oct 2023 15:38:00 -0700 Subject: [PATCH] Option to limit max NACKs per lost packet --- pkg/nack/generator_interceptor.go | 62 ++++++++++++++++++++++++------- pkg/nack/generator_option.go | 9 +++++ 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/pkg/nack/generator_interceptor.go b/pkg/nack/generator_interceptor.go index faf533ba..c476acc9 100644 --- a/pkg/nack/generator_interceptor.go +++ b/pkg/nack/generator_interceptor.go @@ -21,12 +21,14 @@ type GeneratorInterceptorFactory struct { // NewInterceptor constructs a new ReceiverInterceptor func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { i := &GeneratorInterceptor{ - size: 512, - skipLastN: 0, - interval: time.Millisecond * 100, - receiveLogs: map[uint32]*receiveLog{}, - close: make(chan struct{}), - log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + size: 512, + skipLastN: 0, + maxNacksPerPacket: 0, + interval: time.Millisecond * 100, + receiveLogs: map[uint32]*receiveLog{}, + nackCountLogs: map[uint32]map[uint16]uint16{}, + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), } for _, opt := range g.opts { @@ -45,13 +47,15 @@ func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte // GeneratorInterceptor interceptor generates nack feedback messages. type GeneratorInterceptor struct { interceptor.NoOp - size uint16 - skipLastN uint16 - interval time.Duration - m sync.Mutex - wg sync.WaitGroup - close chan struct{} - log logging.LeveledLogger + size uint16 + skipLastN uint16 + maxNacksPerPacket uint16 + interval time.Duration + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + nackCountLogs map[uint32]map[uint16]uint16 receiveLogs map[uint32]*receiveLog receiveLogsMu sync.Mutex @@ -131,6 +135,7 @@ func (n *GeneratorInterceptor) Close() error { return nil } +// nolint:gocognit func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { defer n.wg.Done() @@ -147,14 +152,43 @@ func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { for ssrc, receiveLog := range n.receiveLogs { missing := receiveLog.missingSeqNumbers(n.skipLastN) + + if len(missing) == 0 || n.nackCountLogs[ssrc] == nil { + n.nackCountLogs[ssrc] = map[uint16]uint16{} + } if len(missing) == 0 { continue } + filteredMissing := []uint16{} + if n.maxNacksPerPacket > 0 { + for _, missingSeq := range missing { + if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket { + filteredMissing = append(filteredMissing, missingSeq) + } + n.nackCountLogs[ssrc][missingSeq]++ + } + } else { + filteredMissing = missing + } + nack := &rtcp.TransportLayerNack{ SenderSSRC: senderSSRC, MediaSSRC: ssrc, - Nacks: rtcp.NackPairsFromSequenceNumbers(missing), + Nacks: rtcp.NackPairsFromSequenceNumbers(filteredMissing), + } + + for nackSeq := range n.nackCountLogs[ssrc] { + isMissing := false + for _, missingSeq := range missing { + if missingSeq == nackSeq { + isMissing = true + break + } + } + if !isMissing { + delete(n.nackCountLogs[ssrc], nackSeq) + } } if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil { diff --git a/pkg/nack/generator_option.go b/pkg/nack/generator_option.go index e4f46f7c..346bc999 100644 --- a/pkg/nack/generator_option.go +++ b/pkg/nack/generator_option.go @@ -30,6 +30,15 @@ func GeneratorSkipLastN(skipLastN uint16) GeneratorOption { } } +// GeneratorMaxNacksPerPacket sets the maximum number of NACKs sent per missing packet, e.g. if set to 2, a missing +// packet will only be NACKed at most twice. If set to 0 (default), max number of NACKs is unlimited +func GeneratorMaxNacksPerPacket(maxNacks uint16) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.maxNacksPerPacket = maxNacks + return nil + } +} + // GeneratorLog sets a logger for the interceptor func GeneratorLog(log logging.LeveledLogger) GeneratorOption { return func(r *GeneratorInterceptor) error {