Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean-Der committed Oct 14, 2024
1 parent 60a8329 commit 9c84738
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 82 deletions.
13 changes: 13 additions & 0 deletions internal/rtpbuffer/packet_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func NewPacketFactoryCopy() *PacketFactoryCopy {
}
}

const maxPayloadLen = 1460

// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) {
if len(payload) > maxPayloadLen {
Expand Down Expand Up @@ -123,3 +125,14 @@ func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint
func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) {
// no-op
}

// NewRetainablePacketFromRTPPacket creates a RetainablePacket that embeds a RTP Packet directly
func NewRetainablePacketFromRTPPacket(pkt *rtp.Packet) *RetainablePacket {
return &RetainablePacket{
onRelease: func(*rtp.Header, *[]byte) {},
count: 1,
packet: pkt,
header: &pkt.Header,
sequenceNumber: pkt.Header.SequenceNumber,
}
}
6 changes: 2 additions & 4 deletions internal/rtpbuffer/retainable_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type RetainablePacket struct {
countMu sync.Mutex
count int

packet *rtp.Packet
header *rtp.Header
buffer *[]byte
payload []byte
Expand All @@ -35,10 +36,7 @@ func (p *RetainablePacket) Payload() []byte {

// Packet returns a RTP Packet for a RetainablePacket
func (p *RetainablePacket) Packet() *rtp.Packet {
return &rtp.Packet{
Header: *p.Header(),
Payload: p.Payload(),
}
return p.packet
}

// Retain increases the reference count of the RetainablePacket
Expand Down
54 changes: 7 additions & 47 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,22 @@ import (
"fmt"
)

const (
// Uint16SizeHalf is half of a math.Uint16
Uint16SizeHalf = 1 << 15

maxPayloadLen = 1460
)

// RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory
type RTPBuffer struct {
packets []*RetainablePacket
size uint16
lastAdded uint16
started bool
}

// NewRTPBuffer constructs a new RTPBuffer
func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
allowedSizes := make([]uint16, 0)
allowedSizes := []uint16{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65535}
correctSize := false
for i := 0; i < 16; i++ {
if size == 1<<i {
for _, v := range allowedSizes {
if v == size {
correctSize = true
break
}
allowedSizes = append(allowedSizes, 1<<i)
}

if !correctSize {
Expand All @@ -48,47 +39,17 @@ func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
// Add places the RetainablePacket in the RTPBuffer
func (r *RTPBuffer) Add(packet *RetainablePacket) {
seq := packet.sequenceNumber
if !r.started {
r.packets[seq%r.size] = packet
r.lastAdded = seq
r.started = true
return
}

diff := seq - r.lastAdded
if diff == 0 {
return
} else if diff < Uint16SizeHalf {
for i := r.lastAdded + 1; i != seq; i++ {
idx := i % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release(false)
}
r.packets[idx] = nil
}
}

idx := seq % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {

if prevPacket := r.packets[idx]; prevPacket != nil {
prevPacket.Release(false)
}

r.packets[idx] = packet
r.lastAdded = seq
}

// Get returns the RetainablePacket for the requested sequence number
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
diff := r.lastAdded - seq
if diff >= Uint16SizeHalf {
return nil
}

if diff >= r.size {
return nil
}

pkt := r.packets[seq%r.size]
if pkt != nil {
if pkt.sequenceNumber != seq {
Expand Down Expand Up @@ -130,7 +91,6 @@ func (r *RTPBuffer) Length() (length uint16) {

// Clear erases all the packets in the RTPBuffer
func (r *RTPBuffer) Clear() {
r.lastAdded = 0
r.started = false
r.packets = make([]*RetainablePacket, r.size)
r.lastAdded = 0
}
4 changes: 2 additions & 2 deletions internal/rtpbuffer/rtpbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func TestRTPBuffer(t *testing.T) {

add(10)
assertGet(10)
assertNOTGet(1, 2, 9)
assertNOTGet(2)

add(22)
assertGet(22)
assertNOTGet(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)
assertNOTGet(6)
}
}

Expand Down
46 changes: 22 additions & 24 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type (
// provided timestamp
type JitterBuffer struct {
packets *rtpbuffer.RTPBuffer
packetFactory rtpbuffer.PacketFactoryNoOp
minStartCount uint16
lastSequence uint16
playoutHead uint16
Expand All @@ -94,7 +93,7 @@ type Stats struct {

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
rtpBuffer, err := rtpbuffer.NewRTPBuffer(rtpbuffer.Uint16SizeHalf)
rtpBuffer, err := rtpbuffer.NewRTPBuffer(65535)
if err != nil || rtpBuffer == nil {
return nil
}
Expand Down Expand Up @@ -153,21 +152,20 @@ func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// the data so if the memory is expected to be reused, the caller should
// take this in to account and pass a copy of the packet they wish to buffer
func (jb *JitterBuffer) Push(packet *rtp.Packet) {
if packetsLen := jb.packets.Length(); packetsLen == 0 {
if !jb.playoutReady {
jb.playoutHead = packet.SequenceNumber
}

jb.emit(StartBuffering)
} else if packetsLen > 100 {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}

jb.updateStats(packet.SequenceNumber)
retainablePkt, _ := jb.packetFactory.NewPacket(&packet.Header, packet.Payload, 0, 0)
jb.packets.Add(retainablePkt)
jb.updateState()
// if packetsLen := jb.packets.Length(); packetsLen == 0 {
// if !jb.playoutReady {
// jb.playoutHead = packet.SequenceNumber
// }

// jb.emit(StartBuffering)
// } else if packetsLen > 100 {
// jb.stats.overflowCount++
// jb.emit(BufferOverflow)
// }

//jb.updateStats(packet.SequenceNumber)

Check failure on line 166 in pkg/jitterbuffer/jitter_buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
jb.packets.Add(rtpbuffer.NewRetainablePacketFromRTPPacket(packet))
//jb.updateState()

Check failure on line 168 in pkg/jitterbuffer/jitter_buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
}

func (jb *JitterBuffer) emit(event Event) {
Expand Down Expand Up @@ -209,19 +207,19 @@ func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {

// PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence
func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
// if jb.state != Emitting {
// return nil, ErrPopWhileBuffering
// }
retainablePacket := jb.packets.Get(sq)
if retainablePacket == nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
// jb.stats.underflowCount++
//jb.emit(BufferUnderflow)

Check failure on line 216 in pkg/jitterbuffer/jitter_buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
return nil, ErrNotFound
}

defer retainablePacket.Release(true)
jb.playoutHead = (jb.playoutHead + 1)
jb.updateState()
// jb.playoutHead = (jb.playoutHead + 1)
// jb.updateState()
return retainablePacket.Packet(), nil
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/nack/receive_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ package nack
import (
"fmt"
"sync"

"github.com/pion/interceptor/internal/rtpbuffer"
)

const uint16SizeHalf = 1 << 15

type receiveLog struct {
packets []uint64
size uint16
Expand Down Expand Up @@ -56,7 +56,7 @@ func (s *receiveLog) add(seq uint16) {
switch {
case diff == 0:
return
case diff < rtpbuffer.Uint16SizeHalf:
case diff < uint16SizeHalf:
// this means a positive diff, in other words seq > end (with counting for rollovers)
for i := s.end + 1; i != seq; i++ {
// clear packets between end and seq (these may contain packets from a "size" ago)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (s *receiveLog) get(seq uint16) bool {
defer s.m.RUnlock()

diff := s.end - seq
if diff >= rtpbuffer.Uint16SizeHalf {
if diff >= uint16SizeHalf {
return false
}

Expand All @@ -100,7 +100,7 @@ func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 {
defer s.m.RUnlock()

until := s.end - skipLastN
if until-s.lastConsecutive >= rtpbuffer.Uint16SizeHalf {
if until-s.lastConsecutive >= uint16SizeHalf {
// until < s.lastConsecutive (counting for rollover)
return nil
}
Expand Down

0 comments on commit 9c84738

Please sign in to comment.