From d9d223e1e2ff305334b50769ba5b7bba9bb95905 Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Tue, 23 Jan 2024 19:42:58 -0400 Subject: [PATCH] Start pkg/jitterbuffer A Priority Queue JitterBuffer that will be used to back the SampleBuilder and a new Interceptor --- pkg/jitterbuffer/jitter_buffer.go | 205 ++++++++++++++++++++++++ pkg/jitterbuffer/jitter_buffer_test.go | 123 ++++++++++++++ pkg/jitterbuffer/priority_queue.go | 181 +++++++++++++++++++++ pkg/jitterbuffer/priority_queue_test.go | 79 +++++++++ 4 files changed, 588 insertions(+) create mode 100644 pkg/jitterbuffer/jitter_buffer.go create mode 100644 pkg/jitterbuffer/jitter_buffer_test.go create mode 100644 pkg/jitterbuffer/priority_queue.go create mode 100644 pkg/jitterbuffer/priority_queue_test.go diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go new file mode 100644 index 00000000..b66d5014 --- /dev/null +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -0,0 +1,205 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// Package jitterbuffer implements a buffer for RTP packets designed to help +// counteract non-deterministic sources of latency +package jitterbuffer + +import ( + "errors" + "math" + "sync" + + "github.com/pion/rtp" +) + +// State tracks a JitterBuffer as either Buffering or Emitting +type State uint16 + +// Event represents all events a JitterBuffer can emit +type Event string + +var ( + // ErrBufferUnderrun is returned when the buffer has no items + ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer") + // ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state + ErrPopWhileBuffering = errors.New("attempt to pop while buffering") +) + +const ( + // Buffering is the state when the jitter buffer has not started emitting yet, or has hit an underflow and needs to re-buffer packets + Buffering State = iota + // Emitting is the state when the jitter buffer is operating nominally + Emitting +) + +const ( + // StartBuffering is emitted when the buffer receives its first packet + StartBuffering Event = "startBuffering" + // BeginPlayback is emitted when the buffer has satisfied its buffer length + BeginPlayback = "playing" + // BufferUnderflow is emitted when the buffer does not have enough packets to Pop + BufferUnderflow = "underflow" + // BufferOverflow is emitted when the buffer has exceeded its limit + BufferOverflow = "overflow" +) + +func (jbs State) String() string { + switch jbs { + case Buffering: + return "Buffering" + case Emitting: + return "Emitting" + } + return "unknown" +} + +type ( + // Option will Override JitterBuffer's defaults + Option func(jb *JitterBuffer) + // EventListener will be called when the corresponding Event occurs + EventListener func(event Event, jb *JitterBuffer) +) + +// A JitterBuffer will accept Pushed packets, put them in sequence number +// order, and allows removing in either sequence number order or via a +// provided timestamp +type JitterBuffer struct { + packets *PriorityQueue + lastSequence uint16 + playoutHead uint16 + playoutReady bool + state State + stats Stats + listeners map[Event][]EventListener + mutex sync.Mutex +} + +// Stats Track interesting statistics for the life of this JitterBuffer +// outOfOrderCount will provide the number of times a packet was Pushed +// +// without its predecessor being present +// +// underflowCount will provide the count of attempts to Pop an empty buffer +// overflowCount will track the number of times the jitter buffer exceeds its limit +type Stats struct { + outOfOrderCount uint32 + underflowCount uint32 + overflowCount uint32 +} + +// New will initialize a jitter buffer and its associated statistics +func New(opts ...Option) *JitterBuffer { + jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, packets: NewQueue(), listeners: make(map[Event][]EventListener)} + for _, o := range opts { + o(jb) + } + return jb +} + +// Listen will register an event listener +// The jitter buffer may emit events correspnding, interested listerns should +// look at Event for available events +func (jb *JitterBuffer) Listen(event Event, cb EventListener) { + jb.listeners[event] = append(jb.listeners[event], cb) +} + +func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) { + // If we have at least one packet, and the next packet being pushed in is not + // at the expected sequence number increment the out of order count + if jb.packets.Length() > 0 && lastPktSeqNo != ((jb.lastSequence+1)%math.MaxUint16) { + jb.stats.outOfOrderCount++ + } + jb.lastSequence = lastPktSeqNo +} + +// Push an RTP packet into the jitter buffer, this does not clone +// the data so if the memory is expected to be reused, the caller should +// take this in to account and pass a copy of the packet they wish to buffer +func (jb *JitterBuffer) Push(packet *rtp.Packet) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.packets.Length() == 0 { + jb.emit(StartBuffering) + } + if jb.packets.Length() > 100 { + jb.stats.overflowCount++ + jb.emit(BufferOverflow) + } + if !jb.playoutReady && jb.packets.Length() == 0 { + jb.playoutHead = packet.SequenceNumber + } + jb.updateStats(packet.SequenceNumber) + jb.packets.Push(packet, packet.SequenceNumber) + jb.updateState() +} + +func (jb *JitterBuffer) emit(event Event) { + for _, l := range jb.listeners[event] { + l(event, jb) + } +} + +func (jb *JitterBuffer) updateState() { + // For now, we only look at the number of packets captured in the play buffer + if jb.packets.Length() >= 50 && jb.state == Buffering { + jb.state = Emitting + jb.playoutReady = true + jb.emit(BeginPlayback) + } +} + +// Peek at the packet which is either: +// +// At the playout head when we are emitting, and the playoutHead flag is true +// +// or else +// +// At the last sequence received +func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.packets.Length() < 1 { + return nil, ErrBufferUnderrun + } + if playoutHead && jb.state == Emitting { + return jb.packets.Find(jb.playoutHead) + } + return jb.packets.Find(jb.lastSequence) +} + +// Pop an RTP packet from the jitter buffer at the current playout head +func (jb *JitterBuffer) Pop() (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.state != Emitting { + return nil, ErrPopWhileBuffering + } + packet, err := jb.packets.PopAt(jb.playoutHead) + if err != nil { + jb.stats.underflowCount++ + jb.emit(BufferUnderflow) + return (*rtp.Packet)(nil), err + } + jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16 + jb.updateState() + return packet, nil +} + +// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp +// Call this method repeatedly to drain the buffer at the timestamp +func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.state != Emitting { + return nil, ErrPopWhileBuffering + } + packet, err := jb.packets.PopAtTimestamp(ts) + if err != nil { + jb.stats.underflowCount++ + jb.emit(BufferUnderflow) + return (*rtp.Packet)(nil), err + } + jb.updateState() + return packet, nil +} diff --git a/pkg/jitterbuffer/jitter_buffer_test.go b/pkg/jitterbuffer/jitter_buffer_test.go new file mode 100644 index 00000000..ff97d2ca --- /dev/null +++ b/pkg/jitterbuffer/jitter_buffer_test.go @@ -0,0 +1,123 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "math" + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestJitterBuffer(t *testing.T) { + assert := assert.New(t) + t.Run("Appends packets in order", func(t *testing.T) { + jb := New() + assert.Equal(jb.lastSequence, uint16(0)) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) + + assert.Equal(jb.lastSequence, uint16(5002)) + + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) + + assert.Equal(jb.lastSequence, uint16(5012)) + assert.Equal(jb.stats.outOfOrderCount, uint32(1)) + assert.Equal(jb.packets.Length(), uint16(4)) + assert.Equal(jb.lastSequence, uint16(5012)) + }) + + t.Run("Appends packets and begins playout", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + assert.Equal(jb.playoutHead, uint16(5012)) + head, err := jb.Pop() + assert.Equal(head.SequenceNumber, uint16(5012)) + assert.Equal(err, nil) + }) + t.Run("Wraps playout correctly", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + assert.Equal(jb.playoutHead, uint16(math.MaxUint16-32)) + head, err := jb.Pop() + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) + assert.Equal(err, nil) + for i := 0; i < 100; i++ { + head, err := jb.Pop() + if i < 99 { + assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16)) + assert.Equal(err, nil) + } else { + assert.Equal(head, (*rtp.Packet)(nil)) + } + } + }) + t.Run("Pops at timestamp correctly", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + head, err := jb.PopAtTimestamp(uint32(513)) + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1)) + assert.Equal(err, nil) + head, err = jb.PopAtTimestamp(uint32(513)) + assert.Equal(head, (*rtp.Packet)(nil)) + assert.NotEqual(err, nil) + + head, err = jb.Pop() + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) + assert.Equal(err, nil) + }) + t.Run("Can peek at a packet", func(t *testing.T) { + jb := New() + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) + pkt, err := jb.Peek(false) + assert.Equal(pkt.SequenceNumber, uint16(5002)) + assert.Equal(err, nil) + for i := 0; i < 100; i++ { + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + pkt, err = jb.Peek(true) + assert.Equal(pkt.SequenceNumber, uint16(5000)) + assert.Equal(err, nil) + }) + t.Run("Pops at timestamp with multiple packets", func(t *testing.T) { + jb := New() + for i := 0; i < 50; i++ { + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) + assert.Equal(jb.packets.Length(), uint16(52)) + assert.Equal(jb.state, Emitting) + head, err := jb.PopAtTimestamp(uint32(9000)) + assert.Equal(head.SequenceNumber, uint16(1019)) + assert.Equal(err, nil) + head, err = jb.PopAtTimestamp(uint32(9000)) + assert.Equal(head.SequenceNumber, uint16(1020)) + assert.Equal(err, nil) + + head, err = jb.Pop() + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) + assert.Equal(err, nil) + }) +} diff --git a/pkg/jitterbuffer/priority_queue.go b/pkg/jitterbuffer/priority_queue.go new file mode 100644 index 00000000..cecc59c1 --- /dev/null +++ b/pkg/jitterbuffer/priority_queue.go @@ -0,0 +1,181 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "errors" + + "github.com/pion/rtp" +) + +// PriorityQueue provides a linked list sorting of RTP packets by SequenceNumber +type PriorityQueue struct { + next *node + length uint16 +} + +type node struct { + val *rtp.Packet + next *node + prev *node + prio uint16 +} + +var ( + // ErrInvalidOperation may be returned if a Pop or Find operation is performed on an empty queue + ErrInvalidOperation = errors.New("attempt to find or pop on an empty list") + // ErrNotFound will be returned if the packet cannot be found in the queue + ErrNotFound = errors.New("priority not found") +) + +// NewQueue will create a new PriorityQueue whose order relies on monotonically +// increasing Sequence Number, wrapping at MaxUint16, so +// a packet with sequence number MaxUint16 - 1 will be after 0 +func NewQueue() *PriorityQueue { + return &PriorityQueue{ + next: nil, + length: 0, + } +} + +func newNode(val *rtp.Packet, prio uint16) *node { + return &node{ + val: val, + prev: nil, + next: nil, + prio: prio, + } +} + +// Find a packet in the queue with the provided sequence number, +// regardless of position (the packet is retained in the queue) +func (q *PriorityQueue) Find(sqNum uint16) (*rtp.Packet, error) { + if q.next.prio == sqNum { + return q.next.val, nil + } + + if sqNum < q.next.prio { + return nil, ErrInvalidOperation + } + next := q.next + for next != nil { + if next.prio == sqNum { + return next.val, nil + } + next = next.next + } + return nil, ErrNotFound +} + +// Push will insert a packet in to the queue in order of sequence number +func (q *PriorityQueue) Push(val *rtp.Packet, prio uint16) { + newPq := newNode(val, prio) + if q.next == nil { + q.next = newPq + q.length++ + return + } + if prio < q.next.prio { + newPq.next = q.next + q.next.prev = newPq + q.next = newPq + q.length++ + return + } + head := q.next + prev := q.next + for head != nil { + if prio <= head.prio { + break + } + prev = head + head = head.next + } + if head == nil { + if prev != nil { + prev.next = newPq + } + newPq.prev = prev + } else { + newPq.next = head + newPq.prev = prev + if prev != nil { + prev.next = newPq + } + head.prev = newPq + } + q.length++ +} + +// Length will get the total length of the queue +func (q *PriorityQueue) Length() uint16 { + return q.length +} + +// Pop removes the first element from the queue, regardless +// sequence number +func (q *PriorityQueue) Pop() (*rtp.Packet, error) { + if q.next == nil { + return nil, ErrInvalidOperation + } + val := q.next.val + q.length-- + q.next = q.next.next + return val, nil +} + +// PopAt removes an element at the specified sequence number (priority) +func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) { + if q.next == nil { + return nil, ErrInvalidOperation + } + if q.next.prio == sqNum { + val := q.next.val + q.next = q.next.next + return val, nil + } + pos := q.next + prev := q.next.prev + for pos != nil { + if pos.prio == sqNum { + val := pos.val + prev.next = pos.next + if prev.next != nil { + prev.next.prev = prev + } + return val, nil + } + prev = pos + pos = pos.next + } + return nil, ErrNotFound +} + +// PopAtTimestamp removes and returns a packet at the given RTP Timestamp, regardless +// sequence number order +func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) { + if q.next == nil { + return nil, ErrInvalidOperation + } + if q.next.val.Timestamp == timestamp { + val := q.next.val + q.next = q.next.next + return val, nil + } + pos := q.next + prev := q.next.prev + for pos != nil { + if pos.val.Timestamp == timestamp { + val := pos.val + prev.next = pos.next + if prev.next != nil { + prev.next.prev = prev + } + return val, nil + } + prev = pos + pos = pos.next + } + return nil, ErrNotFound +} diff --git a/pkg/jitterbuffer/priority_queue_test.go b/pkg/jitterbuffer/priority_queue_test.go new file mode 100644 index 00000000..89989a9c --- /dev/null +++ b/pkg/jitterbuffer/priority_queue_test.go @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestPriorityQueue(t *testing.T) { + assert := assert.New(t) + t.Run("Appends packets in order", func(t *testing.T) { + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q := NewQueue() + q.Push(pkt, pkt.SequenceNumber) + pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt2, pkt2.SequenceNumber) + assert.Equal(q.next.next.val, pkt2) + assert.Equal(q.next.prio, uint16(5000)) + assert.Equal(q.next.next.prio, uint16(5004)) + }) + t.Run("Appends many in order", func(t *testing.T) { + q := NewQueue() + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + assert.Equal(uint16(100), q.Length()) + last := (*node)(nil) + cur := q.next + for cur != nil { + last = cur + cur = cur.next + if cur != nil { + assert.Equal(cur.prio, last.prio+1) + } + } + assert.Equal(q.next.prio, uint16(5012)) + assert.Equal(last.prio, uint16(5012+99)) + }) + t.Run("Can remove an element", func(t *testing.T) { + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q := NewQueue() + q.Push(pkt, pkt.SequenceNumber) + pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt2, pkt2.SequenceNumber) + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + popped, _ := q.Pop() + assert.Equal(popped.SequenceNumber, uint16(5000)) + _, _ = q.Pop() + nextPop, _ := q.Pop() + assert.Equal(nextPop.SequenceNumber, uint16(5012)) + }) + t.Run("Appends in order", func(t *testing.T) { + q := NewQueue() + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + assert.Equal(uint16(100), q.Length()) + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt, pkt.SequenceNumber) + assert.Equal(pkt, q.next.val) + assert.Equal(uint16(101), q.Length()) + assert.Equal(q.next.prio, uint16(5000)) + }) + t.Run("Can find", func(t *testing.T) { + q := NewQueue() + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + pkt, err := q.Find(5012) + assert.Equal(pkt.SequenceNumber, uint16(5012)) + assert.Equal(err, nil) + }) +}