Skip to content

Commit

Permalink
support custom streams filter in nack interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Oct 2, 2024
1 parent 3b3394f commit f8540b1
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
4 changes: 3 additions & 1 deletion pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type GeneratorInterceptorFactory struct {
// NewInterceptor constructs a new ReceiverInterceptor
func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &GeneratorInterceptor{
streamsFilter: streamSupportNack,
size: 512,
skipLastN: 0,
maxNacksPerPacket: 0,
Expand All @@ -47,6 +48,7 @@ func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte
// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
streamsFilter func(info *interceptor.StreamInfo) bool
size uint16
skipLastN uint16
maxNacksPerPacket uint16
Expand Down Expand Up @@ -86,7 +88,7 @@ func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) int
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
if !streamSupportNack(info) {
if !n.streamsFilter(info) {
return reader
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/nack/generator_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package nack
import (
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
)

Expand Down Expand Up @@ -54,3 +55,11 @@ func GeneratorInterval(interval time.Duration) GeneratorOption {
return nil
}
}

// GeneratorStreamsFilter sets filter for remote streams
func GeneratorStreamsFilter(filter func(info *interceptor.StreamInfo) bool) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.streamsFilter = filter
return nil
}
}
10 changes: 6 additions & 4 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type packetFactory interface {
// NewInterceptor constructs a new ResponderInterceptor
func (r *ResponderInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &ResponderInterceptor{
size: 1024,
log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"),
streams: map[uint32]*localStream{},
streamsFilter: streamSupportNack,
size: 1024,
log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"),
streams: map[uint32]*localStream{},
}

for _, opt := range r.opts {
Expand All @@ -49,6 +50,7 @@ func (r *ResponderInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte
// ResponderInterceptor responds to nack feedback messages
type ResponderInterceptor struct {
interceptor.NoOp
streamsFilter func(info *interceptor.StreamInfo) bool
size uint16
log logging.LeveledLogger
packetFactory packetFactory
Expand Down Expand Up @@ -99,7 +101,7 @@ func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) int
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
if !streamSupportNack(info) {
if !n.streamsFilter(info) {
return writer
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/nack/responder_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package nack

import "github.com/pion/logging"
import (
"github.com/pion/interceptor"
"github.com/pion/logging"
)

// ResponderOption can be used to configure ResponderInterceptor
type ResponderOption func(s *ResponderInterceptor) error
Expand Down Expand Up @@ -33,3 +36,11 @@ func DisableCopy() ResponderOption {
return nil
}
}

// GeneratorStreamsFilter sets filter for remote streams

Check failure on line 40 in pkg/nack/responder_option.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: comment on exported function ResponderStreamsFilter should be of the form "ResponderStreamsFilter ..." (revive)
func ResponderStreamsFilter(filter func(info *interceptor.StreamInfo) bool) ResponderOption {
return func(r *ResponderInterceptor) error {
r.streamsFilter = filter
return nil
}
}

0 comments on commit f8540b1

Please sign in to comment.