From 4c1a9d42a35204ce9ed37c559adcddf14ebe7f99 Mon Sep 17 00:00:00 2001 From: Kevin Caffrey Date: Wed, 27 Sep 2023 14:04:08 -0400 Subject: [PATCH] TWCC: Fix small receive delta rounding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this change, a sequence of small receive deltas could get erroneously encoded as a series of 0s. If the actual receive deltas were {200μs, 200μs, ..., 200μs}, what was appearing in the TWCC feedback packet (after marshaling) was {0, 0, ..., 0}. (Note: the added test, without the code fixes, would fail by returning deltas of {200, 200, ..., 200}, but marshalling the feedback packet would truncate all of those to 0). To fix this, receive deltas are rounded to the closest "delta tick" period (250μs) as opposed to truncating. Additionally, `lastTimestampUS` is moved forward by the rounded delta instead of set to the timestamp of the last packet. This lets rounding errors accumulate so that a series of packets will have the correct sum of receive deltas (within the margin of error of a delta tick). --- pkg/twcc/twcc.go | 12 ++++++++--- pkg/twcc/twcc_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/pkg/twcc/twcc.go b/pkg/twcc/twcc.go index 7f7b857a..f4039987 100644 --- a/pkg/twcc/twcc.go +++ b/pkg/twcc/twcc.go @@ -154,10 +154,16 @@ func (f *feedback) getRTCP() *rtcp.TransportLayerCC { func (f *feedback) addReceived(sequenceNumber uint16, timestampUS int64) bool { deltaUS := timestampUS - f.lastTimestampUS - delta250US := deltaUS / 250 + var delta250US int64 + if deltaUS >= 0 { + delta250US = (deltaUS + rtcp.TypeTCCDeltaScaleFactor/2) / rtcp.TypeTCCDeltaScaleFactor + } else { + delta250US = (deltaUS - rtcp.TypeTCCDeltaScaleFactor/2) / rtcp.TypeTCCDeltaScaleFactor + } if delta250US < math.MinInt16 || delta250US > math.MaxInt16 { // delta doesn't fit into 16 bit, need to create new packet return false } + deltaUSRounded := delta250US * rtcp.TypeTCCDeltaScaleFactor for ; f.nextSequenceNumber != sequenceNumber; f.nextSequenceNumber++ { if !f.lastChunk.canAdd(rtcp.TypeTCCPacketNotReceived) { @@ -183,9 +189,9 @@ func (f *feedback) addReceived(sequenceNumber uint16, timestampUS int64) bool { f.lastChunk.add(recvDelta) f.deltas = append(f.deltas, &rtcp.RecvDelta{ Type: recvDelta, - Delta: deltaUS, + Delta: deltaUSRounded, }) - f.lastTimestampUS = timestampUS + f.lastTimestampUS += deltaUSRounded f.sequenceNumberCount++ f.nextSequenceNumber++ return true diff --git a/pkg/twcc/twcc_test.go b/pkg/twcc/twcc_test.go index 5ada7a00..c8291596 100644 --- a/pkg/twcc/twcc_test.go +++ b/pkg/twcc/twcc_test.go @@ -255,6 +255,53 @@ func Test_feedback(t *testing.T) { } }) + t.Run("add received small deltas", func(t *testing.T) { + f := newFeedback(0, 0, 0) + base := int64(320 * 1000) + deltaUS := int64(200) + f.setBase(5, base) + + for i := int64(0); i < 5; i++ { + got := f.addReceived(5+uint16(i+1), base+deltaUS*i) + assert.True(t, got) + } + + pkt := f.getRTCP() + + expectedDeltas := []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 0, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + // NOTE: The delta is less than the scale factor, but it should be rounded up. + // (rtcp.RecvDelta).Marshal() simply truncates to an interval of the scale factor, + // so we want to make sure that the deltas have any rounding applied when building + // the feedback. + Delta: 1 * rtcp.TypeTCCDeltaScaleFactor, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 1 * rtcp.TypeTCCDeltaScaleFactor, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + // NOTE: This is zero because even though the deltas are all the same, the rounding error has + // built up enough by this packet to cause it to be rounded down. + Delta: 0 * rtcp.TypeTCCDeltaScaleFactor, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 1 * rtcp.TypeTCCDeltaScaleFactor, + }, + } + assert.Equal(t, len(expectedDeltas), len(pkt.RecvDeltas)) + for i, d := range expectedDeltas { + assert.Equal(t, d, pkt.RecvDeltas[i]) + } + }) + t.Run("add received wrapped sequence number", func(t *testing.T) { f := newFeedback(0, 0, 0) f.setBase(65535, 320*1000)