From 6793c6cb13c99db0032b09bca75269db8b0b383a Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Fri, 5 Nov 2021 10:23:26 +0100 Subject: [PATCH] Fix twcc feedback adapter Now only includes packet feedback for packets that were received. Previously, in flight packets could be considered lost. --- pkg/cc/feedback_adapter.go | 138 ++++++------- pkg/cc/feedback_adapter_test.go | 330 +++++++++++++++++++++++++++++--- pkg/cc/interceptor.go | 11 +- 3 files changed, 373 insertions(+), 106 deletions(-) diff --git a/pkg/cc/feedback_adapter.go b/pkg/cc/feedback_adapter.go index 4a7daae6..32ce967e 100644 --- a/pkg/cc/feedback_adapter.go +++ b/pkg/cc/feedback_adapter.go @@ -12,6 +12,7 @@ import ( ) var errMissingTWCCExtension = errors.New("missing transport layer cc header extension") +var errInvalidFeedbackPacket = errors.New("got invalid feedback packet") // TODO(mathis): make types internal only? @@ -52,116 +53,91 @@ func (f *FeedbackAdapter) OnSent(ts time.Time, pkt *rtp.Packet, attributes inter // OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a // []PacketResult -func (f *FeedbackAdapter) OnIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) []types.PacketResult { +func (f *FeedbackAdapter) OnIncomingTransportCC(feedback *rtcp.TransportLayerCC) ([]types.PacketResult, error) { result := []types.PacketResult{} - baseSequenceNr := feedback.BaseSequenceNumber - sequenceNr := baseSequenceNr - nextRecvDelta := 0 - referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) // TODO(mathis): Use/save reference time globally? + packetStatusCount := uint16(0) + chunkIndex := 0 + deltaIndex := 0 + referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) - for _, packetChunkInterface := range feedback.PacketChunks { - switch packetChunk := packetChunkInterface.(type) { + for packetStatusCount < feedback.PacketStatusCount { + if chunkIndex >= len(feedback.PacketChunks) || len(feedback.PacketChunks) == 0 { + return nil, errInvalidFeedbackPacket + } + switch packetChunk := feedback.PacketChunks[chunkIndex].(type) { case *rtcp.RunLengthChunk: symbol := packetChunk.PacketStatusSymbol for i := uint16(0); i < packetChunk.RunLength; i++ { - if sent, ok := f.history[sequenceNr]; ok { - var delta time.Duration - - switch symbol { - case rtcp.TypeTCCPacketReceivedSmallDelta: - fallthrough - case rtcp.TypeTCCPacketReceivedLargeDelta: - if nextRecvDelta > len(feedback.RecvDeltas)-1 { + if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok { + if symbol == rtcp.TypeTCCPacketReceivedSmallDelta || + symbol == rtcp.TypeTCCPacketReceivedLargeDelta { + if deltaIndex >= len(feedback.RecvDeltas) { // TODO(mathis): Not enough recv deltas for number - // of received packets: warn? + // of received packets: warn or error? continue } - recvDelta := feedback.RecvDeltas[nextRecvDelta] - nextRecvDelta++ - - switch recvDelta.Type { - case rtcp.TypeTCCPacketReceivedSmallDelta: - delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond - - case rtcp.TypeTCCPacketReceivedLargeDelta: - // TODO(mathis): This should not happen: symbol is - // SmallDelta, but recvDelta is largeDelta - // Warn? - delta = time.Duration(recvDelta.Delta) * time.Millisecond - - default: - // TODO(mathis): Should not happne, error invalid - // receive delta? - } - receiveTime := referenceTime.Add(delta) + receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex]) referenceTime = receiveTime - result = append(result, types.PacketResult{ - SentPacket: sent, + SentPacket: sentPacket, ReceiveTime: receiveTime, Received: true, }) - delete(f.history, sequenceNr) + deltaIndex++ + } else { + result = append(result, types.PacketResult{ + SentPacket: sentPacket, + ReceiveTime: time.Time{}, + Received: false, + }) } + } else { + // TODO(mathis): got feedback for unsent packet? } - sequenceNr++ + packetStatusCount++ } + chunkIndex++ case *rtcp.StatusVectorChunk: for _, symbol := range packetChunk.SymbolList { - if sent, ok := f.history[sequenceNr]; ok { - var delta time.Duration - - switch symbol { - case rtcp.TypeTCCPacketReceivedSmallDelta: - fallthrough - case rtcp.TypeTCCPacketReceivedLargeDelta: - if nextRecvDelta > len(feedback.RecvDeltas)-1 { + if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok { + if symbol == rtcp.TypeTCCPacketReceivedSmallDelta || + symbol == rtcp.TypeTCCPacketReceivedLargeDelta { + if deltaIndex >= len(feedback.RecvDeltas) { // TODO(mathis): Not enough recv deltas for number - // of received packets: warn? + // of received packets: warn or error? continue } - recvDelta := feedback.RecvDeltas[nextRecvDelta] - nextRecvDelta++ - - switch recvDelta.Type { - case rtcp.TypeTCCPacketReceivedSmallDelta: - delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond - - case rtcp.TypeTCCPacketReceivedLargeDelta: - // TODO(mathis): This should not happen: symbol is - // SmallDelta, but recvDelta is largeDelta - // Warn? - delta = time.Duration(recvDelta.Delta) * time.Millisecond - - default: - // TODO(mathis): Should not happne, error invalid - // receive delta? - } - receiveTime := referenceTime.Add(delta) + receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex]) referenceTime = receiveTime - result = append(result, types.PacketResult{ - SentPacket: sent, + SentPacket: sentPacket, ReceiveTime: receiveTime, Received: true, }) - delete(f.history, sequenceNr) + deltaIndex++ + } else { + result = append(result, types.PacketResult{ + SentPacket: sentPacket, + ReceiveTime: time.Time{}, + Received: false, + }) } } - sequenceNr++ + packetStatusCount++ + if packetStatusCount >= feedback.PacketStatusCount { + break + } } + chunkIndex++ } } + return result, nil +} - for _, v := range sortedKeysUint16(f.history) { - result = append(result, types.PacketResult{ - SentPacket: f.history[v], - ReceiveTime: time.Time{}, - Received: false, - }) - } - return result +// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult +func (f *FeedbackAdapter) OnIncomingRFC8888(feedback *rtcp.RawPacket) ([]types.PacketResult, error) { + return nil, nil } func sortedKeysUint16(m map[uint16]types.SentPacket) []uint16 { @@ -175,7 +151,9 @@ func sortedKeysUint16(m map[uint16]types.SentPacket) []uint16 { return result } -// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult -func (f *FeedbackAdapter) OnIncomingRFC8888(ts time.Time, feedback *rtcp.RawPacket) []types.PacketResult { - return nil +func getReceiveTime(baseTime time.Time, delta *rtcp.RecvDelta) time.Time { + if delta.Type == rtcp.TypeTCCPacketReceivedSmallDelta { + return baseTime.Add(time.Duration(delta.Delta) * 250 * time.Microsecond) + } + return baseTime.Add(time.Duration(delta.Delta) * time.Millisecond) } diff --git a/pkg/cc/feedback_adapter_test.go b/pkg/cc/feedback_adapter_test.go index c6136516..4bfb3725 100644 --- a/pkg/cc/feedback_adapter_test.go +++ b/pkg/cc/feedback_adapter_test.go @@ -30,26 +30,11 @@ func getPacketWithTransportCCExt(t *testing.T, SequenceNumber uint16) *rtp.Packe func TestFeedbackAdapterTWCC(t *testing.T) { t.Run("empty", func(t *testing.T) { adapter := NewFeedbackAdapter() - result := adapter.OnIncomingTransportCC(time.Time{}, &rtcp.TransportLayerCC{}) + result, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{}) + assert.NoError(t, err) assert.Empty(t, result) }) - t.Run("returnsSentPackets", func(t *testing.T) { - adapter := NewFeedbackAdapter() - pkt := getPacketWithTransportCCExt(t, 0) - assert.NoError(t, adapter.OnSent(time.Time{}, pkt, interceptor.Attributes{twccExtension: hdrExtID})) - result := adapter.OnIncomingTransportCC(time.Time{}, &rtcp.TransportLayerCC{}) - assert.NotEmpty(t, result) - assert.Contains(t, result, types.PacketResult{ - SentPacket: types.SentPacket{ - SendTime: time.Time{}, - Header: pkt.Header, - }, - ReceiveTime: time.Time{}, - Received: false, - }) - }) - t.Run("setsCorrectReceiveTime", func(t *testing.T) { t0 := time.Time{} adapter := NewFeedbackAdapter() @@ -59,12 +44,12 @@ func TestFeedbackAdapterTWCC(t *testing.T) { headers = append(headers, pkt.Header) assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) } - results := adapter.OnIncomingTransportCC(t0.Add(5*time.Millisecond), &rtcp.TransportLayerCC{ + results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, BaseSequenceNumber: 0, - PacketStatusCount: 0, + PacketStatusCount: 22, ReferenceTime: 0, FbPktCount: 0, PacketChunks: []rtcp.PacketStatusChunk{ @@ -129,6 +114,8 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }, }) + assert.NoError(t, err) + assert.NotEmpty(t, results) assert.Len(t, results, 22) @@ -192,10 +179,9 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }) t.Run("doesNotCrashOnTooManyFeedbackReports", func(*testing.T) { - t0 := time.Time{} adapter := NewFeedbackAdapter() assert.NotPanics(t, func() { - adapter.OnIncomingTransportCC(t0.Add(5*time.Millisecond), &rtcp.TransportLayerCC{ + _, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -226,6 +212,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }, }, }) + assert.NoError(t, err) }) }) @@ -237,12 +224,12 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, adapter.OnSent(t0, pkt65535, interceptor.Attributes{twccExtension: hdrExtID})) assert.NoError(t, adapter.OnSent(t0, pkt0, interceptor.Attributes{twccExtension: hdrExtID})) - results := adapter.OnIncomingTransportCC(t0.Add(5*time.Millisecond), &rtcp.TransportLayerCC{ + results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, BaseSequenceNumber: 65535, - PacketStatusCount: 0, + PacketStatusCount: 2, ReferenceTime: 0, FbPktCount: 0, PacketChunks: []rtcp.PacketStatusChunk{ @@ -272,6 +259,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }, }, }) + assert.NoError(t, err) assert.NotEmpty(t, results) assert.Len(t, results, 2) @@ -292,4 +280,300 @@ func TestFeedbackAdapterTWCC(t *testing.T) { Received: true, }) }) + + t.Run("ignoresPossiblyInFlightPackets", func(t *testing.T) { + t0 := time.Time{} + adapter := NewFeedbackAdapter() + headers := []rtp.Header{} + for i := uint16(0); i < 8; i++ { + pkt := getPacketWithTransportCCExt(t, i) + headers = append(headers, pkt.Header) + assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) + } + + results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + Header: rtcp.Header{}, + SenderSSRC: 0, + MediaSSRC: 0, + BaseSequenceNumber: 0, + PacketStatusCount: 3, + ReferenceTime: 0, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.StatusVectorChunk{ + PacketStatusChunk: nil, + Type: rtcp.TypeTCCStatusVectorChunk, + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + }, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, // 4*250us=1ms + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, // 4*250us=1ms + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, // 4*250us=1ms + }, + }, + }) + assert.NoError(t, err) + assert.Len(t, results, 3) + for i := uint16(0); i < 3; i++ { + assert.Contains(t, results, types.PacketResult{ + SentPacket: types.SentPacket{ + SendTime: t0, + Header: headers[i], + }, + ReceiveTime: t0.Add(time.Duration(i+1) * time.Millisecond), + Received: true, + }) + } + }) + + t.Run("runLengthChunk", func(t *testing.T) { + adapter := NewFeedbackAdapter() + t0 := time.Time{} + for i := uint16(0); i < 20; i++ { + pkt := getPacketWithTransportCCExt(t, i) + assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) + } + packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + Header: rtcp.Header{}, + SenderSSRC: 0, + MediaSSRC: 0, + BaseSequenceNumber: 0, + PacketStatusCount: 3, + ReferenceTime: 0, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.RunLengthChunk{ + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 3, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + }, + }) + + assert.NoError(t, err) + assert.Len(t, packets, 3) + }) + + t.Run("statusVectorChunk", func(t *testing.T) { + adapter := NewFeedbackAdapter() + t0 := time.Time{} + for i := uint16(0); i < 20; i++ { + pkt := getPacketWithTransportCCExt(t, i) + assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) + } + packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + Header: rtcp.Header{}, + SenderSSRC: 0, + MediaSSRC: 0, + BaseSequenceNumber: 0, + PacketStatusCount: 3, + ReferenceTime: 0, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.StatusVectorChunk{ + SymbolSize: rtcp.TypeTCCSymbolSizeOneBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + }, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + }, + }) + + assert.NoError(t, err) + assert.Len(t, packets, 3) + }) + + t.Run("mixedRunLengthAndStatusVector", func(t *testing.T) { + adapter := NewFeedbackAdapter() + + t0 := time.Time{} + for i := uint16(0); i < 20; i++ { + pkt := getPacketWithTransportCCExt(t, i) + assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) + } + + packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + Header: rtcp.Header{}, + SenderSSRC: 0, + MediaSSRC: 0, + BaseSequenceNumber: 0, + PacketStatusCount: 10, + ReferenceTime: 0, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.StatusVectorChunk{ + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + }, + }, + &rtcp.RunLengthChunk{ + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 3, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 4, + }, + }, + }) + assert.NoError(t, err) + assert.Len(t, packets, 10) + }) + + t.Run("doesNotcrashOnInvalidTWCCPacket", func(t *testing.T) { + + adapter := NewFeedbackAdapter() + + t0 := time.Time{} + for i := uint16(1008); i < 1030; i++ { + pkt := getPacketWithTransportCCExt(t, i) + assert.NoError(t, adapter.OnSent(t0, pkt, interceptor.Attributes{twccExtension: hdrExtID})) + } + + assert.NotPanics(t, func() { + // TODO(mathis): Run length seems off, maybe check why TWCC generated this? + packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + Header: rtcp.Header{}, + SenderSSRC: 0, + MediaSSRC: 0, + BaseSequenceNumber: 1008, + PacketStatusCount: 8, + ReferenceTime: 278, + FbPktCount: 170, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.StatusVectorChunk{ + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + }, + }, + &rtcp.RunLengthChunk{ + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 5632, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 25000, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 0, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 29500, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 16750, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 23500, + }, + { + Type: rtcp.TypeTCCPacketReceivedSmallDelta, + Delta: 0, + }, + }, + }) + assert.Error(t, err) + assert.Empty(t, packets) + }) + }) } diff --git a/pkg/cc/interceptor.go b/pkg/cc/interceptor.go index b8dacb93..1dda753b 100644 --- a/pkg/cc/interceptor.go +++ b/pkg/cc/interceptor.go @@ -181,7 +181,6 @@ func (c *ControllerInterceptor) BindLocalStream(info *interceptor.StreamInfo, wr func (c *ControllerInterceptor) loop() { ticker := time.NewTicker(5 * time.Millisecond) for { - fmt.Printf("bwe: %v\n", c.GetBandwidthEstimation(time.Now())) select { case <-c.close: return @@ -204,11 +203,17 @@ func (c *ControllerInterceptor) loop() { c.Enqueue(pkt.packet, pkt.attributes) case feedback := <-c.twccFeedbackChan: - packetResult := c.OnIncomingTransportCC(feedback.ts, feedback.TransportLayerCC) + packetResult, err := c.OnIncomingTransportCC(feedback.TransportLayerCC) + if err != nil { + // TODO(mathis): handle error + } c.OnFeedback(feedback.ts, packetResult) case feedback := <-c.rfc8888FeedbackChan: - packetResult := c.OnIncomingRFC8888(feedback.ts, feedback.RawPacket) + packetResult, err := c.OnIncomingRFC8888(feedback.RawPacket) + if err != nil { + // TODO(mathis): handle error + } c.OnFeedback(feedback.ts, packetResult) } }