From 567f61b595e3eec1469d88d50a8275769e7df67d Mon Sep 17 00:00:00 2001 From: cptpcrd <31829097+cptpcrd@users.noreply.github.com> Date: Wed, 26 Jul 2023 14:33:00 -0400 Subject: [PATCH] Don't use out-of-order packets in sender reports If a sender report is generated immediately after sending an out-of-order packet (which can happen e.g. in an SFU when forwarding media), the timestamp from the last in-order packet should be extrapolated, since the departure time of an out-of-order packet is is not properly correlated with its RTP timestamp. Leave an option to re-enable the old behavior. --- pkg/report/sender_interceptor.go | 4 +- pkg/report/sender_interceptor_test.go | 124 ++++++++++++++++++++++++++ pkg/report/sender_option.go | 9 ++ pkg/report/sender_stream.go | 19 ++-- 4 files changed, 149 insertions(+), 7 deletions(-) diff --git a/pkg/report/sender_interceptor.go b/pkg/report/sender_interceptor.go index 508d7e9b..087838e7 100644 --- a/pkg/report/sender_interceptor.go +++ b/pkg/report/sender_interceptor.go @@ -59,6 +59,8 @@ type SenderInterceptor struct { wg sync.WaitGroup close chan struct{} started chan struct{} + + useLatestPacket bool } func (s *SenderInterceptor) isClosed() bool { @@ -133,7 +135,7 @@ func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { // BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method // will be called once per rtp packet. func (s *SenderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - stream := newSenderStream(info.SSRC, info.ClockRate) + stream := newSenderStream(info.SSRC, info.ClockRate, s.useLatestPacket) s.streams.Store(info.SSRC, stream) return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) { diff --git a/pkg/report/sender_interceptor_test.go b/pkg/report/sender_interceptor_test.go index cc034fba..930be6dd 100644 --- a/pkg/report/sender_interceptor_test.go +++ b/pkg/report/sender_interceptor_test.go @@ -92,6 +92,130 @@ func TestSenderInterceptor(t *testing.T) { }, sr) }) + t.Run("out of order RTP packets", func(t *testing.T) { + mt := &test.MockTime{} + f, err := NewSenderInterceptor( + SenderInterval(time.Millisecond*50), + SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + SenderNow(mt.Now), + ) + assert.NoError(t, err) + + i, err := f.NewInterceptor("") + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + // Write several packets + for i := 0; i < 10; i++ { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: uint16(i), + Timestamp: uint32(i), + }, + Payload: []byte("\x00\x00"), + })) + } + + // Skip a packet, then redeliver it out-of-order + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 12, + Timestamp: 12, + }, + Payload: []byte("\x00\x00"), + })) + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 11, + Timestamp: 11, + }, + Payload: []byte("\x00\x00"), + })) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok := pkts[0].(*rtcp.SenderReport) + assert.True(t, ok) + // The out-of-order packet is included in PacketCount and OctetCount, but the RTP + // timestamp of the last in-order packet is used for RTPTime + assert.Equal(t, &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntp.ToNTP(mt.Now()), + RTPTime: 12, + PacketCount: 12, + OctetCount: 24, + }, sr) + }) + + t.Run("out of order RTP packets with SenderUseLatestPacket", func(t *testing.T) { + mt := &test.MockTime{} + f, err := NewSenderInterceptor( + SenderInterval(time.Millisecond*50), + SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + SenderNow(mt.Now), + SenderUseLatestPacket(), + ) + assert.NoError(t, err) + + i, err := f.NewInterceptor("") + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + // Write several packets + for i := 0; i < 10; i++ { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: uint16(i), + Timestamp: uint32(i), + }, + Payload: []byte("\x00\x00"), + })) + } + + // Skip a packet, then redeliver it out-of-order + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 12, + Timestamp: 12, + }, + Payload: []byte("\x00\x00"), + })) + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 11, + Timestamp: 11, + }, + Payload: []byte("\x00\x00"), + })) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok := pkts[0].(*rtcp.SenderReport) + assert.True(t, ok) + // The out-of-order packet *is* used for RTPTime + assert.Equal(t, &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntp.ToNTP(mt.Now()), + RTPTime: 11, + PacketCount: 12, + OctetCount: 24, + }, sr) + }) + t.Run("inject ticker", func(t *testing.T) { mNow := &test.MockTime{} mTick := &test.MockTicker{ diff --git a/pkg/report/sender_option.go b/pkg/report/sender_option.go index 50abafbf..1e489b0b 100644 --- a/pkg/report/sender_option.go +++ b/pkg/report/sender_option.go @@ -44,6 +44,15 @@ func SenderTicker(f TickerFactory) SenderOption { } } +// SenderUseLatestPacket sets the interceptor to always use the latest packet, even +// if it appears to be out-of-order. +func SenderUseLatestPacket() SenderOption { + return func(r *SenderInterceptor) error { + r.useLatestPacket = true + return nil + } +} + // enableStartTracking is used by tests to synchronize whether the loop() has begun // and it's safe to start sending ticks to the ticker. func enableStartTracking(startedCh chan struct{}) SenderOption { diff --git a/pkg/report/sender_stream.go b/pkg/report/sender_stream.go index 29d14da4..6c708bda 100644 --- a/pkg/report/sender_stream.go +++ b/pkg/report/sender_stream.go @@ -17,17 +17,21 @@ type senderStream struct { clockRate float64 m sync.Mutex + useLatestPacket bool + // data from rtp packets lastRTPTimeRTP uint32 lastRTPTimeTime time.Time + lastRTPSN uint16 packetCount uint32 octetCount uint32 } -func newSenderStream(ssrc uint32, clockRate uint32) *senderStream { +func newSenderStream(ssrc uint32, clockRate uint32, useLatestPacket bool) *senderStream { return &senderStream{ - ssrc: ssrc, - clockRate: float64(clockRate), + ssrc: ssrc, + clockRate: float64(clockRate), + useLatestPacket: useLatestPacket, } } @@ -35,9 +39,12 @@ func (stream *senderStream) processRTP(now time.Time, header *rtp.Header, payloa stream.m.Lock() defer stream.m.Unlock() - // always update time to minimize errors - stream.lastRTPTimeRTP = header.Timestamp - stream.lastRTPTimeTime = now + if stream.useLatestPacket || stream.packetCount == 0 || int16(header.SequenceNumber-stream.lastRTPSN) > 0 { + // Told to consider every packet, or this was the first packet, or it's in-order + stream.lastRTPSN = header.SequenceNumber + stream.lastRTPTimeRTP = header.Timestamp + stream.lastRTPTimeTime = now + } stream.packetCount++ stream.octetCount += uint32(len(payload))