Skip to content

Commit

Permalink
Fix twcc feedback adapter
Browse files Browse the repository at this point in the history
Now only includes packet feedback for packets that were received.
Previously, in flight packets could be considered lost.
  • Loading branch information
mengelbart committed Nov 5, 2021
1 parent 57e9724 commit 3767e28
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 106 deletions.
138 changes: 58 additions & 80 deletions pkg/cc/feedback_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

var errMissingTWCCExtension = errors.New("missing transport layer cc header extension")
var errInvalidFeedbackPacket = errors.New("got invalid feedback packet")

// TODO(mathis): make types internal only?

Expand Down Expand Up @@ -52,116 +53,91 @@ func (f *FeedbackAdapter) OnSent(ts time.Time, pkt *rtp.Packet, attributes inter

// OnIncomingTransportCC converts the incoming rtcp.TransportLayerCC to a
// []PacketResult
func (f *FeedbackAdapter) OnIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) []types.PacketResult {
func (f *FeedbackAdapter) OnIncomingTransportCC(feedback *rtcp.TransportLayerCC) ([]types.PacketResult, error) {
result := []types.PacketResult{}

baseSequenceNr := feedback.BaseSequenceNumber
sequenceNr := baseSequenceNr
nextRecvDelta := 0
referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) // TODO(mathis): Use/save reference time globally?
packetStatusCount := uint16(0)
chunkIndex := 0
deltaIndex := 0
referenceTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)

for _, packetChunkInterface := range feedback.PacketChunks {
switch packetChunk := packetChunkInterface.(type) {
for packetStatusCount < feedback.PacketStatusCount {
if chunkIndex >= len(feedback.PacketChunks) || len(feedback.PacketChunks) == 0 {
return nil, errInvalidFeedbackPacket
}
switch packetChunk := feedback.PacketChunks[chunkIndex].(type) {
case *rtcp.RunLengthChunk:
symbol := packetChunk.PacketStatusSymbol
for i := uint16(0); i < packetChunk.RunLength; i++ {
if sent, ok := f.history[sequenceNr]; ok {
var delta time.Duration

switch symbol {
case rtcp.TypeTCCPacketReceivedSmallDelta:
fallthrough
case rtcp.TypeTCCPacketReceivedLargeDelta:
if nextRecvDelta > len(feedback.RecvDeltas)-1 {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn?
// of received packets: warn or error?
continue
}
recvDelta := feedback.RecvDeltas[nextRecvDelta]
nextRecvDelta++

switch recvDelta.Type {
case rtcp.TypeTCCPacketReceivedSmallDelta:
delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond

case rtcp.TypeTCCPacketReceivedLargeDelta:
// TODO(mathis): This should not happen: symbol is
// SmallDelta, but recvDelta is largeDelta
// Warn?
delta = time.Duration(recvDelta.Delta) * time.Millisecond

default:
// TODO(mathis): Should not happne, error invalid
// receive delta?
}
receiveTime := referenceTime.Add(delta)
receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex])
referenceTime = receiveTime

result = append(result, types.PacketResult{
SentPacket: sent,
SentPacket: sentPacket,
ReceiveTime: receiveTime,
Received: true,
})
delete(f.history, sequenceNr)
deltaIndex++
} else {
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: time.Time{},
Received: false,
})
}
} else {
// TODO(mathis): got feedback for unsent packet?
}
sequenceNr++
packetStatusCount++
}
chunkIndex++
case *rtcp.StatusVectorChunk:
for _, symbol := range packetChunk.SymbolList {
if sent, ok := f.history[sequenceNr]; ok {
var delta time.Duration

switch symbol {
case rtcp.TypeTCCPacketReceivedSmallDelta:
fallthrough
case rtcp.TypeTCCPacketReceivedLargeDelta:
if nextRecvDelta > len(feedback.RecvDeltas)-1 {
if sentPacket, ok := f.history[feedback.BaseSequenceNumber+packetStatusCount]; ok {
if symbol == rtcp.TypeTCCPacketReceivedSmallDelta ||
symbol == rtcp.TypeTCCPacketReceivedLargeDelta {
if deltaIndex >= len(feedback.RecvDeltas) {
// TODO(mathis): Not enough recv deltas for number
// of received packets: warn?
// of received packets: warn or error?
continue
}
recvDelta := feedback.RecvDeltas[nextRecvDelta]
nextRecvDelta++

switch recvDelta.Type {
case rtcp.TypeTCCPacketReceivedSmallDelta:
delta = time.Duration(recvDelta.Delta) * 250 * time.Microsecond

case rtcp.TypeTCCPacketReceivedLargeDelta:
// TODO(mathis): This should not happen: symbol is
// SmallDelta, but recvDelta is largeDelta
// Warn?
delta = time.Duration(recvDelta.Delta) * time.Millisecond

default:
// TODO(mathis): Should not happne, error invalid
// receive delta?
}
receiveTime := referenceTime.Add(delta)
receiveTime := getReceiveTime(referenceTime, feedback.RecvDeltas[deltaIndex])
referenceTime = receiveTime

result = append(result, types.PacketResult{
SentPacket: sent,
SentPacket: sentPacket,
ReceiveTime: receiveTime,
Received: true,
})
delete(f.history, sequenceNr)
deltaIndex++
} else {
result = append(result, types.PacketResult{
SentPacket: sentPacket,
ReceiveTime: time.Time{},
Received: false,
})
}
}
sequenceNr++
packetStatusCount++
if packetStatusCount >= feedback.PacketStatusCount {
break
}
}
chunkIndex++
}
}
return result, nil
}

for _, v := range sortedKeysUint16(f.history) {
result = append(result, types.PacketResult{
SentPacket: f.history[v],
ReceiveTime: time.Time{},
Received: false,
})
}
return result
// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult
func (f *FeedbackAdapter) OnIncomingRFC8888(feedback *rtcp.RawPacket) ([]types.PacketResult, error) {
return nil, nil
}

func sortedKeysUint16(m map[uint16]types.SentPacket) []uint16 {
Expand All @@ -175,7 +151,9 @@ func sortedKeysUint16(m map[uint16]types.SentPacket) []uint16 {
return result
}

// OnIncomingRFC8888 converts the incoming RFC8888 packet to a []PacketResult
func (f *FeedbackAdapter) OnIncomingRFC8888(ts time.Time, feedback *rtcp.RawPacket) []types.PacketResult {
return nil
func getReceiveTime(baseTime time.Time, delta *rtcp.RecvDelta) time.Time {
if delta.Type == rtcp.TypeTCCPacketReceivedSmallDelta {
return baseTime.Add(time.Duration(delta.Delta) * 250 * time.Microsecond)
}
return baseTime.Add(time.Duration(delta.Delta) * time.Millisecond)
}
Loading

0 comments on commit 3767e28

Please sign in to comment.