Skip to content

Commit

Permalink
JitterBuffer: Fix queue not properly decrementing
Browse files Browse the repository at this point in the history
In some cases the Priority Queue would not properly
decrement packet count even though it did remove a
packet from the queue.
  • Loading branch information
thatsnotright committed Apr 21, 2024
1 parent 1449b4f commit d29bd2f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func TestJitterBuffer(t *testing.T) {
})
t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))
events := make([]Event, 0)
jb.Listen(BeginPlayback, func(event Event, _ *JitterBuffer) {
events = append(events, event)
})
for i := 0; i < 2; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
Expand All @@ -54,6 +58,8 @@ func TestJitterBuffer(t *testing.T) {
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
assert.Equal(1, len(events))
assert.Equal(Event(BeginPlayback), events[0])
})

t.Run("Wraps playout correctly", func(*testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/jitterbuffer/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
if q.next.priority == sqNum {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
Expand All @@ -138,6 +139,7 @@ func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
Expand All @@ -155,6 +157,7 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
if q.next.val.Timestamp == timestamp {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
Expand All @@ -166,6 +169,7 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
Expand Down
20 changes: 20 additions & 0 deletions pkg/jitterbuffer/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ func TestPriorityQueue(t *testing.T) {
assert.Equal(pkt.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})

t.Run("Updates the length when PopAt* are called", func(*testing.T) {
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q := NewQueue()
q.Push(pkt, pkt.SequenceNumber)
pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt2, pkt2.SequenceNumber)
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
assert.Equal(uint16(102), q.Length())
popped, _ := q.PopAt(uint16(5012))
assert.Equal(popped.SequenceNumber, uint16(5012))
assert.Equal(uint16(101), q.Length())

popped, err := q.PopAtTimestamp(uint32(500))
assert.Equal(popped.SequenceNumber, uint16(5000))
assert.Equal(uint16(100), q.Length())
assert.Equal(err, nil)
})
}

func TestPriorityQueue_Find(t *testing.T) {
Expand Down

0 comments on commit d29bd2f

Please sign in to comment.