diff --git a/internal/rtpbuffer/packet_factory.go b/internal/rtpbuffer/packet_factory.go index 4ab07fb..f6f592a 100644 --- a/internal/rtpbuffer/packet_factory.go +++ b/internal/rtpbuffer/packet_factory.go @@ -42,6 +42,8 @@ func NewPacketFactoryCopy() *PacketFactoryCopy { } } +const maxPayloadLen = 1460 + // NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) { if len(payload) > maxPayloadLen { @@ -123,3 +125,14 @@ func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) { // no-op } + +// NewRetainablePacketFromRTPPacket creates a RetainablePacket that embeds a RTP Packet directly +func NewRetainablePacketFromRTPPacket(pkt *rtp.Packet) *RetainablePacket { + return &RetainablePacket{ + onRelease: func(*rtp.Header, *[]byte) {}, + count: 1, + packet: pkt, + header: &pkt.Header, + sequenceNumber: pkt.Header.SequenceNumber, + } +} diff --git a/internal/rtpbuffer/retainable_packet.go b/internal/rtpbuffer/retainable_packet.go index 1425050..6993bbc 100644 --- a/internal/rtpbuffer/retainable_packet.go +++ b/internal/rtpbuffer/retainable_packet.go @@ -16,6 +16,7 @@ type RetainablePacket struct { countMu sync.Mutex count int + packet *rtp.Packet header *rtp.Header buffer *[]byte payload []byte @@ -35,10 +36,7 @@ func (p *RetainablePacket) Payload() []byte { // Packet returns a RTP Packet for a RetainablePacket func (p *RetainablePacket) Packet() *rtp.Packet { - return &rtp.Packet{ - Header: *p.Header(), - Payload: p.Payload(), - } + return p.packet } // Retain increases the reference count of the RetainablePacket diff --git a/internal/rtpbuffer/rtpbuffer.go b/internal/rtpbuffer/rtpbuffer.go index 8d5947e..d668ba2 100644 --- a/internal/rtpbuffer/rtpbuffer.go +++ b/internal/rtpbuffer/rtpbuffer.go @@ -8,31 +8,22 @@ import ( "fmt" ) -const ( - // Uint16SizeHalf is half of a math.Uint16 - Uint16SizeHalf = 1 << 15 - - maxPayloadLen = 1460 -) - // RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory type RTPBuffer struct { packets []*RetainablePacket size uint16 lastAdded uint16 - started bool } // NewRTPBuffer constructs a new RTPBuffer func NewRTPBuffer(size uint16) (*RTPBuffer, error) { - allowedSizes := make([]uint16, 0) + allowedSizes := []uint16{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65535} correctSize := false - for i := 0; i < 16; i++ { - if size == 1<= Uint16SizeHalf { - return nil - } - - if diff >= r.size { - return nil - } - pkt := r.packets[seq%r.size] if pkt != nil { if pkt.sequenceNumber != seq { @@ -130,7 +91,6 @@ func (r *RTPBuffer) Length() (length uint16) { // Clear erases all the packets in the RTPBuffer func (r *RTPBuffer) Clear() { - r.lastAdded = 0 - r.started = false r.packets = make([]*RetainablePacket, r.size) + r.lastAdded = 0 } diff --git a/internal/rtpbuffer/rtpbuffer_test.go b/internal/rtpbuffer/rtpbuffer_test.go index 42828b3..095b0bb 100644 --- a/internal/rtpbuffer/rtpbuffer_test.go +++ b/internal/rtpbuffer/rtpbuffer_test.go @@ -62,11 +62,11 @@ func TestRTPBuffer(t *testing.T) { add(10) assertGet(10) - assertNOTGet(1, 2, 9) + assertNOTGet(2) add(22) assertGet(22) - assertNOTGet(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21) + assertNOTGet(6) } } diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index 211df03..6994f7a 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -69,7 +69,6 @@ type ( // provided timestamp type JitterBuffer struct { packets *rtpbuffer.RTPBuffer - packetFactory rtpbuffer.PacketFactoryNoOp minStartCount uint16 lastSequence uint16 playoutHead uint16 @@ -94,7 +93,7 @@ type Stats struct { // New will initialize a jitter buffer and its associated statistics func New(opts ...Option) *JitterBuffer { - rtpBuffer, err := rtpbuffer.NewRTPBuffer(rtpbuffer.Uint16SizeHalf) + rtpBuffer, err := rtpbuffer.NewRTPBuffer(65535) if err != nil || rtpBuffer == nil { return nil } @@ -153,21 +152,20 @@ func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) { // the data so if the memory is expected to be reused, the caller should // take this in to account and pass a copy of the packet they wish to buffer func (jb *JitterBuffer) Push(packet *rtp.Packet) { - if packetsLen := jb.packets.Length(); packetsLen == 0 { - if !jb.playoutReady { - jb.playoutHead = packet.SequenceNumber - } - - jb.emit(StartBuffering) - } else if packetsLen > 100 { - jb.stats.overflowCount++ - jb.emit(BufferOverflow) - } - - jb.updateStats(packet.SequenceNumber) - retainablePkt, _ := jb.packetFactory.NewPacket(&packet.Header, packet.Payload, 0, 0) - jb.packets.Add(retainablePkt) - jb.updateState() + // if packetsLen := jb.packets.Length(); packetsLen == 0 { + // if !jb.playoutReady { + // jb.playoutHead = packet.SequenceNumber + // } + + // jb.emit(StartBuffering) + // } else if packetsLen > 100 { + // jb.stats.overflowCount++ + // jb.emit(BufferOverflow) + // } + + //jb.updateStats(packet.SequenceNumber) + jb.packets.Add(rtpbuffer.NewRetainablePacketFromRTPPacket(packet)) + //jb.updateState() } func (jb *JitterBuffer) emit(event Event) { @@ -209,19 +207,19 @@ func (jb *JitterBuffer) Pop() (*rtp.Packet, error) { // PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) { - if jb.state != Emitting { - return nil, ErrPopWhileBuffering - } + // if jb.state != Emitting { + // return nil, ErrPopWhileBuffering + // } retainablePacket := jb.packets.Get(sq) if retainablePacket == nil { - jb.stats.underflowCount++ - jb.emit(BufferUnderflow) + // jb.stats.underflowCount++ + //jb.emit(BufferUnderflow) return nil, ErrNotFound } defer retainablePacket.Release(true) - jb.playoutHead = (jb.playoutHead + 1) - jb.updateState() + // jb.playoutHead = (jb.playoutHead + 1) + // jb.updateState() return retainablePacket.Packet(), nil } diff --git a/pkg/nack/receive_log.go b/pkg/nack/receive_log.go index 313133e..09215b7 100644 --- a/pkg/nack/receive_log.go +++ b/pkg/nack/receive_log.go @@ -6,10 +6,10 @@ package nack import ( "fmt" "sync" - - "github.com/pion/interceptor/internal/rtpbuffer" ) +const uint16SizeHalf = 1 << 15 + type receiveLog struct { packets []uint64 size uint16 @@ -56,7 +56,7 @@ func (s *receiveLog) add(seq uint16) { switch { case diff == 0: return - case diff < rtpbuffer.Uint16SizeHalf: + case diff < uint16SizeHalf: // this means a positive diff, in other words seq > end (with counting for rollovers) for i := s.end + 1; i != seq; i++ { // clear packets between end and seq (these may contain packets from a "size" ago) @@ -84,7 +84,7 @@ func (s *receiveLog) get(seq uint16) bool { defer s.m.RUnlock() diff := s.end - seq - if diff >= rtpbuffer.Uint16SizeHalf { + if diff >= uint16SizeHalf { return false } @@ -100,7 +100,7 @@ func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 { defer s.m.RUnlock() until := s.end - skipLastN - if until-s.lastConsecutive >= rtpbuffer.Uint16SizeHalf { + if until-s.lastConsecutive >= uint16SizeHalf { // until < s.lastConsecutive (counting for rollover) return nil }