Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JitterBuffer: Improve performance for SampleBuilder use #292

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
if !ok {
return 0, nil, io.EOF
}

marshaled, err := p.Marshal()
if err != nil {
return 0, nil, io.EOF
Expand Down
12 changes: 8 additions & 4 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
// order, and allows removing in either sequence number order or via a
// provided timestamp
type JitterBuffer struct {
packets *PriorityQueue
packets *RBTree
minStartCount uint16
lastSequence uint16
playoutHead uint16
Expand Down Expand Up @@ -128,6 +128,12 @@
return jb.playoutHead
}

func (jb *JitterBuffer) Length() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()
return jb.packets.Length()

Check warning on line 134 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L131-L134

Added lines #L131 - L134 were not covered by tests
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// If you have encountered a packet that hasn't resolved you can skip it
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
Expand Down Expand Up @@ -155,7 +161,7 @@
if jb.packets.Length() == 0 {
jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
if jb.packets.Length() > 2*jb.minStartCount {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
Expand Down Expand Up @@ -240,8 +246,6 @@
// PeekAtSequence will return an RTP packet from the jitter buffer at the specified Sequence
// without removing it from the buffer
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
packet, err := jb.packets.Find(sq)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(513))
assert.NotNil(head)
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(513))
Expand Down
7 changes: 7 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
return nil
}
}

func WithSkipMissingPackets() ReceiverInterceptorOption {
return func(d *ReceiverInterceptor) error {
d.skipMissingPackets = true
return nil
}
}
Loading
Loading