Skip to content

Commit

Permalink
Implement TWCC feedback adapter
Browse files Browse the repository at this point in the history
Converts TWCC feedback into a list of PacketResults that can be used by
congestion controllers to calculate connection statistics independently
from the feedback wireformat.
  • Loading branch information
mengelbart committed Nov 5, 2021
1 parent e4d8b77 commit f33f518
Show file tree
Hide file tree
Showing 3 changed files with 419 additions and 12 deletions.
142 changes: 136 additions & 6 deletions pkg/cc/feedback_adapter.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package cc

import (
"errors"
"sort"
"time"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

var errMissingTWCCExtension = errors.New("missing transport layer cc header extension")

// TODO(mathis): make types internal only?

type sentPacket struct {
sendTime time.Time
header *rtp.Header
header rtp.Header
}

// PacketResult holds information about a packet and if/when it has been
Expand All @@ -36,27 +41,152 @@ func NewFeedbackAdapter() *FeedbackAdapter {
}

// OnSent records when a packet was been sent.
func (f *FeedbackAdapter) OnSent(ts time.Time, pkt *rtp.Packet) {
f.history[pkt.SequenceNumber] = sentPacket{
// TODO(mathis): Is there a better way to get attributes in here?
func (f *FeedbackAdapter) OnSent(ts time.Time, pkt *rtp.Packet, attributes interceptor.Attributes) error {
hdrExtensionID := attributes.Get(twccExtension)
id, ok := hdrExtensionID.(uint8)
if !ok || hdrExtensionID == 0 {
return errMissingTWCCExtension
}
sequenceNumber := pkt.GetExtension(id)
var tccExt rtp.TransportCCExtension
err := tccExt.Unmarshal(sequenceNumber)
if err != nil {
return err
}

f.history[tccExt.TransportSequence] = sentPacket{
sendTime: ts,
header: &pkt.Header,
header: pkt.Header,
}
return nil
}

// OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a
// []PacketResult
func (f *FeedbackAdapter) OnIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) []PacketResult {
result := []PacketResult{}
for _, historyPacket := range f.history {

baseSequenceNr := feedback.BaseSequenceNumber
sequenceNr := baseSequenceNr
nextRecvDelta := 0
referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) // TODO(mathis): Use/save reference time globally?

for _, packetChunkInterface := range feedback.PacketChunks {
switch packetChunk := packetChunkInterface.(type) {
case *rtcp.RunLengthChunk:
symbol := packetChunk.PacketStatusSymbol
for i := uint16(0); i < packetChunk.RunLength; i++ {
if sent, ok := f.history[sequenceNr]; ok {
var delta time.Duration

switch symbol {
case rtcp.TypeTCCPacketReceivedSmallDelta:
fallthrough
case rtcp.TypeTCCPacketReceivedLargeDelta:
if nextRecvDelta > len(feedback.RecvDeltas)-1 {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn?
continue
}
recvDelta := feedback.RecvDeltas[nextRecvDelta]
nextRecvDelta++

switch recvDelta.Type {
case rtcp.TypeTCCPacketReceivedSmallDelta:
delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond

case rtcp.TypeTCCPacketReceivedLargeDelta:
// TODO(mathis): This should not happen: symbol is
// SmallDelta, but recvDelta is largeDelta
// Warn?
delta = time.Duration(recvDelta.Delta) * time.Millisecond

default:
// TODO(mathis): Should not happne, error invalid
// receive delta?
}
receiveTime := referenceTime.Add(delta)
referenceTime = receiveTime

result = append(result, PacketResult{
SentPacket: sent,
receiveTime: receiveTime,
Received: true,
})
delete(f.history, sequenceNr)
}
}
sequenceNr++
}
case *rtcp.StatusVectorChunk:
for _, symbol := range packetChunk.SymbolList {
if sent, ok := f.history[sequenceNr]; ok {
var delta time.Duration

switch symbol {
case rtcp.TypeTCCPacketReceivedSmallDelta:
fallthrough
case rtcp.TypeTCCPacketReceivedLargeDelta:
if nextRecvDelta > len(feedback.RecvDeltas)-1 {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn?
continue
}
recvDelta := feedback.RecvDeltas[nextRecvDelta]
nextRecvDelta++

switch recvDelta.Type {
case rtcp.TypeTCCPacketReceivedSmallDelta:
delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond

case rtcp.TypeTCCPacketReceivedLargeDelta:
// TODO(mathis): This should not happen: symbol is
// SmallDelta, but recvDelta is largeDelta
// Warn?
delta = time.Duration(recvDelta.Delta) * time.Millisecond

default:
// TODO(mathis): Should not happne, error invalid
// receive delta?
}
receiveTime := referenceTime.Add(delta)
referenceTime = receiveTime

result = append(result, PacketResult{
SentPacket: sent,
receiveTime: receiveTime,
Received: true,
})
delete(f.history, sequenceNr)
}
}
sequenceNr++
}
}
}

for _, v := range sortedKeysUint16(f.history) {
result = append(result, PacketResult{
SentPacket: historyPacket,
SentPacket: f.history[v],
receiveTime: time.Time{},
Received: false,
})
}
return result
}

func sortedKeysUint16(m map[uint16]sentPacket) []uint16 {
var result []uint16
for k := range m {
result = append(result, k)
}
sort.Slice(result, func(i, j int) bool {
return result[i] < result[j]
})
return result
}

// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult
func (f *FeedbackAdapter) OnIncomingRFC8888(ts time.Time, feedback *rtcp.RawPacket) []PacketResult {
return nil
Expand Down
Loading

0 comments on commit f33f518

Please sign in to comment.