Skip to content

Commit

Permalink
Add SetPlayoutHead
Browse files Browse the repository at this point in the history
Allow the user to Get+Set PlayoutHead of the JitterBuffer
  • Loading branch information
Sean-Der committed Apr 11, 2024
1 parent bf4e938 commit 490d213
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
27 changes: 26 additions & 1 deletion pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,18 @@ type Stats struct {

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, minStartCount: 50, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
jb := &JitterBuffer{
state: Buffering,
stats: Stats{0, 0, 0},
minStartCount: 50,
packets: NewQueue(),
listeners: make(map[Event][]EventListener),
}

for _, o := range opts {
o(jb)
}

return jb
}

Expand All @@ -113,6 +121,23 @@ func (jb *JitterBuffer) Listen(event Event, cb EventListener) {
jb.listeners[event] = append(jb.listeners[event], cb)
}

// PlayoutHead returns the SequenceNumber that will be attempted to Pop next
func (jb *JitterBuffer) PlayoutHead() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()

return jb.playoutHead
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// If you have encountered a packet that hasn't resolved you can skip it
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
jb.mutex.Lock()
defer jb.mutex.Unlock()

jb.playoutHead = playoutHead
}

func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// If we have at least one packet, and the next packet being pushed in is not
// at the expected sequence number increment the out of order count
Expand Down
48 changes: 48 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func TestJitterBuffer(t *testing.T) {
assert := assert.New(t)

t.Run("Appends packets in order", func(*testing.T) {
jb := New()
assert.Equal(jb.lastSequence, uint16(0))
Expand All @@ -29,6 +30,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
})

t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -53,6 +55,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})

t.Run("Wraps playout correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -75,6 +78,7 @@ func TestJitterBuffer(t *testing.T) {
}
}
})

t.Run("Pops at timestamp correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -94,6 +98,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("Can peek at a packet", func(*testing.T) {
jb := New()
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
Expand All @@ -110,6 +115,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(pkt.SequenceNumber, uint16(5000))
assert.Equal(err, nil)
})

t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -124,6 +130,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)
})

t.Run("Pops at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -145,6 +152,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("Peeks at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -166,4 +174,44 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("SetPlayoutHead", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))

// Push packets 0-9, but no packet 4
for i := uint16(0); i < 10; i++ {
if i == 4 {
continue
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: i, Timestamp: uint32(512 + i)}, Payload: []byte{0x00}})
}

// The first 3 packets will be able to popped
for i := 0; i < 4; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}

// The next pop will fail because of gap
pkt, err := jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))

// Assert that PlayoutHead isn't modified with pushing/popping again
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 10, Timestamp: uint32(522)}, Payload: []byte{0x00}})
pkt, err = jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))

// Increment the PlayoutHead and popping will work again
jb.SetPlayoutHead(jb.PlayoutHead() + 1)
for i := 0; i < 6; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}
})
}

0 comments on commit 490d213

Please sign in to comment.