Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SetPlayoutHead #246

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,18 @@ type Stats struct {

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, minStartCount: 50, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
jb := &JitterBuffer{
state: Buffering,
stats: Stats{0, 0, 0},
minStartCount: 50,
packets: NewQueue(),
listeners: make(map[Event][]EventListener),
}

for _, o := range opts {
o(jb)
}

return jb
}

Expand All @@ -113,6 +121,23 @@ func (jb *JitterBuffer) Listen(event Event, cb EventListener) {
jb.listeners[event] = append(jb.listeners[event], cb)
}

// PlayoutHead returns the SequenceNumber that will be attempted to Pop next
func (jb *JitterBuffer) PlayoutHead() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()

return jb.playoutHead
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// If you have encountered a packet that hasn't resolved you can skip it
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
jb.mutex.Lock()
defer jb.mutex.Unlock()

jb.playoutHead = playoutHead
}

func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// If we have at least one packet, and the next packet being pushed in is not
// at the expected sequence number increment the out of order count
Expand Down
48 changes: 48 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func TestJitterBuffer(t *testing.T) {
assert := assert.New(t)

t.Run("Appends packets in order", func(*testing.T) {
jb := New()
assert.Equal(jb.lastSequence, uint16(0))
Expand All @@ -29,6 +30,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
})

t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -53,6 +55,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})

t.Run("Wraps playout correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -75,6 +78,7 @@ func TestJitterBuffer(t *testing.T) {
}
}
})

t.Run("Pops at timestamp correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
Expand All @@ -94,6 +98,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("Can peek at a packet", func(*testing.T) {
jb := New()
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
Expand All @@ -110,6 +115,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(pkt.SequenceNumber, uint16(5000))
assert.Equal(err, nil)
})

t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -124,6 +130,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)
})

t.Run("Pops at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -145,6 +152,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("Peeks at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
Expand All @@ -166,4 +174,44 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})

t.Run("SetPlayoutHead", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))

// Push packets 0-9, but no packet 4
for i := uint16(0); i < 10; i++ {
if i == 4 {
continue
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: i, Timestamp: uint32(512 + i)}, Payload: []byte{0x00}})
}

// The first 3 packets will be able to popped
for i := 0; i < 4; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}

// The next pop will fail because of gap
pkt, err := jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))

// Assert that PlayoutHead isn't modified with pushing/popping again
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 10, Timestamp: uint32(522)}, Payload: []byte{0x00}})
pkt, err = jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))

// Increment the PlayoutHead and popping will work again
jb.SetPlayoutHead(jb.PlayoutHead() + 1)
for i := 0; i < 6; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}
})
}
Loading