Skip to content

Commit

Permalink
JitterBuffer: WIP interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
thatsnotright committed Apr 11, 2024
1 parent ace759a commit 11b5c42
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
4 changes: 2 additions & 2 deletions attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) {
}

// GetRTPHeader gets the RTP header if present. If it is not present, it will be
// unmarshalled from the raw byte slice and stored in the attribtues.
// unmarshalled from the raw byte slice and stored in the attributes.
func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
if val, ok := a[rtpHeaderKey]; ok {
if header, ok := val.(*rtp.Header); ok {
Expand All @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
}

// GetRTCPPackets gets the RTCP packets if present. If the packet slice is not
// present, it will be unmarshaled from the raw byte slice and stored in the
// present, it will be unmarshalled from the raw byte slice and stored in the
// attributes.
func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) {
if val, ok := a[rtcpPacketsKey]; ok {
Expand Down
94 changes: 94 additions & 0 deletions pkg/jitterbuffer/generator_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"sync"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

// JitterBufferInterceptorFactory is a interceptor.Factory for a GeneratorInterceptor
type JitterBufferInterceptorFactory struct {

Check warning on line 15 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferInterceptorFactory by other packages, and that stutters; consider calling this InterceptorFactory (revive)
opts []JitterBufferOption
}

// NewInterceptor constructs a new ReceiverInterceptor
func (g *JitterBufferInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &JitterBufferInterceptor{
close: make(chan struct{}),

Check failure on line 22 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
buffer: New(),

Check warning on line 24 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L20-L24

Added lines #L20 - L24 were not covered by tests
}

for _, opt := range g.opts {
if err := opt(i); err != nil {
return nil, err

Check warning on line 29 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L27-L29

Added lines #L27 - L29 were not covered by tests
}
}

return i, nil

Check warning on line 33 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L33

Added line #L33 was not covered by tests
}

// JitterBufferInterceptor interceptor places a JitterBuffer in the chain to smooth packet arrival

Check failure on line 36 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
// and allow for network jitter
type JitterBufferInterceptor struct {

Check warning on line 38 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferInterceptor by other packages, and that stutters; consider calling this Interceptor (revive)
interceptor.NoOp
buffer *JitterBuffer

Check failure on line 40 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
}

// NewGeneratorInterceptor returns a new GeneratorInterceptorFactory

Check warning on line 47 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: comment on exported function NewInterceptor should be of the form "NewInterceptor ..." (revive)
func NewInterceptor(opts ...JitterBufferOption) (*JitterBufferInterceptorFactory, error) {
return &JitterBufferInterceptorFactory{opts}, nil

Check warning on line 49 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *JitterBufferInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {

Check warning on line 54 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'info' seems to be unused, consider removing or renaming it as _ (revive)
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
n, attr, err := reader.Read(b, a)
if err != nil {
return n, attr, err

Check warning on line 58 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(b[:n]); err != nil {
return 0, nil, err

Check warning on line 62 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L60-L62

Added lines #L60 - L62 were not covered by tests
}
i.buffer.Push(packet);

Check warning on line 64 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L64

Added line #L64 was not covered by tests

if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err

Check warning on line 69 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L66-L69

Added lines #L66 - L69 were not covered by tests
}
n, err = newPkt.MarshalTo(b)
if err != nil {
return 0, nil, err

Check warning on line 73 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L71-L73

Added lines #L71 - L73 were not covered by tests
}

return n, attr, nil

Check warning on line 76 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L76

Added line #L76 was not covered by tests
}
return 0, attr, nil

Check warning on line 78 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L78

Added line #L78 was not covered by tests
})
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *JitterBufferInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {

Check warning on line 83 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

receiver-naming: receiver name n should be consistent with previous receiver name i for JitterBufferInterceptor (revive)

Check warning on line 83 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L83

Added line #L83 was not covered by tests

Check failure on line 84 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)
}

// Close closes the interceptor
func (n *JitterBufferInterceptor) Close() error {

Check warning on line 88 in pkg/jitterbuffer/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

receiver-naming: receiver name n should be consistent with previous receiver name i for JitterBufferInterceptor (revive)
defer n.wg.Wait()
n.m.Lock()
defer n.m.Unlock()

Check warning on line 91 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L88-L91

Added lines #L88 - L91 were not covered by tests

return nil

Check warning on line 93 in pkg/jitterbuffer/generator_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/generator_interceptor.go#L93

Added line #L93 was not covered by tests
}
19 changes: 19 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"github.com/pion/logging"
)

// JitterBufferOption can be used to configure SenderInterceptor
type JitterBufferOption func(d *JitterBufferInterceptor) error

Check warning on line 11 in pkg/jitterbuffer/option.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferOption by other packages, and that stutters; consider calling this Option (revive)

// Log sets a logger for the interceptor
func Log(log logging.LeveledLogger) JitterBufferOption {
return func(d *JitterBufferInterceptor) error {
d.log = log
return nil

Check warning on line 17 in pkg/jitterbuffer/option.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/option.go#L14-L17

Added lines #L14 - L17 were not covered by tests
}
}

0 comments on commit 11b5c42

Please sign in to comment.