From 11b5c4207b87fa52800430364a71a43fbedd54eb Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Thu, 11 Apr 2024 14:37:29 -0400 Subject: [PATCH] JitterBuffer: WIP interceptor --- attributes.go | 4 +- pkg/jitterbuffer/generator_interceptor.go | 94 +++++++++++++++++++++++ pkg/jitterbuffer/option.go | 19 +++++ 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 pkg/jitterbuffer/generator_interceptor.go create mode 100644 pkg/jitterbuffer/option.go diff --git a/attributes.go b/attributes.go index d7936d52..8b6d0f5c 100644 --- a/attributes.go +++ b/attributes.go @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) { } // GetRTPHeader gets the RTP header if present. If it is not present, it will be -// unmarshalled from the raw byte slice and stored in the attribtues. +// unmarshalled from the raw byte slice and stored in the attributes. func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { if val, ok := a[rtpHeaderKey]; ok { if header, ok := val.(*rtp.Header); ok { @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { } // GetRTCPPackets gets the RTCP packets if present. If the packet slice is not -// present, it will be unmarshaled from the raw byte slice and stored in the +// present, it will be unmarshalled from the raw byte slice and stored in the // attributes. func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) { if val, ok := a[rtcpPacketsKey]; ok { diff --git a/pkg/jitterbuffer/generator_interceptor.go b/pkg/jitterbuffer/generator_interceptor.go new file mode 100644 index 00000000..154c6d18 --- /dev/null +++ b/pkg/jitterbuffer/generator_interceptor.go @@ -0,0 +1,94 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtp" +) + +// JitterBufferInterceptorFactory is a interceptor.Factory for a GeneratorInterceptor +type JitterBufferInterceptorFactory struct { + opts []JitterBufferOption +} + +// NewInterceptor constructs a new ReceiverInterceptor +func (g *JitterBufferInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { + i := &JitterBufferInterceptor{ + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + buffer: New(), + } + + for _, opt := range g.opts { + if err := opt(i); err != nil { + return nil, err + } + } + + return i, nil +} + +// JitterBufferInterceptor interceptor places a JitterBuffer in the chain to smooth packet arrival +// and allow for network jitter +type JitterBufferInterceptor struct { + interceptor.NoOp + buffer *JitterBuffer + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptorFactory +func NewInterceptor(opts ...JitterBufferOption) (*JitterBufferInterceptorFactory, error) { + return &JitterBufferInterceptorFactory{opts}, nil +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (i *JitterBufferInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + n, attr, err := reader.Read(b, a) + if err != nil { + return n, attr, err + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(b[:n]); err != nil { + return 0, nil, err + } + i.buffer.Push(packet); + + if i.buffer.state == Emitting { + newPkt, err := i.buffer.Pop() + if err != nil { + return 0, nil, err + } + n, err = newPkt.MarshalTo(b) + if err != nil { + return 0, nil, err + } + + return n, attr, nil + } + return 0, attr, nil + }) +} + +// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *JitterBufferInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) { + +} + +// Close closes the interceptor +func (n *JitterBufferInterceptor) Close() error { + defer n.wg.Wait() + n.m.Lock() + defer n.m.Unlock() + + return nil +} \ No newline at end of file diff --git a/pkg/jitterbuffer/option.go b/pkg/jitterbuffer/option.go new file mode 100644 index 00000000..5c25b9ff --- /dev/null +++ b/pkg/jitterbuffer/option.go @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "github.com/pion/logging" +) + +// JitterBufferOption can be used to configure SenderInterceptor +type JitterBufferOption func(d *JitterBufferInterceptor) error + +// Log sets a logger for the interceptor +func Log(log logging.LeveledLogger) JitterBufferOption { + return func(d *JitterBufferInterceptor) error { + d.log = log + return nil + } +}