Skip to content

Commit

Permalink
Option to limit max NACKs per lost packet
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancable committed Oct 3, 2023
1 parent 9ef04d4 commit be587db
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
62 changes: 48 additions & 14 deletions pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type GeneratorInterceptorFactory struct {
// NewInterceptor constructs a new ReceiverInterceptor
func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &GeneratorInterceptor{
size: 512,
skipLastN: 0,
interval: time.Millisecond * 100,
receiveLogs: map[uint32]*receiveLog{},
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
size: 512,
skipLastN: 0,
maxNacksPerPacket: 0,
interval: time.Millisecond * 100,
receiveLogs: map[uint32]*receiveLog{},
nackCountLogs: map[uint32]map[uint16]uint16{},
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
}

for _, opt := range g.opts {
Expand All @@ -45,13 +47,15 @@ func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte
// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
size uint16
skipLastN uint16
interval time.Duration
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
size uint16
skipLastN uint16
maxNacksPerPacket uint16
interval time.Duration
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
nackCountLogs map[uint32]map[uint16]uint16

receiveLogs map[uint32]*receiveLog
receiveLogsMu sync.Mutex
Expand Down Expand Up @@ -131,6 +135,7 @@ func (n *GeneratorInterceptor) Close() error {
return nil
}

// nolint:gocognit
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer n.wg.Done()

Expand All @@ -147,14 +152,43 @@ func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {

for ssrc, receiveLog := range n.receiveLogs {
missing := receiveLog.missingSeqNumbers(n.skipLastN)

if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
n.nackCountLogs[ssrc] = map[uint16]uint16{}
}
if len(missing) == 0 {
continue
}

filteredMissing := []uint16{}
if n.maxNacksPerPacket > 0 {
for _, missingSeq := range missing {
if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket {
filteredMissing = append(filteredMissing, missingSeq)
}
n.nackCountLogs[ssrc][missingSeq]++

Check warning on line 169 in pkg/nack/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/generator_interceptor.go#L165-L169

Added lines #L165 - L169 were not covered by tests
}
} else {
filteredMissing = missing
}

nack := &rtcp.TransportLayerNack{
SenderSSRC: senderSSRC,
MediaSSRC: ssrc,
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
Nacks: rtcp.NackPairsFromSequenceNumbers(filteredMissing),
}

for nackSeq := range n.nackCountLogs[ssrc] {
isMissing := false
for _, missingSeq := range missing {
if missingSeq == nackSeq {
isMissing = true
break

Check warning on line 186 in pkg/nack/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/generator_interceptor.go#L182-L186

Added lines #L182 - L186 were not covered by tests
}
}
if !isMissing {
delete(n.nackCountLogs[ssrc], nackSeq)
}

Check warning on line 191 in pkg/nack/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/generator_interceptor.go#L189-L191

Added lines #L189 - L191 were not covered by tests
}

if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/nack/generator_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
}
}

// GeneratorMaxNacksPerPacket sets the maximum number of NACKs sent per missing packet, e.g. if set to 2, a missing
// packet will only be NACKed at most twice. If set to 0 (default), max number of NACKs is unlimited
func GeneratorMaxNacksPerPacket(maxNacks uint16) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.maxNacksPerPacket = maxNacks
return nil
}

Check warning on line 39 in pkg/nack/generator_option.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/generator_option.go#L35-L39

Added lines #L35 - L39 were not covered by tests
}

// GeneratorLog sets a logger for the interceptor
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
return func(r *GeneratorInterceptor) error {
Expand Down

0 comments on commit be587db

Please sign in to comment.