Skip to content

Commit

Permalink
jitterbuffer: Add packet skipping in interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
thatsnotright committed Oct 23, 2024
1 parent d34a3c5 commit 749922a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 23 deletions.
1 change: 0 additions & 1 deletion internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
if !ok {
return 0, nil, io.EOF
}

marshaled, err := p.Marshal()
if err != nil {
return 0, nil, io.EOF
Expand Down
8 changes: 7 additions & 1 deletion pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (jb *JitterBuffer) PlayoutHead() uint16 {
return jb.playoutHead
}

func (jb *JitterBuffer) Length() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()
return jb.packets.Length()
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// If you have encountered a packet that hasn't resolved you can skip it
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
Expand Down Expand Up @@ -155,7 +161,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) {
if jb.packets.Length() == 0 {
jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
if jb.packets.Length() > 2*jb.minStartCount {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
return nil
}
}

func WithSkipMissingPackets() ReceiverInterceptorOption {
return func(d *ReceiverInterceptor) error {
d.skipMissingPackets = true
return nil
}
}
51 changes: 32 additions & 19 deletions pkg/jitterbuffer/receiver_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package jitterbuffer

import (
"errors"
"sync"

"github.com/pion/interceptor"
Expand All @@ -17,10 +18,13 @@ type InterceptorFactory struct {
}

// NewInterceptor constructs a new ReceiverInterceptor
func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
func (g *InterceptorFactory) NewInterceptor(logName string) (interceptor.Interceptor, error) {
if logName == "" {
logName = "jitterbuffer"
}
i := &ReceiverInterceptor{
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
log: logging.NewDefaultLoggerFactory().NewLogger(logName),
buffer: New(),
}

Expand Down Expand Up @@ -52,11 +56,11 @@ func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
// arriving) quickly enough.
type ReceiverInterceptor struct {
interceptor.NoOp
buffer *JitterBuffer
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
buffer *JitterBuffer
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
skipMissingPackets bool
}

// NewInterceptor returns a new InterceptorFactory
Expand All @@ -74,19 +78,32 @@ func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader
return n, attr, err
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buf); err != nil {
if err := packet.Unmarshal(buf[:n]); err != nil {
return 0, nil, err
}
i.m.Lock()
defer i.m.Unlock()
i.buffer.Push(packet)
if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err
for {
newPkt, err := i.buffer.Pop()
if err != nil {
if errors.Is(err, ErrNotFound) {
if i.skipMissingPackets {
i.log.Warn("Skipping missing packet")
i.buffer.SetPlayoutHead(i.buffer.PlayoutHead() + 1)
continue
}
}
return 0, nil, err
}
if newPkt != nil {
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err

}
if i.buffer.Length() == 0 {
break
}
}
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err
}
return n, attr, ErrPopWhileBuffering
})
Expand All @@ -95,16 +112,12 @@ func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader
// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
}

// Close closes the interceptor
func (i *ReceiverInterceptor) Close() error {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
return nil
}
57 changes: 55 additions & 2 deletions pkg/jitterbuffer/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,72 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) {
SenderSSRC: 123,
MediaSSRC: 456,
}})
for s := 0; s < 61; s++ {
for s := 0; s < 910; s++ {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(s),
}})
}
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
for s := 0; s < 10; s++ {
for s := 0; s < 50; s++ {
read := <-stream.ReadRTP()
if read.Err != nil {
t.Fatal(read.Err)
}
seq := read.Packet.Header.SequenceNumber
assert.EqualValues(t, uint16(s), seq)
}
assert.NoError(t, stream.Close())
err = i.Close()
assert.NoError(t, err)
}

func TestReceiverBuffersAndPlaysoutSkippingMissingPackets(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
WithSkipMissingPackets(),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("jitterbuffer")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)

for s := 0; s < 420; s++ {
if s == 6 {
s++
}
if s == 40 {
s = s + 20
}
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(s),
}})
}

for s := 0; s < 100; s++ {
read := <-stream.ReadRTP()
if read.Err != nil {
continue
}
seq := read.Packet.Header.SequenceNumber
if s == 6 {
s++
}
if s == 40 {
s = s + 20
}
assert.EqualValues(t, uint16(s), seq)
}
assert.NoError(t, stream.Close())
err = i.Close()
assert.NoError(t, err)
}

0 comments on commit 749922a

Please sign in to comment.