From 561796c5177c026924f7c6ac6c282794b8371a32 Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Mon, 2 Sep 2024 09:52:02 -0600 Subject: [PATCH] chore: Add test to JB receiver interceptor read Ensure reads only use length returned from upstream --- pkg/jitterbuffer/receiver_interceptor.go | 4 +- pkg/jitterbuffer/receiver_interceptor_test.go | 54 +++++++++++++++++-- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go index ea654216..404db219 100644 --- a/pkg/jitterbuffer/receiver_interceptor.go +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -64,8 +64,8 @@ func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, err return &InterceptorFactory{opts}, nil } -// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method -// will be called once per rtp packet. +// BindRemoteStream lets you modify any incoming RTP packets. It is called once per +// RemoteStream. The returned method will be called once per rtp packet. func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { buf := make([]byte, len(b)) diff --git a/pkg/jitterbuffer/receiver_interceptor_test.go b/pkg/jitterbuffer/receiver_interceptor_test.go index 58685966..06145d38 100644 --- a/pkg/jitterbuffer/receiver_interceptor_test.go +++ b/pkg/jitterbuffer/receiver_interceptor_test.go @@ -5,6 +5,7 @@ package jitterbuffer import ( "bytes" + "errors" "testing" "time" @@ -17,8 +18,6 @@ import ( ) func TestBufferStart(t *testing.T) { - buf := bytes.Buffer{} - factory, err := NewInterceptor( Log(logging.NewDefaultLoggerFactory().NewLogger("test")), ) @@ -27,8 +26,6 @@ func TestBufferStart(t *testing.T) { i, err := factory.NewInterceptor("") assert.NoError(t, err) - assert.Zero(t, buf.Len()) - stream := test.NewMockStream(&interceptor.StreamInfo{ SSRC: 123456, ClockRate: 90000, @@ -55,7 +52,6 @@ func TestBufferStart(t *testing.T) { } err = i.Close() assert.NoError(t, err) - assert.Zero(t, buf.Len()) } func TestReceiverBuffersAndPlaysout(t *testing.T) { @@ -96,3 +92,51 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) { err = i.Close() assert.NoError(t, err) } + +type MockRTPReader struct { + readFunc func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error) +} + +func (m *MockRTPReader) Read(data []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { + if m.readFunc != nil { + return m.readFunc(data, attrs) + } + return 0, nil, errors.New("mock function not implemented") +} + +func NewMockRTPReader(readFunc func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error)) *MockRTPReader { + return &MockRTPReader{ + readFunc: readFunc, + } +} + +func TestReceiverInterceptorHonorsBufferLength(t *testing.T) { + buf := []byte{0x80, 0x88, 0xe6, 0xfd, 0x01, 0x01, 0x01, 0x01, 0x01, + 0xde, 0xad, 0xbe, 0xef, 0x01, 0x01, 0x01, 0x01, 0x01} + readBuf := make([]byte, 2048) + copy(readBuf[0:], buf) + copy(readBuf[17:], buf) + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + + rtpReadFn := NewMockRTPReader(func(data []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { + copy(data, readBuf) + return 7, attrs, nil + }) + reader := i.BindRemoteStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, rtpReadFn) + + bufLen, _, err := reader.Read(readBuf, interceptor.Attributes{}) + assert.Contains(t, err.Error(), "7 < 12") + assert.Equal(t, 0, bufLen) + + err = i.Close() + assert.NoError(t, err) + +}