Skip to content

Commit

Permalink
Implement first version of loss based GCC
Browse files Browse the repository at this point in the history
First draft of the loss based congestion controller of Google Congestion
Control.
  • Loading branch information
mengelbart committed Nov 23, 2021
1 parent d75d7da commit 33e3463
Show file tree
Hide file tree
Showing 13 changed files with 1,409 additions and 0 deletions.
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ adamroach <[email protected]>
aler9 <[email protected]>
Antoine Baché <[email protected]>
Atsushi Watanabe <[email protected]>
boks1971 <[email protected]>
Jonathan Müller <[email protected]>
Mathis Engelbart <[email protected]>
Sean DuBois <[email protected]>
33 changes: 33 additions & 0 deletions internal/types/datarate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package types

const (
// BitPerSecond is a data rate of 1 bit per second
BitPerSecond = DataRate(1)
// KiloBitPerSecond is a data rate of 1 kilobit per second
KiloBitPerSecond = 1000 * BitPerSecond
// MegaBitPerSecond is a data rate of 1 megabit per second
MegaBitPerSecond = 1000 * KiloBitPerSecond
)

// DataRate in bit per second
type DataRate int

// BitsPerMillisecond returns the datarate in b/ms (bits per millisecond).
func (r DataRate) BitsPerMillisecond() int {
return int(r / 1000.0)
}

// MaxDataRate returns the maximum of the given DataRates.
func MaxDataRate(a, b DataRate) DataRate {
if a > b {
return a
}
return b
}

func MinDataRate(a, b DataRate) DataRate {
if a < b {
return a
}
return b
}
20 changes: 20 additions & 0 deletions internal/types/packet_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package types

import (
"time"

"github.com/pion/rtp"
)

type SentPacket struct {
SendTime time.Time
Header *rtp.Header
}

// PacketResult holds information about a packet and if/when it has been
// sent/received.
type PacketResult struct {
SentPacket SentPacket
ReceiveTime time.Time
Received bool
}
166 changes: 166 additions & 0 deletions pkg/cc/feedback_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package cc

import (
"errors"
"sort"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/types"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

var errMissingTWCCExtension = errors.New("missing transport layer cc header extension")
var errInvalidFeedbackPacket = errors.New("got invalid feedback packet")

// TODO(mathis): make types internal only?

// FeedbackAdapter converts incoming feedback from the wireformat to a
// PacketResult
type FeedbackAdapter struct {
lock sync.Mutex
history map[uint16]types.SentPacket
}

// NewFeedbackAdapter returns a new FeedbackAdapter
func NewFeedbackAdapter() *FeedbackAdapter {
return &FeedbackAdapter{
history: make(map[uint16]types.SentPacket),
}
}

// OnSent records when a packet was been sent.
// TODO(mathis): Is there a better way to get attributes in here?
func (f *FeedbackAdapter) OnSent(ts time.Time, header *rtp.Header, attributes interceptor.Attributes) error {
hdrExtensionID := attributes.Get(twccExtension)
id, ok := hdrExtensionID.(uint8)
if !ok || hdrExtensionID == 0 {
return errMissingTWCCExtension
}
sequenceNumber := header.GetExtension(id)
var tccExt rtp.TransportCCExtension
err := tccExt.Unmarshal(sequenceNumber)
if err != nil {
return err
}

f.lock.Lock()
defer f.lock.Unlock()
f.history[tccExt.TransportSequence] = types.SentPacket{
SendTime: ts,
Header: header,
}
return nil
}

// OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a
// []PacketResult
func (f *FeedbackAdapter) OnIncomingTransportCC(feedback *rtcp.TransportLayerCC) ([]types.PacketResult, error) {
f.lock.Lock()
defer f.lock.Unlock()

result := []types.PacketResult{}

packetStatusCount := uint16(0)
chunkIndex := 0
deltaIndex := 0
referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)

for packetStatusCount < feedback.PacketStatusCount {
if chunkIndex >= len(feedback.PacketChunks) || len(feedback.PacketChunks) == 0 {
return nil, errInvalidFeedbackPacket
}
switch packetChunk := feedback.PacketChunks[chunkIndex].(type) {
case *rtcp.RunLengthChunk:
symbol := packetChunk.PacketStatusSymbol
for i := uint16(0); i < packetChunk.RunLength; i++ {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn or error?
continue
}
receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex])
referenceTime = receiveTime
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: receiveTime,
Received: true,
})
deltaIndex++
} else {
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: time.Time{},
Received: false,
})
}
} else {
// TODO(mathis): got feedback for unsent packet?
}
packetStatusCount++
}
chunkIndex++
case *rtcp.StatusVectorChunk:
for _, symbol := range packetChunk.SymbolList {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn or error?
continue
}
receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex])
referenceTime = receiveTime
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: receiveTime,
Received: true,
})
deltaIndex++
} else {
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: time.Time{},
Received: false,
})
}
}
packetStatusCount++
if packetStatusCount >= feedback.PacketStatusCount {
break
}
}
chunkIndex++
}
}
return result, nil
}

// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult
func (f *FeedbackAdapter) OnIncomingRFC8888(feedback *rtcp.RawPacket) ([]types.PacketResult, error) {
return nil, nil
}

func sortedKeysUint16(m map[uint16]types.SentPacket) []uint16 {
var result []uint16
for k := range m {
result = append(result, k)
}
sort.Slice(result, func(i, j int) bool {
return result[i] < result[j]
})
return result
}

func getReceiveTime(baseTime time.Time, delta *rtcp.RecvDelta) time.Time {
if delta.Type == rtcp.TypeTCCPacketReceivedSmallDelta {
return baseTime.Add(time.Duration(delta.Delta) * 250 * time.Microsecond)
}
return baseTime.Add(time.Duration(delta.Delta) * time.Millisecond)
}
Loading

0 comments on commit 33e3463

Please sign in to comment.