Skip to content

Commit

Permalink
Start pkg/jitterbuffer
Browse files Browse the repository at this point in the history
A Priority Queue JitterBuffer that will be used to back the
SampleBuilder and a new Interceptor
  • Loading branch information
thatsnotright authored and Sean-Der committed Jan 26, 2024
1 parent 450ac84 commit d9d223e
Show file tree
Hide file tree
Showing 4 changed files with 588 additions and 0 deletions.
205 changes: 205 additions & 0 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package jitterbuffer implements a buffer for RTP packets designed to help
// counteract non-deterministic sources of latency
package jitterbuffer

import (
"errors"
"math"
"sync"

"github.com/pion/rtp"
)

// State tracks a JitterBuffer as either Buffering or Emitting
type State uint16

// Event represents all events a JitterBuffer can emit
type Event string

var (
// ErrBufferUnderrun is returned when the buffer has no items
ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer")
// ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state
ErrPopWhileBuffering = errors.New("attempt to pop while buffering")
)

const (
// Buffering is the state when the jitter buffer has not started emitting yet, or has hit an underflow and needs to re-buffer packets
Buffering State = iota
// Emitting is the state when the jitter buffer is operating nominally
Emitting
)

const (
// StartBuffering is emitted when the buffer receives its first packet
StartBuffering Event = "startBuffering"
// BeginPlayback is emitted when the buffer has satisfied its buffer length
BeginPlayback = "playing"
// BufferUnderflow is emitted when the buffer does not have enough packets to Pop
BufferUnderflow = "underflow"
// BufferOverflow is emitted when the buffer has exceeded its limit
BufferOverflow = "overflow"
)

func (jbs State) String() string {
switch jbs {
case Buffering:
return "Buffering"
case Emitting:
return "Emitting"
}
return "unknown"
}

type (
// Option will Override JitterBuffer's defaults
Option func(jb *JitterBuffer)
// EventListener will be called when the corresponding Event occurs
EventListener func(event Event, jb *JitterBuffer)
)

// A JitterBuffer will accept Pushed packets, put them in sequence number
// order, and allows removing in either sequence number order or via a
// provided timestamp
type JitterBuffer struct {
packets *PriorityQueue
lastSequence uint16
playoutHead uint16
playoutReady bool
state State
stats Stats
listeners map[Event][]EventListener
mutex sync.Mutex
}

// Stats Track interesting statistics for the life of this JitterBuffer
// outOfOrderCount will provide the number of times a packet was Pushed
//
// without its predecessor being present
//
// underflowCount will provide the count of attempts to Pop an empty buffer
// overflowCount will track the number of times the jitter buffer exceeds its limit
type Stats struct {
outOfOrderCount uint32
underflowCount uint32
overflowCount uint32
}

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
for _, o := range opts {
o(jb)
}
return jb
}

// Listen will register an event listener
// The jitter buffer may emit events correspnding, interested listerns should
// look at Event for available events
func (jb *JitterBuffer) Listen(event Event, cb EventListener) {
jb.listeners[event] = append(jb.listeners[event], cb)
}

func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// If we have at least one packet, and the next packet being pushed in is not
// at the expected sequence number increment the out of order count
if jb.packets.Length() > 0 && lastPktSeqNo != ((jb.lastSequence+1)%math.MaxUint16) {
jb.stats.outOfOrderCount++
}
jb.lastSequence = lastPktSeqNo
}

// Push an RTP packet into the jitter buffer, this does not clone
// the data so if the memory is expected to be reused, the caller should
// take this in to account and pass a copy of the packet they wish to buffer
func (jb *JitterBuffer) Push(packet *rtp.Packet) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() == 0 {
jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
if !jb.playoutReady && jb.packets.Length() == 0 {
jb.playoutHead = packet.SequenceNumber
}
jb.updateStats(packet.SequenceNumber)
jb.packets.Push(packet, packet.SequenceNumber)
jb.updateState()
}

func (jb *JitterBuffer) emit(event Event) {
for _, l := range jb.listeners[event] {
l(event, jb)
}
}

func (jb *JitterBuffer) updateState() {
// For now, we only look at the number of packets captured in the play buffer
if jb.packets.Length() >= 50 && jb.state == Buffering {
jb.state = Emitting
jb.playoutReady = true
jb.emit(BeginPlayback)
}
}

// Peek at the packet which is either:
//
// At the playout head when we are emitting, and the playoutHead flag is true
//
// or else
//
// At the last sequence received
func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() < 1 {
return nil, ErrBufferUnderrun
}
if playoutHead && jb.state == Emitting {
return jb.packets.Find(jb.playoutHead)
}
return jb.packets.Find(jb.lastSequence)
}

// Pop an RTP packet from the jitter buffer at the current playout head
func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAt(jb.playoutHead)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return (*rtp.Packet)(nil), err
}
jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16
jb.updateState()
return packet, nil
}

// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp
// Call this method repeatedly to drain the buffer at the timestamp
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAtTimestamp(ts)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return (*rtp.Packet)(nil), err
}
jb.updateState()
return packet, nil
}
123 changes: 123 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"math"
"testing"

"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestJitterBuffer(t *testing.T) {
assert := assert.New(t)
t.Run("Appends packets in order", func(t *testing.T) {
jb := New()
assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5002))

jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5012))
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
})

t.Run("Appends packets and begins playout", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(5012))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})
t.Run("Wraps playout correctly", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(math.MaxUint16-32))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
head, err := jb.Pop()
if i < 99 {
assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16))
assert.Equal(err, nil)
} else {
assert.Equal(head, (*rtp.Packet)(nil))
}
}
})
t.Run("Pops at timestamp correctly", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(513))
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(513))
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)

head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
t.Run("Can peek at a packet", func(t *testing.T) {
jb := New()
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})
pkt, err := jb.Peek(false)
assert.Equal(pkt.SequenceNumber, uint16(5002))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
pkt, err = jb.Peek(true)
assert.Equal(pkt.SequenceNumber, uint16(5000))
assert.Equal(err, nil)
})
t.Run("Pops at timestamp with multiple packets", func(t *testing.T) {
jb := New()
for i := 0; i < 50; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
assert.Equal(jb.packets.Length(), uint16(52))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1019))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1020))
assert.Equal(err, nil)

head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
}
Loading

0 comments on commit d9d223e

Please sign in to comment.