Skip to content

Commit

Permalink
JitterBuffer: Fix queue not properly decrementing
Browse files Browse the repository at this point in the history
In some cases the Priority Queue would not properly
decrement packet count even though it did remove a
packet from the queue.
  • Loading branch information
thatsnotright committed Apr 21, 2024
1 parent 1449b4f commit 7a6b1a3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func TestJitterBuffer(t *testing.T) {
})
t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))
events := make([]Event, 0)
jb.Listen(BeginPlayback, func(event Event, jb *JitterBuffer) {

Check failure on line 49 in pkg/jitterbuffer/jitter_buffer_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'event' seems to be unused, consider removing or renaming it as _ (revive)
events = append(events, BeginPlayback)
})
for i := 0; i < 2; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
Expand All @@ -54,6 +58,8 @@ func TestJitterBuffer(t *testing.T) {
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
assert.Equal(1, len(events))
assert.Equal(Event(BeginPlayback), events[0])
})

t.Run("Wraps playout correctly", func(*testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/jitterbuffer/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
if q.next.priority == sqNum {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
Expand All @@ -138,6 +139,7 @@ func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
Expand All @@ -155,6 +157,7 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
if q.next.val.Timestamp == timestamp {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
Expand All @@ -166,6 +169,7 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
Expand Down
20 changes: 20 additions & 0 deletions pkg/jitterbuffer/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ func TestPriorityQueue(t *testing.T) {
assert.Equal(pkt.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})

t.Run("Updates the length when PopAt* are called", func(*testing.T) {
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q := NewQueue()
q.Push(pkt, pkt.SequenceNumber)
pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt2, pkt2.SequenceNumber)
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
assert.Equal(uint16(102), q.Length())
popped, _ := q.PopAt(uint16(5012))
assert.Equal(popped.SequenceNumber, uint16(5012))
assert.Equal(uint16(101), q.Length())

popped, err := q.PopAtTimestamp(uint32(500))
assert.Equal(popped.SequenceNumber, uint16(5000))
assert.Equal(uint16(100), q.Length())
assert.Equal(err, nil)
})
}

func TestPriorityQueue_Find(t *testing.T) {
Expand Down

0 comments on commit 7a6b1a3

Please sign in to comment.