Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use out-of-order packets to calculate RTPTime for sender reports #197

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/report/sender_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type SenderInterceptor struct {
wg sync.WaitGroup
close chan struct{}
started chan struct{}

useLatestPacket bool
}

func (s *SenderInterceptor) isClosed() bool {
Expand Down Expand Up @@ -133,7 +135,7 @@ func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (s *SenderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
stream := newSenderStream(info.SSRC, info.ClockRate)
stream := newSenderStream(info.SSRC, info.ClockRate, s.useLatestPacket)
s.streams.Store(info.SSRC, stream)

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) {
Expand Down
124 changes: 124 additions & 0 deletions pkg/report/sender_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,130 @@ func TestSenderInterceptor(t *testing.T) {
}, sr)
})

t.Run("out of order RTP packets", func(t *testing.T) {
mt := &test.MockTime{}
f, err := NewSenderInterceptor(
SenderInterval(time.Millisecond*50),
SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
SenderNow(mt.Now),
)
assert.NoError(t, err)

i, err := f.NewInterceptor("")
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

// Write several packets
for i := 0; i < 10; i++ {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: uint16(i),
Timestamp: uint32(i),
},
Payload: []byte("\x00\x00"),
}))
}

// Skip a packet, then redeliver it out-of-order
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 12,
Timestamp: 12,
},
Payload: []byte("\x00\x00"),
}))
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 11,
Timestamp: 11,
},
Payload: []byte("\x00\x00"),
}))

pkts := <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok := pkts[0].(*rtcp.SenderReport)
assert.True(t, ok)
// The out-of-order packet is included in PacketCount and OctetCount, but the RTP
// timestamp of the last in-order packet is used for RTPTime
assert.Equal(t, &rtcp.SenderReport{
SSRC: 123456,
NTPTime: ntp.ToNTP(mt.Now()),
RTPTime: 12,
PacketCount: 12,
OctetCount: 24,
}, sr)
})

t.Run("out of order RTP packets with SenderUseLatestPacket", func(t *testing.T) {
mt := &test.MockTime{}
f, err := NewSenderInterceptor(
SenderInterval(time.Millisecond*50),
SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
SenderNow(mt.Now),
SenderUseLatestPacket(),
)
assert.NoError(t, err)

i, err := f.NewInterceptor("")
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

// Write several packets
for i := 0; i < 10; i++ {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: uint16(i),
Timestamp: uint32(i),
},
Payload: []byte("\x00\x00"),
}))
}

// Skip a packet, then redeliver it out-of-order
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 12,
Timestamp: 12,
},
Payload: []byte("\x00\x00"),
}))
assert.NoError(t, stream.WriteRTP(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 11,
Timestamp: 11,
},
Payload: []byte("\x00\x00"),
}))

pkts := <-stream.WrittenRTCP()
assert.Equal(t, len(pkts), 1)
sr, ok := pkts[0].(*rtcp.SenderReport)
assert.True(t, ok)
// The out-of-order packet *is* used for RTPTime
assert.Equal(t, &rtcp.SenderReport{
SSRC: 123456,
NTPTime: ntp.ToNTP(mt.Now()),
RTPTime: 11,
PacketCount: 12,
OctetCount: 24,
}, sr)
})

t.Run("inject ticker", func(t *testing.T) {
mNow := &test.MockTime{}
mTick := &test.MockTicker{
Expand Down
9 changes: 9 additions & 0 deletions pkg/report/sender_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func SenderTicker(f TickerFactory) SenderOption {
}
}

// SenderUseLatestPacket sets the interceptor to always use the latest packet, even
// if it appears to be out-of-order.
func SenderUseLatestPacket() SenderOption {
return func(r *SenderInterceptor) error {
r.useLatestPacket = true
return nil
}
}

// enableStartTracking is used by tests to synchronize whether the loop() has begun
// and it's safe to start sending ticks to the ticker.
func enableStartTracking(startedCh chan struct{}) SenderOption {
Expand Down
19 changes: 13 additions & 6 deletions pkg/report/sender_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,34 @@ type senderStream struct {
clockRate float64
m sync.Mutex

useLatestPacket bool

// data from rtp packets
lastRTPTimeRTP uint32
lastRTPTimeTime time.Time
lastRTPSN uint16
packetCount uint32
octetCount uint32
}

func newSenderStream(ssrc uint32, clockRate uint32) *senderStream {
func newSenderStream(ssrc uint32, clockRate uint32, useLatestPacket bool) *senderStream {
return &senderStream{
ssrc: ssrc,
clockRate: float64(clockRate),
ssrc: ssrc,
clockRate: float64(clockRate),
useLatestPacket: useLatestPacket,
}
}

func (stream *senderStream) processRTP(now time.Time, header *rtp.Header, payload []byte) {
stream.m.Lock()
defer stream.m.Unlock()

// always update time to minimize errors
stream.lastRTPTimeRTP = header.Timestamp
stream.lastRTPTimeTime = now
if stream.useLatestPacket || stream.packetCount == 0 || int16(header.SequenceNumber-stream.lastRTPSN) > 0 {
// Told to consider every packet, or this was the first packet, or it's in-order
stream.lastRTPSN = header.SequenceNumber
stream.lastRTPTimeRTP = header.Timestamp
stream.lastRTPTimeTime = now
}

stream.packetCount++
stream.octetCount += uint32(len(payload))
Expand Down