Skip to content

Commit

Permalink
Don't use out-of-order packets in sender reports
Browse files Browse the repository at this point in the history
If a sender report is generated immediately after sending an
out-of-order packet (which can happen e.g. in an SFU when forwarding
media), the timestamp from the last in-order packet should be
extrapolated, since the departure time of an out-of-order packet is is
not properly correlated with its RTP timestamp.

Leave an option to re-enable the old behavior.
  • Loading branch information
cptpcrd authored and adriancable committed Sep 29, 2023
1 parent 38fe7f5 commit 9ef04d4
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 7 deletions.
4 changes: 3 additions & 1 deletion pkg/report/sender_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type SenderInterceptor struct {
wg sync.WaitGroup
close chan struct{}
started chan struct{}

useLatestPacket bool
}

func (s *SenderInterceptor) isClosed() bool {
Expand Down Expand Up @@ -133,7 +135,7 @@ func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (s *SenderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
stream := newSenderStream(info.SSRC, info.ClockRate)
stream := newSenderStream(info.SSRC, info.ClockRate, s.useLatestPacket)
s.streams.Store(info.SSRC, stream)

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) {
Expand Down
124 changes: 124 additions & 0 deletions pkg/report/sender_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,130 @@ func TestSenderInterceptor(t *testing.T) {
}, sr)
})

t.Run("out of order RTP packets", func(t *testing.T) {
mt := &test.MockTime{}
f, err := NewSenderInterceptor(
SenderInterval(time.Millisecond*50),
SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
SenderNow(mt.Now),
)
assert.NoError(t, err)

i, err := f.NewInterceptor("")
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

// Write several packets
for i := 0; i < 10; i++ {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: uint16(i),
Timestamp: uint32(i),
},
Payload: []byte("\x00\x00"),
}))
}

// Skip a packet, then redeliver it out-of-order
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 12,
Timestamp: 12,
},
Payload: []byte("\x00\x00"),
}))
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 11,
Timestamp: 11,
},
Payload: []byte("\x00\x00"),
}))

pkts := <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok := pkts[0].(*rtcp.SenderReport)
assert.True(t, ok)
// The out-of-order packet is included in PacketCount and OctetCount, but the RTP
// timestamp of the last in-order packet is used for RTPTime
assert.Equal(t, &rtcp.SenderReport{
SSRC: 123456,
NTPTime: ntp.ToNTP(mt.Now()),
RTPTime: 12,
PacketCount: 12,
OctetCount: 24,
}, sr)
})

t.Run("out of order RTP packets with SenderUseLatestPacket", func(t *testing.T) {
mt := &test.MockTime{}
f, err := NewSenderInterceptor(
SenderInterval(time.Millisecond*50),
SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
SenderNow(mt.Now),
SenderUseLatestPacket(),
)
assert.NoError(t, err)

i, err := f.NewInterceptor("")
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

// Write several packets
for i := 0; i < 10; i++ {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: uint16(i),
Timestamp: uint32(i),
},
Payload: []byte("\x00\x00"),
}))
}

// Skip a packet, then redeliver it out-of-order
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 12,
Timestamp: 12,
},
Payload: []byte("\x00\x00"),
}))
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 11,
Timestamp: 11,
},
Payload: []byte("\x00\x00"),
}))

pkts := <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok := pkts[0].(*rtcp.SenderReport)
assert.True(t, ok)
// The out-of-order packet *is* used for RTPTime
assert.Equal(t, &rtcp.SenderReport{
SSRC: 123456,
NTPTime: ntp.ToNTP(mt.Now()),
RTPTime: 11,
PacketCount: 12,
OctetCount: 24,
}, sr)
})

t.Run("inject ticker", func(t *testing.T) {
mNow := &test.MockTime{}
mTick := &test.MockTicker{
Expand Down
9 changes: 9 additions & 0 deletions pkg/report/sender_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func SenderTicker(f TickerFactory) SenderOption {
}
}

// SenderUseLatestPacket sets the interceptor to always use the latest packet, even
// if it appears to be out-of-order.
func SenderUseLatestPacket() SenderOption {
return func(r *SenderInterceptor) error {
r.useLatestPacket = true
return nil
}
}

// enableStartTracking is used by tests to synchronize whether the loop() has begun
// and it's safe to start sending ticks to the ticker.
func enableStartTracking(startedCh chan struct{}) SenderOption {
Expand Down
19 changes: 13 additions & 6 deletions pkg/report/sender_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,34 @@ type senderStream struct {
clockRate float64
m sync.Mutex

useLatestPacket bool

// data from rtp packets
lastRTPTimeRTP uint32
lastRTPTimeTime time.Time
lastRTPSN uint16
packetCount uint32
octetCount uint32
}

func newSenderStream(ssrc uint32, clockRate uint32) *senderStream {
func newSenderStream(ssrc uint32, clockRate uint32, useLatestPacket bool) *senderStream {
return &senderStream{
ssrc: ssrc,
clockRate: float64(clockRate),
ssrc: ssrc,
clockRate: float64(clockRate),
useLatestPacket: useLatestPacket,
}
}

func (stream *senderStream) processRTP(now time.Time, header *rtp.Header, payload []byte) {
stream.m.Lock()
defer stream.m.Unlock()

// always update time to minimize errors
stream.lastRTPTimeRTP = header.Timestamp
stream.lastRTPTimeTime = now
if stream.useLatestPacket || stream.packetCount == 0 || int16(header.SequenceNumber-stream.lastRTPSN) > 0 {
// Told to consider every packet, or this was the first packet, or it's in-order
stream.lastRTPSN = header.SequenceNumber
stream.lastRTPTimeRTP = header.Timestamp
stream.lastRTPTimeTime = now
}

stream.packetCount++
stream.octetCount += uint32(len(payload))
Expand Down

0 comments on commit 9ef04d4

Please sign in to comment.