diff --git a/attributes.go b/attributes.go index d7936d52..8b6d0f5c 100644 --- a/attributes.go +++ b/attributes.go @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) { } // GetRTPHeader gets the RTP header if present. If it is not present, it will be -// unmarshalled from the raw byte slice and stored in the attribtues. +// unmarshalled from the raw byte slice and stored in the attributes. func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { if val, ok := a[rtpHeaderKey]; ok { if header, ok := val.(*rtp.Header); ok { @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { } // GetRTCPPackets gets the RTCP packets if present. If the packet slice is not -// present, it will be unmarshaled from the raw byte slice and stored in the +// present, it will be unmarshalled from the raw byte slice and stored in the // attributes. func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) { if val, ok := a[rtcpPacketsKey]; ok { diff --git a/internal/test/mock_stream.go b/internal/test/mock_stream.go index 1b8641b4..bf96e31b 100644 --- a/internal/test/mock_stream.go +++ b/internal/test/mock_stream.go @@ -129,6 +129,9 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc for { i, _, err := s.rtpReader.Read(buf, interceptor.Attributes{}) if err != nil { + if err.Error() == "attempt to pop while buffering" { + continue + } if errors.Is(err, io.EOF) { s.rtpInModified <- RTPWithError{Err: err} } @@ -160,12 +163,12 @@ func (s *MockStream) WriteRTP(p *rtp.Packet) error { return err } -// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream +// ReceiveRTCP schedules a new rtcp batch, so it can be read by the stream func (s *MockStream) ReceiveRTCP(pkts []rtcp.Packet) { s.rtcpIn <- pkts } -// ReceiveRTP schedules a rtp packet, so it can be read be the stream +// ReceiveRTP schedules a rtp packet, so it can be read by the stream func (s *MockStream) ReceiveRTP(packet *rtp.Packet) { s.rtpIn <- packet } diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index f2f60919..976ea763 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -267,3 +267,16 @@ func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { jb.updateState() return packet, nil } + +// Clear will empty the buffer and optionally reset the state +func (jb *JitterBuffer) Clear(resetState bool) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + jb.packets.Clear() + if resetState { + jb.lastSequence = 0 + jb.state = Buffering + jb.stats = Stats{0, 0, 0} + jb.minStartCount = 50 + } +} diff --git a/pkg/jitterbuffer/jitter_buffer_test.go b/pkg/jitterbuffer/jitter_buffer_test.go index 0ed73023..52e4e7c1 100644 --- a/pkg/jitterbuffer/jitter_buffer_test.go +++ b/pkg/jitterbuffer/jitter_buffer_test.go @@ -25,7 +25,6 @@ func TestJitterBuffer(t *testing.T) { jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) - assert.Equal(jb.lastSequence, uint16(5012)) assert.Equal(jb.stats.outOfOrderCount, uint32(1)) assert.Equal(jb.packets.Length(), uint16(4)) assert.Equal(jb.lastSequence, uint16(5012)) @@ -214,4 +213,20 @@ func TestJitterBuffer(t *testing.T) { assert.NotNil(pkt) } }) + + t.Run("Allows clearing the buffer", func(*testing.T) { + jb := New() + jb.Clear(false) + + assert.Equal(jb.lastSequence, uint16(0)) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) + + assert.Equal(jb.lastSequence, uint16(5002)) + jb.Clear(true) + assert.Equal(jb.lastSequence, uint16(0)) + assert.Equal(jb.stats.outOfOrderCount, uint32(0)) + assert.Equal(jb.packets.Length(), uint16(0)) + }) } diff --git a/pkg/jitterbuffer/option.go b/pkg/jitterbuffer/option.go new file mode 100644 index 00000000..9a33c22e --- /dev/null +++ b/pkg/jitterbuffer/option.go @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "github.com/pion/logging" +) + +// ReceiverInterceptorOption can be used to configure ReceiverInterceptor +type ReceiverInterceptorOption func(d *ReceiverInterceptor) error + +// Log sets a logger for the interceptor +func Log(log logging.LeveledLogger) ReceiverInterceptorOption { + return func(d *ReceiverInterceptor) error { + d.log = log + return nil + } +} diff --git a/pkg/jitterbuffer/priority_queue.go b/pkg/jitterbuffer/priority_queue.go index 366ff10a..7454b521 100644 --- a/pkg/jitterbuffer/priority_queue.go +++ b/pkg/jitterbuffer/priority_queue.go @@ -173,3 +173,13 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) { } return nil, ErrNotFound } + +// Clear will empty a PriorityQueue +func (q *PriorityQueue) Clear() { + next := q.next + q.length = 0 + for next != nil { + next.prev = nil + next = next.next + } +} diff --git a/pkg/jitterbuffer/priority_queue_test.go b/pkg/jitterbuffer/priority_queue_test.go index 44d0e010..f7d08a74 100644 --- a/pkg/jitterbuffer/priority_queue_test.go +++ b/pkg/jitterbuffer/priority_queue_test.go @@ -101,3 +101,18 @@ func TestPriorityQueue_Find(t *testing.T) { _, err = packets.Find(1001) assert.Error(t, err) } + +func TestPriorityQueue_Clean(t *testing.T) { + packets := NewQueue() + packets.Clear() + packets.Push(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 1000, + Timestamp: 5, + SSRC: 5, + }, + Payload: []uint8{0xA}, + }, 1000) + assert.EqualValues(t, 1, packets.Length()) + packets.Clear() +} diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go new file mode 100644 index 00000000..b4c032b9 --- /dev/null +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -0,0 +1,110 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtp" +) + +// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor +type InterceptorFactory struct { + opts []ReceiverInterceptorOption +} + +// NewInterceptor constructs a new ReceiverInterceptor +func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { + i := &ReceiverInterceptor{ + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"), + buffer: New(), + } + + for _, opt := range g.opts { + if err := opt(i); err != nil { + return nil, err + } + } + + return i, nil +} + +// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival +// and allow for network jitter +// +// The Interceptor is designed to fit in a RemoteStream +// pipeline and buffer incoming packets for a short period (currently +// defaulting to 50 packets) before emitting packets to be consumed by the +// next step in the pipeline. +// +// The caller must ensure they are prepared to handle an +// ErrPopWhileBuffering in the case that insufficient packets have been +// received by the jitter buffer. The caller should retry the operation +// at some point later as the buffer may have been filled in the interim. +// +// The caller should also be aware that an ErrBufferUnderrun may be +// returned in the case that the initial buffering was sufficient and +// playback began but the caller is consuming packets (or they are not +// arriving) quickly enough. +type ReceiverInterceptor struct { + interceptor.NoOp + buffer *JitterBuffer + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger +} + +// NewInterceptor returns a new InterceptorFactory +func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) { + return &InterceptorFactory{opts}, nil +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + buf := make([]byte, len(b)) + n, attr, err := reader.Read(buf, a) + if err != nil { + return n, attr, err + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(buf); err != nil { + return 0, nil, err + } + i.m.Lock() + defer i.m.Unlock() + i.buffer.Push(packet) + if i.buffer.state == Emitting { + newPkt, err := i.buffer.Pop() + if err != nil { + return 0, nil, err + } + nlen, err := newPkt.MarshalTo(b) + return nlen, attr, err + } + return n, attr, ErrPopWhileBuffering + }) +} + +// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) { + defer i.wg.Wait() + i.m.Lock() + defer i.m.Unlock() + i.buffer.Clear(true) +} + +// Close closes the interceptor +func (i *ReceiverInterceptor) Close() error { + defer i.wg.Wait() + i.m.Lock() + defer i.m.Unlock() + i.buffer.Clear(true) + return nil +} diff --git a/pkg/jitterbuffer/receiver_interceptor_test.go b/pkg/jitterbuffer/receiver_interceptor_test.go new file mode 100644 index 00000000..58685966 --- /dev/null +++ b/pkg/jitterbuffer/receiver_interceptor_test.go @@ -0,0 +1,98 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "bytes" + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestBufferStart(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.Zero(t, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 456, + }}) + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(0), + }}) + + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + select { + case pkt := <-stream.ReadRTP(): + assert.EqualValues(t, nil, pkt) + default: + // No data ready to read, this is what we expect + } + err = i.Close() + assert.NoError(t, err) + assert.Zero(t, buf.Len()) +} + +func TestReceiverBuffersAndPlaysout(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.EqualValues(t, 0, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + + stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 456, + }}) + for s := 0; s < 61; s++ { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(s), + }}) + } + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + for s := 0; s < 10; s++ { + read := <-stream.ReadRTP() + seq := read.Packet.Header.SequenceNumber + assert.EqualValues(t, uint16(s), seq) + } + assert.NoError(t, stream.Close()) + err = i.Close() + assert.NoError(t, err) +}