Option to limit max NACKs per lost packet #208

Merged (1 commit, Oct 3, 2023)
62 changes: 48 additions & 14 deletions pkg/nack/generator_interceptor.go
@@ -21,12 +21,14 @@
 // NewInterceptor constructs a new ReceiverInterceptor
 func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
 	i := &GeneratorInterceptor{
-		size:        512,
-		skipLastN:   0,
-		interval:    time.Millisecond * 100,
-		receiveLogs: map[uint32]*receiveLog{},
-		close:       make(chan struct{}),
-		log:         logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
+		size:              512,
+		skipLastN:         0,
+		maxNacksPerPacket: 0,
+		interval:          time.Millisecond * 100,
+		receiveLogs:       map[uint32]*receiveLog{},
+		nackCountLogs:     map[uint32]map[uint16]uint16{},
+		close:             make(chan struct{}),
+		log:               logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
 	}

 	for _, opt := range g.opts {
@@ -45,13 +47,15 @@
 // GeneratorInterceptor interceptor generates nack feedback messages.
 type GeneratorInterceptor struct {
 	interceptor.NoOp
-	size      uint16
-	skipLastN uint16
-	interval  time.Duration
-	m         sync.Mutex
-	wg        sync.WaitGroup
-	close     chan struct{}
-	log       logging.LeveledLogger
+	size              uint16
+	skipLastN         uint16
+	maxNacksPerPacket uint16
+	interval          time.Duration
+	m                 sync.Mutex
+	wg                sync.WaitGroup
+	close             chan struct{}
+	log               logging.LeveledLogger
+	nackCountLogs     map[uint32]map[uint16]uint16

 	receiveLogs   map[uint32]*receiveLog
 	receiveLogsMu sync.Mutex
@@ -131,6 +135,7 @@
 	return nil
 }

+// nolint:gocognit
 func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
 	defer n.wg.Done()
@@ -147,14 +152,43 @@

 		for ssrc, receiveLog := range n.receiveLogs {
 			missing := receiveLog.missingSeqNumbers(n.skipLastN)
+
+			if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
+				n.nackCountLogs[ssrc] = map[uint16]uint16{}
+			}
 			if len(missing) == 0 {
 				continue
 			}

+			filteredMissing := []uint16{}
+			if n.maxNacksPerPacket > 0 {
+				for _, missingSeq := range missing {
+					if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket {
+						filteredMissing = append(filteredMissing, missingSeq)
+					}
+					n.nackCountLogs[ssrc][missingSeq]++
+				}
+			} else {
+				filteredMissing = missing
+			}
+
 			nack := &rtcp.TransportLayerNack{
 				SenderSSRC: senderSSRC,
 				MediaSSRC:  ssrc,
-				Nacks:      rtcp.NackPairsFromSequenceNumbers(missing),
+				Nacks:      rtcp.NackPairsFromSequenceNumbers(filteredMissing),
 			}

+			for nackSeq := range n.nackCountLogs[ssrc] {
+				isMissing := false
+				for _, missingSeq := range missing {
+					if missingSeq == nackSeq {
+						isMissing = true
+						break
+					}
+				}
+				if !isMissing {
+					delete(n.nackCountLogs[ssrc], nackSeq)
+				}
+			}
+
 			if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {

Review thread (on the nackCountLogs reset above):

Contributor: Is the new map a concern on every loop? The majority case is going to be len(missing) == 0, so we would be re-creating a map for every SSRC each 100 ms. Is that okay?

Contributor (author): My understanding is that an empty map is actually nil internally (i.e. no allocation), so I think this is OK.

Review thread (on the stale-counter cleanup loop above):

Contributor: Can't think of an easy way to spread this load. I'm wondering if the N² scan could cause performance issues. A network brown-out could drop hundreds of packets, and multiple SSRCs experiencing that at the same time could generate a lot of NACKs, all of which could still be missing while this loop runs. I don't think it is a huge concern, but I'm also thinking about ways of amortising it, e.g. handling only one SSRC per loop iteration for cleanup, though that makes the code more complicated. I guess this can be profiled and improved later.

Contributor (author): Yes, I thought of this too but can't think of an easy improvement.
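The two pieces of new loop logic — the per-packet NACK limit and the stale-counter cleanup discussed in the review threads — can be sketched in isolation. This is a simplified standalone sketch with hypothetical helper names (`filterMissing`, `pruneStale`), not the interceptor code itself; the cleanup variant also shows the set-lookup alternative to the nested scan mentioned in the second thread:

```go
package main

import "fmt"

// filterMissing mirrors the PR's limiting step: sequence numbers that have
// already been NACKed maxNacks times are dropped, and each sequence number's
// counter is bumped. maxNacks == 0 means "unlimited", matching the default.
func filterMissing(counts map[uint16]uint16, missing []uint16, maxNacks uint16) []uint16 {
	if maxNacks == 0 {
		return missing
	}
	filtered := []uint16{}
	for _, seq := range missing {
		if counts[seq] < maxNacks {
			filtered = append(filtered, seq)
		}
		counts[seq]++
	}
	return filtered
}

// pruneStale drops counters for sequence numbers that are no longer missing.
// The PR does this with a nested scan; building a set first, as here, is one
// way to avoid the quadratic cost raised in the review thread.
func pruneStale(counts map[uint16]uint16, missing []uint16) {
	missingSet := make(map[uint16]struct{}, len(missing))
	for _, seq := range missing {
		missingSet[seq] = struct{}{}
	}
	for seq := range counts {
		if _, ok := missingSet[seq]; !ok {
			delete(counts, seq)
		}
	}
}

func main() {
	counts := map[uint16]uint16{}
	missing := []uint16{100, 101}

	// With maxNacks = 2, each sequence number is reported at most twice.
	fmt.Println(filterMissing(counts, missing, 2)) // [100 101]
	fmt.Println(filterMissing(counts, missing, 2)) // [100 101]
	fmt.Println(filterMissing(counts, missing, 2)) // []

	// Once packet 100 arrives it is no longer missing, so its counter goes.
	pruneStale(counts, []uint16{101})
	fmt.Println(counts) // map[101:3]
}
```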
9 changes: 9 additions & 0 deletions pkg/nack/generator_option.go
@@ -30,6 +30,15 @@
 	}
 }

+// GeneratorMaxNacksPerPacket sets the maximum number of NACKs sent per missing packet; e.g. if set to 2, a missing
+// packet will be NACKed at most twice. If set to 0 (the default), the number of NACKs is unlimited.
+func GeneratorMaxNacksPerPacket(maxNacks uint16) GeneratorOption {
+	return func(r *GeneratorInterceptor) error {
+		r.maxNacksPerPacket = maxNacks
+		return nil
+	}
+}

 // GeneratorLog sets a logger for the interceptor
 func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
 	return func(r *GeneratorInterceptor) error {