From b34057698025bb44437fa54ed1b332561494a8cc Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Tue, 14 Dec 2021 14:58:50 +0100 Subject: [PATCH] Refactor feedback adapter --- pkg/gcc/acknowledgment.go | 13 + pkg/gcc/delay_based_bwe.go | 16 +- pkg/gcc/delay_based_bwe_test.go | 19 +- pkg/gcc/feedback_adapter.go | 145 ++++---- pkg/gcc/feedback_adapter_test.go | 560 +++++++++++++++++++++++++++++-- pkg/gcc/interceptor.go | 2 +- 6 files changed, 650 insertions(+), 105 deletions(-) diff --git a/pkg/gcc/acknowledgment.go b/pkg/gcc/acknowledgment.go index 56968212..3ff17aa9 100644 --- a/pkg/gcc/acknowledgment.go +++ b/pkg/gcc/acknowledgment.go @@ -1,6 +1,7 @@ package gcc import ( + "fmt" "time" "github.com/pion/rtp" @@ -9,9 +10,21 @@ import ( // Acknowledgment holds information about a packet and if/when it has been // sent/received. type Acknowledgment struct { + TLCC uint16 Header *rtp.Header Size int Departure time.Time Arrival time.Time RTT time.Duration } + +func (a Acknowledgment) String() string { + s := "ACK:\n" + s += fmt.Sprintf("\tTLCC:\t%v\n", a.TLCC) + s += fmt.Sprintf("\tHEADER:\t%v\n", a.Header) + s += fmt.Sprintf("\tSIZE:\t%v\n", a.Size) + s += fmt.Sprintf("\tDEPARTURE:\t%v\n", a.Departure) + s += fmt.Sprintf("\tARRIVAL:\t%v\n", a.Arrival) + s += fmt.Sprintf("\tRTT:\t%v\n", a.RTT) + return s +} diff --git a/pkg/gcc/delay_based_bwe.go b/pkg/gcc/delay_based_bwe.go index 99826108..35350a48 100644 --- a/pkg/gcc/delay_based_bwe.go +++ b/pkg/gcc/delay_based_bwe.go @@ -323,17 +323,24 @@ func calculateReceivedRate(log []Acknowledgment) int { if len(log) == 0 { return 0 } - sum := 0 + if len(log) == 1 && !log[0].Arrival.IsZero() { + return log[0].Header.MarshalSize() + log[0].Size + } + start := log[0].Arrival end := log[len(log)-1].Arrival + d := end.Sub(start) + if d == 0 { + return 0 + } + sum := 0 for _, ack := range log { if !ack.Arrival.IsZero() { sum += ack.Header.MarshalSize() + ack.Size } } - d := end.Sub(start) rate := int(float64(8*sum) / d.Seconds()) // fmt.Printf("calculating rate for: from %v to %v => %v / %v = %v\n", start, end, sum, d.Seconds(), rate) return rate @@ -367,11 +374,6 @@ func preFilter(log []Acknowledgment) []arrivalGroup { ag.add(p) res = append(res, ag) } - // for _, r := range res { - // for _, n := range r.packets { - // fmt.Printf("%v:%v\n", n.TLCC, n.Arrival.Sub(time.Time{}).Milliseconds()) - // } - // } return res } diff --git a/pkg/gcc/delay_based_bwe_test.go b/pkg/gcc/delay_based_bwe_test.go index 74953031..48a2412b 100644 --- a/pkg/gcc/delay_based_bwe_test.go +++ b/pkg/gcc/delay_based_bwe_test.go @@ -150,14 +150,25 @@ func TestPreFilter(t *testing.T) { { log: []Acknowledgment{ { - Arrival: time.Time{}, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + RTT: 0, }, }, exp: []arrivalGroup{ { - packets: []Acknowledgment{{}}, - arrival: time.Time{}, + packets: []Acknowledgment{{ + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + RTT: 0, + }}, + arrival: time.Time{}.Add(time.Millisecond), departure: time.Time{}, + rtt: 0, }, }, }, @@ -268,7 +279,7 @@ func TestCalculateReceivingRate(t *testing.T) { }, }, { - expected: 12 + 12 + 1200 + 1200, + expected: (12 + 12 + 1200 + 1200) * 8 * 10, // *8: Bytes to bits, *10: calculate rate in 100ms log: []Acknowledgment{ { Header: &rtp.Header{}, diff --git a/pkg/gcc/feedback_adapter.go b/pkg/gcc/feedback_adapter.go index 71c64290..bf2be237 100644 --- a/pkg/gcc/feedback_adapter.go +++ b/pkg/gcc/feedback_adapter.go @@ -13,6 +13,8 @@ import ( var ( errMissingTWCCExtension = errors.New("missing transport layer cc header extension") errUnknownFeedbackFormat = errors.New("unknown feedback format") + + errInvalidFeedback = errors.New("invalid feedback") ) // FeedbackAdapter converts incoming feedback from the wireformat to a @@ -47,96 +49,113 @@ func (f *FeedbackAdapter) OnSent(ts time.Time, header *rtp.Header, size int, att f.lock.Lock() defer f.lock.Unlock() f.history[tccExt.TransportSequence] = Acknowledgment{ + TLCC: tccExt.TransportSequence, Header: header, Size: size, Departure: ts, Arrival: time.Time{}, + RTT: 0, } return nil } // OnFeedback converts incoming RTCP packet feedback to Acknowledgments. // Currently only TWCC is supported. -func (f *FeedbackAdapter) OnFeedback(feedback rtcp.Packet) ([]Acknowledgment, error) { +func (f *FeedbackAdapter) OnFeedback(ts time.Time, feedback rtcp.Packet) ([]Acknowledgment, error) { + f.lock.Lock() + defer f.lock.Unlock() + switch fb := feedback.(type) { case *rtcp.TransportLayerCC: - return f.OnIncomingTransportCC(fb) + return f.onIncomingTransportCC(ts, fb) default: return nil, errUnknownFeedbackFormat } } -// OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a -// []PacketResult -func (f *FeedbackAdapter) OnIncomingTransportCC(feedback *rtcp.TransportLayerCC) ([]Acknowledgment, error) { - t0 := time.Now() - f.lock.Lock() - defer f.lock.Unlock() +func (f *FeedbackAdapter) unpackRunLengthChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.RunLengthChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) { + result := make([]Acknowledgment, chunk.RunLength) + deltaIndex := 0 - result := []Acknowledgment{} + // Rollover if necessary + end := int(start + chunk.RunLength) + if end < int(start) { + end += 65536 + } + resultIndex := 0 + for i := int(start); i < end; i++ { + if ack, ok := f.history[uint16(i)]; ok { + if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived { + if len(deltas)-1 < deltaIndex { + return deltaIndex, refTime, result, errInvalidFeedback + } + refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond) + ack.Arrival = refTime + ack.RTT = ts.Sub(ack.Departure) + deltaIndex++ + } + result[resultIndex] = ack + } + resultIndex++ + } + return deltaIndex, refTime, result, nil +} - packetStatusCount := uint16(0) - chunkIndex := 0 +func (f *FeedbackAdapter) unpackStatusVectorChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.StatusVectorChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) { + result := make([]Acknowledgment, len(chunk.SymbolList)) deltaIndex := 0 - referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) - - for packetStatusCount < feedback.PacketStatusCount { - if chunkIndex >= len(feedback.PacketChunks) || len(feedback.PacketChunks) == 0 { - return nil, errUnknownFeedbackFormat + resultIndex := 0 + for i, symbol := range chunk.SymbolList { + if ack, ok := f.history[start+uint16(i)]; ok { + if symbol != rtcp.TypeTCCPacketNotReceived { + if len(deltas)-1 < deltaIndex { + return deltaIndex, refTime, result, errInvalidFeedback + } + refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond) + ack.Arrival = refTime + ack.RTT = ts.Sub(ack.Departure) + deltaIndex++ + } + result[resultIndex] = ack } - switch packetChunk := feedback.PacketChunks[chunkIndex].(type) { + resultIndex++ + } + + return deltaIndex, refTime, result, nil +} + +func (f *FeedbackAdapter) onIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) ([]Acknowledgment, error) { + result := []Acknowledgment{} + + index := feedback.BaseSequenceNumber + refTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) + recvDeltas := feedback.RecvDeltas + + for _, chunk := range feedback.PacketChunks { + switch chunk := chunk.(type) { case *rtcp.RunLengthChunk: - symbol := packetChunk.PacketStatusSymbol - for i := uint16(0); i < packetChunk.RunLength; i++ { - if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok { - if symbol == rtcp.TypeTCCPacketReceivedSmallDelta || - symbol == rtcp.TypeTCCPacketReceivedLargeDelta { - if deltaIndex >= len(feedback.RecvDeltas) { - // TODO(mathis): Not enough recv deltas for number - // of received packets: warn or error? - continue - } - receiveTime := referenceTime.Add(time.Duration(feedback.RecvDeltas[deltaIndex].Delta) * time.Microsecond) - referenceTime = receiveTime - sentPacket.Arrival = receiveTime - sentPacket.RTT = t0.Sub(sentPacket.Departure) - result = append(result, sentPacket) - deltaIndex++ - } else { - result = append(result, sentPacket) - } - } - packetStatusCount++ + n, nextRefTime, acks, err := f.unpackRunLengthChunk(ts, index, refTime, chunk, recvDeltas) + if err != nil { + return nil, err } - chunkIndex++ + refTime = nextRefTime + result = append(result, acks...) + recvDeltas = recvDeltas[n:] + index = uint16(int(index) + len(acks)) case *rtcp.StatusVectorChunk: - for _, symbol := range packetChunk.SymbolList { - if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok { - if symbol == rtcp.TypeTCCPacketReceivedSmallDelta || - symbol == rtcp.TypeTCCPacketReceivedLargeDelta { - if deltaIndex >= len(feedback.RecvDeltas) { - // TODO(mathis): Not enough recv deltas for number - // of received packets: warn or error? - continue - } - receiveTime := referenceTime.Add(time.Duration(feedback.RecvDeltas[deltaIndex].Delta) * time.Microsecond) - referenceTime = receiveTime - sentPacket.Arrival = receiveTime - sentPacket.RTT = t0.Sub(sentPacket.Departure) - result = append(result, sentPacket) - deltaIndex++ - } else { - result = append(result, sentPacket) - } - } - packetStatusCount++ - if packetStatusCount >= feedback.PacketStatusCount { - break - } + n, nextRefTime, acks, err := f.unpackStatusVectorChunk(ts, index, refTime, chunk, recvDeltas) + if err != nil { + return nil, err } - chunkIndex++ + refTime = nextRefTime + result = append(result, acks...) + recvDeltas = recvDeltas[n:] + index = uint16(int(index) + len(acks)) + default: + return nil, errInvalidFeedback } } + return result, nil } diff --git a/pkg/gcc/feedback_adapter_test.go b/pkg/gcc/feedback_adapter_test.go index 9fdec82d..34fe640a 100644 --- a/pkg/gcc/feedback_adapter_test.go +++ b/pkg/gcc/feedback_adapter_test.go @@ -1,6 +1,7 @@ package gcc import ( + "fmt" "testing" "time" @@ -12,6 +13,475 @@ import ( const hdrExtID = uint8(1) +func TestUnpackRunLengthChunk(t *testing.T) { + attributes := make(interceptor.Attributes) + attributes.Set(twccExtensionAttributesKey, hdrExtID) + + cases := []struct { + sentTLCC []uint16 + chunk rtcp.RunLengthChunk + deltas []*rtcp.RecvDelta + start uint16 + // expect: + acks []Acknowledgment + refTime time.Time + n int + }{ + { + sentTLCC: []uint16{}, + chunk: rtcp.RunLengthChunk{}, + deltas: []*rtcp.RecvDelta{}, + start: 0, + acks: []Acknowledgment{}, + refTime: time.Time{}, + n: 0, + }, + { + sentTLCC: []uint16{0, 1, 2, 3, 4, 5}, + chunk: rtcp.RunLengthChunk{ + PacketStatusChunk: nil, + Type: 0, + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 6, + }, + deltas: []*rtcp.RecvDelta{ + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + }, + start: 0, + //nolint:dupl + acks: []Acknowledgment{ + { + TLCC: 0, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 1, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 2, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 3, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 4, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 5, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + }, + n: 6, + refTime: time.Time{}, + }, + { + sentTLCC: []uint16{65534, 65535, 0, 1, 2, 3}, + chunk: rtcp.RunLengthChunk{ + PacketStatusChunk: nil, + Type: 0, + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 6, + }, + deltas: []*rtcp.RecvDelta{ + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + }, + start: 65534, + acks: []Acknowledgment{ + { + TLCC: 65534, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(250 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 65535, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(500 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 0, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(750 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 1, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(1000 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 2, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(1250 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 3, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(1500 * time.Microsecond), + RTT: 0, + }, + }, + n: 6, + refTime: time.Time{}.Add(1500 * time.Microsecond), + }, + { + sentTLCC: []uint16{65534, 65535, 0, 1, 2, 3}, + chunk: rtcp.RunLengthChunk{ + PacketStatusChunk: nil, + Type: 0, + PacketStatusSymbol: rtcp.TypeTCCPacketNotReceived, + RunLength: 6, + }, + deltas: []*rtcp.RecvDelta{}, + start: 65534, + //nolint:dupl + acks: []Acknowledgment{ + { + TLCC: 65534, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 65535, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 0, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 1, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 2, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 3, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + }, + n: 0, + refTime: time.Time{}, + }, + } + + //nolint:dupl + for i, tc := range cases { + i := i + tc := tc + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + fa := NewFeedbackAdapter() + + headers := []*rtp.Header{} + for i, nr := range tc.sentTLCC { + headers = append(headers, &getPacketWithTransportCCExt(t, nr).Header) + tc.acks[i].Header = headers[i] + } + for _, h := range headers { + assert.NoError(t, fa.OnSent(time.Time{}, h, 0, attributes)) + } + + n, refTime, acks, err := fa.unpackRunLengthChunk(time.Time{}, tc.start, time.Time{}, &tc.chunk, tc.deltas) + assert.NoError(t, err) + assert.Len(t, acks, len(tc.acks)) + assert.Equal(t, tc.n, n) + assert.Equal(t, tc.refTime, refTime) + + for i, a := range acks { + assert.Equal(t, tc.sentTLCC[i], a.TLCC) + } + assert.Equal(t, tc.acks, acks) + }) + } +} + +func TestUnpackStatusVectorChunk(t *testing.T) { + attributes := make(interceptor.Attributes) + attributes.Set(twccExtensionAttributesKey, hdrExtID) + + cases := []struct { + sentTLCC []uint16 + chunk rtcp.StatusVectorChunk + deltas []*rtcp.RecvDelta + start uint16 + // expect: + acks []Acknowledgment + n int + refTime time.Time + }{ + { + sentTLCC: []uint16{}, + chunk: rtcp.StatusVectorChunk{}, + deltas: []*rtcp.RecvDelta{}, + start: 0, + acks: []Acknowledgment{}, + n: 0, + refTime: time.Time{}, + }, + { + sentTLCC: []uint16{0, 1, 2, 3, 4, 5}, + chunk: rtcp.StatusVectorChunk{ + PacketStatusChunk: nil, + Type: 0, + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + }, + }, + deltas: []*rtcp.RecvDelta{ + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + }, + start: 0, + //nolint:dupl + acks: []Acknowledgment{ + { + TLCC: 0, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 1, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 2, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 3, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 4, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 5, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + }, + n: 6, + refTime: time.Time{}, + }, + { + sentTLCC: []uint16{65534, 65535, 0, 1, 2, 3}, + chunk: rtcp.StatusVectorChunk{ + PacketStatusChunk: nil, + Type: 0, + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketReceivedSmallDelta, + }, + }, + deltas: []*rtcp.RecvDelta{ + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + }, + start: 65534, + acks: []Acknowledgment{ + { + TLCC: 65534, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(250 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 65535, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(500 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 0, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(750 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 1, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(1000 * time.Microsecond), + RTT: 0, + }, + { + TLCC: 2, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}, + RTT: 0, + }, + { + TLCC: 3, + Header: nil, + Size: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(1250 * time.Microsecond), + RTT: 0, + }, + }, + n: 5, + refTime: time.Time{}.Add(1250 * time.Microsecond), + }, + } + + //nolint:dupl + for i, tc := range cases { + i := i + tc := tc + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + fa := NewFeedbackAdapter() + + headers := []*rtp.Header{} + for i, nr := range tc.sentTLCC { + headers = append(headers, &getPacketWithTransportCCExt(t, nr).Header) + tc.acks[i].Header = headers[i] + } + for _, h := range headers { + assert.NoError(t, fa.OnSent(time.Time{}, h, 0, attributes)) + } + + n, refTime, acks, err := fa.unpackStatusVectorChunk(time.Time{}, tc.start, time.Time{}, &tc.chunk, tc.deltas) + assert.NoError(t, err) + assert.Len(t, acks, len(tc.acks)) + assert.Equal(t, tc.n, n) + assert.Equal(t, tc.refTime, refTime) + + for i, a := range acks { + assert.Equal(t, tc.sentTLCC[i], a.TLCC) + } + assert.Equal(t, tc.acks, acks) + }) + } +} + func getPacketWithTransportCCExt(t *testing.T, sequenceNumber uint16) *rtp.Packet { pkt := rtp.Packet{ Header: rtp.Header{}, @@ -29,7 +499,7 @@ func getPacketWithTransportCCExt(t *testing.T, sequenceNumber uint16) *rtp.Packe func TestFeedbackAdapterTWCC(t *testing.T) { t.Run("empty", func(t *testing.T) { adapter := NewFeedbackAdapter() - result, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{}) + result, err := adapter.onIncomingTransportCC(time.Time{}, &rtcp.TransportLayerCC{}) assert.NoError(t, err) assert.Empty(t, result) }) @@ -43,7 +513,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { headers = append(headers, pkt.Header) assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } - results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + results, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -96,7 +566,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { RecvDeltas: []*rtcp.RecvDelta{ { Type: rtcp.TypeTCCPacketReceivedSmallDelta, - Delta: 4, // 4*250us=1ms + Delta: 4, }, { Type: rtcp.TypeTCCPacketReceivedLargeDelta, @@ -104,7 +574,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }, { Type: rtcp.TypeTCCPacketReceivedSmallDelta, - Delta: 12, // 3*4*250us=3ms + Delta: 12, }, { Type: rtcp.TypeTCCPacketReceivedSmallDelta, @@ -119,56 +589,68 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.Len(t, results, 22) assert.Contains(t, results, Acknowledgment{ + TLCC: 0, Header: &headers[0], - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(time.Millisecond), + Arrival: t0.Add(4 * time.Microsecond), + RTT: 0, }) assert.Contains(t, results, Acknowledgment{ + TLCC: 1, Header: &headers[1], - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(101 * time.Millisecond), + Arrival: t0.Add(104 * time.Microsecond), + RTT: 0, }) for i := uint16(2); i < 7; i++ { assert.Contains(t, results, Acknowledgment{ + TLCC: i, Header: &headers[i], - Size: 0, + Size: 1200, Departure: t0, Arrival: time.Time{}, + RTT: 0, }) } assert.Contains(t, results, Acknowledgment{ + TLCC: 7, Header: &headers[7], - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(104 * time.Millisecond), + Arrival: t0.Add(116 * time.Microsecond), + RTT: 0, }) for i := uint16(8); i < 21; i++ { assert.Contains(t, results, Acknowledgment{ + TLCC: i, Header: &headers[i], - Size: 0, + Size: 1200, Departure: t0, Arrival: time.Time{}, + RTT: 0, }) } assert.Contains(t, results, Acknowledgment{ + TLCC: 21, Header: &headers[21], - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(105 * time.Millisecond), + Arrival: t0.Add(120 * time.Microsecond), + RTT: 0, }) }) t.Run("doesNotCrashOnTooManyFeedbackReports", func(*testing.T) { adapter := NewFeedbackAdapter() assert.NotPanics(t, func() { - _, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + _, err := adapter.OnFeedback(time.Time{}, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -211,7 +693,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, adapter.OnSent(t0, &pkt65535.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) assert.NoError(t, adapter.OnSent(t0, &pkt0.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) - results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + results, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -249,18 +731,22 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, results) - assert.Len(t, results, 2) + assert.Len(t, results, 7) assert.Contains(t, results, Acknowledgment{ + TLCC: 65535, Header: &pkt65535.Header, - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(1 * time.Millisecond), + Arrival: t0.Add(4 * time.Microsecond), + RTT: 0, }) assert.Contains(t, results, Acknowledgment{ + TLCC: 0, Header: &pkt0.Header, - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(2 * time.Millisecond), + Arrival: t0.Add(8 * time.Microsecond), + RTT: 0, }) }) @@ -274,7 +760,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } - results, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + results, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -314,13 +800,25 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }, }) assert.NoError(t, err) - assert.Len(t, results, 3) + assert.Len(t, results, 7) for i := uint16(0); i < 3; i++ { assert.Contains(t, results, Acknowledgment{ + TLCC: i, Header: &headers[i], - Size: 0, + Size: 1200, Departure: t0, - Arrival: t0.Add(time.Duration(i+1) * time.Millisecond), + Arrival: t0.Add(time.Duration((i + 1)) * 4 * time.Microsecond), + RTT: 0, + }) + } + for i := uint16(3); i < 7; i++ { + assert.Contains(t, results, Acknowledgment{ + TLCC: i, + Header: &headers[i], + Size: 1200, + Departure: t0, + Arrival: time.Time{}, + RTT: 0, }) } }) @@ -332,7 +830,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { pkt := getPacketWithTransportCCExt(t, i) assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } - packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + packets, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -373,7 +871,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { pkt := getPacketWithTransportCCExt(t, i) assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } - packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + packets, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -419,7 +917,7 @@ func TestFeedbackAdapterTWCC(t *testing.T) { }) assert.NoError(t, err) - assert.Len(t, packets, 3) + assert.Len(t, packets, 14) }) t.Run("mixedRunLengthAndStatusVector", func(t *testing.T) { @@ -431,7 +929,8 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } - packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + //nolint:dupl + packets, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, @@ -497,9 +996,10 @@ func TestFeedbackAdapterTWCC(t *testing.T) { assert.NoError(t, adapter.OnSent(t0, &pkt.Header, 1200, interceptor.Attributes{twccExtensionAttributesKey: hdrExtID})) } + //nolint:dupl assert.NotPanics(t, func() { // TODO(mathis): Run length seems off, maybe check why TWCC generated this? - packets, err := adapter.OnIncomingTransportCC(&rtcp.TransportLayerCC{ + packets, err := adapter.OnFeedback(t0, &rtcp.TransportLayerCC{ Header: rtcp.Header{}, SenderSSRC: 0, MediaSSRC: 0, diff --git a/pkg/gcc/interceptor.go b/pkg/gcc/interceptor.go index 342f3e57..d029d16a 100644 --- a/pkg/gcc/interceptor.go +++ b/pkg/gcc/interceptor.go @@ -224,7 +224,7 @@ func (c *Interceptor) loop() { } case pkts := <-c.feedback: for _, pkt := range pkts { - acks, err := c.OnFeedback(pkt) + acks, err := c.OnFeedback(time.Now(), pkt) if err != nil { // TODO panic(err)