Skip to content

Commit

Permalink
Refactor feedback adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Dec 14, 2021
1 parent d45c4e9 commit b340576
Show file tree
Hide file tree
Showing 6 changed files with 650 additions and 105 deletions.
13 changes: 13 additions & 0 deletions pkg/gcc/acknowledgment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gcc

import (
"fmt"
"time"

"github.com/pion/rtp"
Expand All @@ -9,9 +10,21 @@ import (
// Acknowledgment holds information about a packet and if/when it has been
// sent/received.
type Acknowledgment struct {
TLCC uint16
Header *rtp.Header
Size int
Departure time.Time
Arrival time.Time
RTT time.Duration
}

func (a Acknowledgment) String() string {
s := "ACK:\n"
s += fmt.Sprintf("\tTLCC:\t%v\n", a.TLCC)
s += fmt.Sprintf("\tHEADER:\t%v\n", a.Header)
s += fmt.Sprintf("\tSIZE:\t%v\n", a.Size)
s += fmt.Sprintf("\tDEPARTURE:\t%v\n", a.Departure)
s += fmt.Sprintf("\tARRIVAL:\t%v\n", a.Arrival)
s += fmt.Sprintf("\tRTT:\t%v\n", a.RTT)
return s
}
16 changes: 9 additions & 7 deletions pkg/gcc/delay_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,17 +323,24 @@ func calculateReceivedRate(log []Acknowledgment) int {
if len(log) == 0 {
return 0
}
sum := 0
if len(log) == 1 && !log[0].Arrival.IsZero() {
return log[0].Header.MarshalSize() + log[0].Size
}

start := log[0].Arrival
end := log[len(log)-1].Arrival
d := end.Sub(start)
if d == 0 {
return 0
}

sum := 0
for _, ack := range log {
if !ack.Arrival.IsZero() {
sum += ack.Header.MarshalSize() + ack.Size
}
}

d := end.Sub(start)
rate := int(float64(8*sum) / d.Seconds())
// fmt.Printf("calculating rate for: from %v to %v => %v / %v = %v\n", start, end, sum, d.Seconds(), rate)
return rate
Expand Down Expand Up @@ -367,11 +374,6 @@ func preFilter(log []Acknowledgment) []arrivalGroup {
ag.add(p)
res = append(res, ag)
}
// for _, r := range res {
// for _, n := range r.packets {
// fmt.Printf("%v:%v\n", n.TLCC, n.Arrival.Sub(time.Time{}).Milliseconds())
// }
// }
return res
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/gcc/delay_based_bwe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,25 @@ func TestPreFilter(t *testing.T) {
{
log: []Acknowledgment{
{
Arrival: time.Time{},
Header: nil,
Size: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
RTT: 0,
},
},
exp: []arrivalGroup{
{
packets: []Acknowledgment{{}},
arrival: time.Time{},
packets: []Acknowledgment{{
Header: nil,
Size: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
RTT: 0,
}},
arrival: time.Time{}.Add(time.Millisecond),
departure: time.Time{},
rtt: 0,
},
},
},
Expand Down Expand Up @@ -268,7 +279,7 @@ func TestCalculateReceivingRate(t *testing.T) {
},
},
{
expected: 12 + 12 + 1200 + 1200,
expected: (12 + 12 + 1200 + 1200) * 8 * 10, // *8: Bytes to bits, *10: calculate rate in 100ms
log: []Acknowledgment{
{
Header: &rtp.Header{},
Expand Down
145 changes: 82 additions & 63 deletions pkg/gcc/feedback_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
var (
errMissingTWCCExtension = errors.New("missing transport layer cc header extension")
errUnknownFeedbackFormat = errors.New("unknown feedback format")

errInvalidFeedback = errors.New("invalid feedback")
)

// FeedbackAdapter converts incoming feedback from the wireformat to a
Expand Down Expand Up @@ -47,96 +49,113 @@ func (f *FeedbackAdapter) OnSent(ts time.Time, header *rtp.Header, size int, att
f.lock.Lock()
defer f.lock.Unlock()
f.history[tccExt.TransportSequence] = Acknowledgment{
TLCC: tccExt.TransportSequence,
Header: header,
Size: size,
Departure: ts,
Arrival: time.Time{},
RTT: 0,
}
return nil
}

// OnFeedback converts incoming RTCP packet feedback to Acknowledgments.
// Currently only TWCC is supported.
func (f *FeedbackAdapter) OnFeedback(feedback rtcp.Packet) ([]Acknowledgment, error) {
func (f *FeedbackAdapter) OnFeedback(ts time.Time, feedback rtcp.Packet) ([]Acknowledgment, error) {
f.lock.Lock()
defer f.lock.Unlock()

switch fb := feedback.(type) {
case *rtcp.TransportLayerCC:
return f.OnIncomingTransportCC(fb)
return f.onIncomingTransportCC(ts, fb)
default:
return nil, errUnknownFeedbackFormat
}
}

// OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a
// []PacketResult
func (f *FeedbackAdapter) OnIncomingTransportCC(feedback *rtcp.TransportLayerCC) ([]Acknowledgment, error) {
t0 := time.Now()
f.lock.Lock()
defer f.lock.Unlock()
func (f *FeedbackAdapter) unpackRunLengthChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.RunLengthChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {
result := make([]Acknowledgment, chunk.RunLength)
deltaIndex := 0

result := []Acknowledgment{}
// Rollover if necessary
end := int(start + chunk.RunLength)
if end < int(start) {
end += 65536
}
resultIndex := 0
for i := int(start); i < end; i++ {
if ack, ok := f.history[uint16(i)]; ok {
if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived {
if len(deltas)-1 < deltaIndex {
return deltaIndex, refTime, result, errInvalidFeedback
}
refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond)
ack.Arrival = refTime
ack.RTT = ts.Sub(ack.Departure)
deltaIndex++
}
result[resultIndex] = ack
}
resultIndex++
}
return deltaIndex, refTime, result, nil
}

packetStatusCount := uint16(0)
chunkIndex := 0
func (f *FeedbackAdapter) unpackStatusVectorChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.StatusVectorChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {
result := make([]Acknowledgment, len(chunk.SymbolList))
deltaIndex := 0
referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)

for packetStatusCount < feedback.PacketStatusCount {
if chunkIndex >= len(feedback.PacketChunks) || len(feedback.PacketChunks) == 0 {
return nil, errUnknownFeedbackFormat
resultIndex := 0
for i, symbol := range chunk.SymbolList {
if ack, ok := f.history[start+uint16(i)]; ok {
if symbol != rtcp.TypeTCCPacketNotReceived {
if len(deltas)-1 < deltaIndex {
return deltaIndex, refTime, result, errInvalidFeedback
}
refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond)
ack.Arrival = refTime
ack.RTT = ts.Sub(ack.Departure)
deltaIndex++
}
result[resultIndex] = ack
}
switch packetChunk := feedback.PacketChunks[chunkIndex].(type) {
resultIndex++
}

return deltaIndex, refTime, result, nil
}

func (f *FeedbackAdapter) onIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) ([]Acknowledgment, error) {
result := []Acknowledgment{}

index := feedback.BaseSequenceNumber
refTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)
recvDeltas := feedback.RecvDeltas

for _, chunk := range feedback.PacketChunks {
switch chunk := chunk.(type) {
case *rtcp.RunLengthChunk:
symbol := packetChunk.PacketStatusSymbol
for i := uint16(0); i < packetChunk.RunLength; i++ {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn or error?
continue
}
receiveTime := referenceTime.Add(time.Duration(feedback.RecvDeltas[deltaIndex].Delta) * time.Microsecond)
referenceTime = receiveTime
sentPacket.Arrival = receiveTime
sentPacket.RTT = t0.Sub(sentPacket.Departure)
result = append(result, sentPacket)
deltaIndex++
} else {
result = append(result, sentPacket)
}
}
packetStatusCount++
n, nextRefTime, acks, err := f.unpackRunLengthChunk(ts, index, refTime, chunk, recvDeltas)
if err != nil {
return nil, err
}
chunkIndex++
refTime = nextRefTime
result = append(result, acks...)
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
case *rtcp.StatusVectorChunk:
for _, symbol := range packetChunk.SymbolList {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn or error?
continue
}
receiveTime := referenceTime.Add(time.Duration(feedback.RecvDeltas[deltaIndex].Delta) * time.Microsecond)
referenceTime = receiveTime
sentPacket.Arrival = receiveTime
sentPacket.RTT = t0.Sub(sentPacket.Departure)
result = append(result, sentPacket)
deltaIndex++
} else {
result = append(result, sentPacket)
}
}
packetStatusCount++
if packetStatusCount >= feedback.PacketStatusCount {
break
}
n, nextRefTime, acks, err := f.unpackStatusVectorChunk(ts, index, refTime, chunk, recvDeltas)
if err != nil {
return nil, err
}
chunkIndex++
refTime = nextRefTime
result = append(result, acks...)
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
default:
return nil, errInvalidFeedback
}
}

return result, nil
}

Expand Down
Loading

0 comments on commit b340576

Please sign in to comment.