Skip to content

Commit

Permalink
Flex fec encoder for RFC version 03
Browse files Browse the repository at this point in the history
  • Loading branch information
pougetat committed Nov 4, 2023
1 parent 7d6c986 commit a621baf
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 95 deletions.
79 changes: 79 additions & 0 deletions pkg/flexfec/encoder_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package flexfec

import (
"github.com/pion/interceptor"
"github.com/pion/rtp"
)

// FecInterceptor implements FlexFec.
type FecInterceptor struct {
interceptor.NoOp
flexFecEncoder FlexEncoder
packetBuffer []rtp.Packet
minNumMediaPackets uint32
}

// FecOption can be used to set initial options on Fec encoder interceptors.
type FecOption func(d *FecInterceptor) error

// FecInterceptorFactory creates new FecInterceptors.
type FecInterceptorFactory struct {
opts []FecOption
}

// NewFecInterceptor returns a new Fec interceptor factory.
func NewFecInterceptor(opts ...FecOption) (*FecInterceptorFactory, error) {
return &FecInterceptorFactory{opts: opts}, nil
}

// NewInterceptor constructs a new FecInterceptor.
func (r *FecInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
// Hardcoded for now:
// Min num media packets to encode FEC -> 5
// Min num fec packets -> 1

interceptor := &FecInterceptor{
packetBuffer: make([]rtp.Packet, 0),
minNumMediaPackets: 5,
}
return interceptor, nil
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (r *FecInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
// although we should support configuring the latest (flexfec-20) as well.
r.flexFecEncoder = NewFlexEncoder03(info.PayloadType, info.SSRC)

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
r.packetBuffer = append(r.packetBuffer, rtp.Packet{
Header: *header,
Payload: payload,
})

// Send the media RTP packet
result, err := writer.Write(header, payload, attributes)

// Send the FEC packets
var fecPackets []rtp.Packet
if len(r.packetBuffer) == int(r.minNumMediaPackets) {
fecPackets = r.flexFecEncoder.EncodeFec(r.packetBuffer, 2)

for _, fecPacket := range fecPackets {
fecResult, fecErr := writer.Write(&fecPacket.Header, fecPacket.Payload, attributes)

if fecErr != nil && fecResult == 0 {
break
}
}
// Reset the packet buffer now that we've sent the corresponding FEC packets.
r.packetBuffer = nil
}

return result, err
})
}
96 changes: 72 additions & 24 deletions pkg/flexfec/flexfec_coverage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,64 @@ func NewCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) *ProtectionCov
// We allocate the biggest array of bitmasks that respects the max constraints.
var packetMasks [MaxFecPackets]util.BitArray
for i := 0; i < int(MaxFecPackets); i++ {
packetMasks[i] = util.NewBitArray(MaxMediaPackets)
packetMasks[i] = util.BitArray{}
}

coverage := &ProtectionCoverage{
packetMasks: packetMasks,
numFecPackets: 0,
numMediaPackets: 0,
mediaPackets: nil,
}

coverage.UpdateCoverage(mediaPackets, numFecPackets)
return coverage
}

// UpdateCoverage updates the ProtectionCoverage object with new bitmasks accounting for the numFecPackets
// we want to use to protect the batch media packets.
func (p *ProtectionCoverage) UpdateCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) {
numMediaPackets := uint32(len(mediaPackets))

// Basic sanity checks
if numMediaPackets <= 0 || numMediaPackets > MaxMediaPackets {
return
}

p.mediaPackets = mediaPackets

if numFecPackets == p.numFecPackets && numMediaPackets == p.numMediaPackets {
// We have the same number of FEC packets covering the same number of media packets, we can simply
// reuse the previous coverage map with the updated media packets.
return
}

p.numFecPackets = numFecPackets
p.numMediaPackets = numMediaPackets

// The number of FEC packets and/or the number of packets has changed, we need to update the coverage map
// to reflect these new values.
p.resetCoverage()

// Generate FEC bit mask where numFecPackets FEC packets are covering numMediaPackets Media packets.
// In the packetMasks array, each FEC packet is represented by a single BitArray, each bit in a given BitArray
// corresponds to a specific Media packet.
// Ex: Row I, Col J is set to 1 -> FEC packet I will protect media packet J.
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
// We use an interleaved method to determine coverage. Given N FEC packets, Media packet X will be
// covered by FEC packet X % N.
for mediaPacketIndex := uint32(0); mediaPacketIndex < numMediaPackets; mediaPacketIndex++ {
coveringFecPktIndex := mediaPacketIndex % numFecPackets
packetMasks[coveringFecPktIndex].SetBit(mediaPacketIndex, 1)
coveredMediaPacketIndex := fecPacketIndex
for coveredMediaPacketIndex < numMediaPackets {
p.packetMasks[fecPacketIndex].SetBit(coveredMediaPacketIndex)
coveredMediaPacketIndex += numFecPackets
}
}

return &ProtectionCoverage{
packetMasks: packetMasks,
numFecPackets: numFecPackets,
numMediaPackets: numMediaPackets,
mediaPackets: mediaPackets,
}
}

// ResetCoverage clears the underlying map so that we can reuse it for new batches of RTP packets.
func (p *ProtectionCoverage) ResetCoverage() {
func (p *ProtectionCoverage) resetCoverage() {
for i := uint32(0); i < MaxFecPackets; i++ {
for j := uint32(0); j < MaxMediaPackets; j++ {
p.packetMasks[i].SetBit(j, 0)
}
p.packetMasks[i].Reset()
}
}

Expand All @@ -86,26 +114,46 @@ func (p *ProtectionCoverage) GetCoveredBy(fecPacketIndex uint32) *util.MediaPack
return util.NewMediaPacketIterator(p.mediaPackets, coverage)
}

// MarshalBitmasks returns the underlying bitmask that defines which media packets are protected by the
// specified fecPacketIndex.
func (p *ProtectionCoverage) MarshalBitmasks(fecPacketIndex uint32) []byte {
return p.packetMasks[fecPacketIndex].Marshal()
}

// ExtractMask1 returns the first section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask1(fecPacketIndex uint32) uint16 {
return uint16(p.packetMasks[fecPacketIndex].GetBitValue(0, 14))
mask := p.packetMasks[fecPacketIndex]
// We get the first 16 bits (64 - 16 -> shift by 48) and we shift once more for K field
mask1 := mask.Lo >> 49
return uint16(mask1)
}

// ExtractMask2 returns the second section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask2(fecPacketIndex uint32) uint32 {
return uint32(p.packetMasks[fecPacketIndex].GetBitValue(15, 45))
mask := p.packetMasks[fecPacketIndex]
// We remove the first 15 bits
mask2 := mask.Lo << 15
// We get the first 31 bits (64 - 31 -> shift by 33) and we shift once more for K field
mask2 >>= 34
return uint32(mask2)
}

// ExtractMask3 returns the third section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask3(fecPacketIndex uint32) uint64 {
return p.packetMasks[fecPacketIndex].GetBitValue(46, 109)
mask := p.packetMasks[fecPacketIndex]
// We remove the first 46 bits
maskLo := mask.Lo << 46
maskHi := mask.Hi >> 18
mask3 := maskLo | maskHi
return mask3
}

// ExtractMask3_03 returns the third section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/draft-ietf-payload-flexible-fec-scheme-03#section-4.2
func (p *ProtectionCoverage) ExtractMask3_03(fecPacketIndex uint32) uint64 {
mask := p.packetMasks[fecPacketIndex]
// We remove the first 46 bits
maskLo := mask.Lo << 46
maskHi := mask.Hi >> 18
mask3 := maskLo | maskHi
// We shift once for the K bit.
mask3 >>= 1
return mask3
}
42 changes: 26 additions & 16 deletions pkg/flexfec/flexfec_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,37 @@ const (
BaseFecHeaderSize = 12
)

// FlexEncoder implements the Fec encoding mechanism for the "Flex" variant of FlexFec.
type FlexEncoder struct {
baseSN uint16
// FlexEncoder is the interface that FecInterceptor uses to encode Fec packets.
type FlexEncoder interface {
EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet
}

// FlexEncoder20 implements the Fec encoding mechanism for the "Flex" variant of FlexFec.
type FlexEncoder20 struct {
fecBaseSn uint16
payloadType uint8
ssrc uint32
coverage *ProtectionCoverage
}

// NewFlexEncoder returns a new FlexFecEncer.
func NewFlexEncoder(baseSN uint16, payloadType uint8, ssrc uint32) *FlexEncoder {
return &FlexEncoder{
baseSN: baseSN,
func NewFlexEncoder(payloadType uint8, ssrc uint32) *FlexEncoder20 {
return &FlexEncoder20{
payloadType: payloadType,
ssrc: ssrc,
coverage: nil,
fecBaseSn: uint16(1000),
}
}

// EncodeFec returns a list of generated RTP packets with FEC payloads that protect the specified mediaPackets.
// This method does not account for missing RTP packets in the mediaPackets array nor does it account for
// them being passed out of order.
func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet {
func (flex *FlexEncoder20) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet {
// Start by defining which FEC packets cover which media packets
if flex.coverage == nil {
flex.coverage = NewCoverage(mediaPackets, numFecPackets)
} else {
flex.coverage.ResetCoverage()
flex.coverage.UpdateCoverage(mediaPackets, numFecPackets)
}

if flex.coverage == nil {
Expand All @@ -56,39 +60,42 @@ func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint
// Generate FEC payloads
fecPackets := make([]rtp.Packet, numFecPackets)
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex)
fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex, mediaPackets[0].SequenceNumber)
}

return fecPackets
}

func (flex *FlexEncoder) encodeFlexFecPacket(fecPacketIndex uint32) rtp.Packet {
func (flex *FlexEncoder20) encodeFlexFecPacket(fecPacketIndex uint32, mediaBaseSn uint16) rtp.Packet {
mediaPacketsIt := flex.coverage.GetCoveredBy(fecPacketIndex)
flexFecHeader := flex.encodeFlexFecHeader(
mediaPacketsIt,
flex.coverage.ExtractMask1(fecPacketIndex),
flex.coverage.ExtractMask2(fecPacketIndex),
flex.coverage.ExtractMask3(fecPacketIndex),
mediaBaseSn,
)
flexFecRepairPayload := flex.encodeFlexFecRepairPayload(mediaPacketsIt.Reset())

return rtp.Packet{
packet := rtp.Packet{
Header: rtp.Header{
Version: 2,
Padding: false,
Extension: false,
Marker: false,
PayloadType: flex.payloadType,
SequenceNumber: flex.baseSN,
SequenceNumber: flex.fecBaseSn,
Timestamp: 54243243,
SSRC: flex.ssrc,
CSRC: []uint32{},
},
Payload: append(flexFecHeader, flexFecRepairPayload...),
}
flex.fecBaseSn++
return packet
}

func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64) []byte {
func (flex *FlexEncoder20) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64, mediaBaseSn uint16) []byte {
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
Expand Down Expand Up @@ -119,7 +126,7 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
headerSize += 8
}

// Allocate a the FlexFec header
// Allocate the FlexFec header
flexFecHeader := make([]byte, headerSize)

// XOR the relevant fields for the header
Expand Down Expand Up @@ -149,6 +156,9 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
flexFecHeader[7] ^= flexFecHeader[7]
}

// Write the base SN for the batch of media packets
binary.BigEndian.PutUint16(flexFecHeader[8:10], mediaBaseSn)

// Write the bitmasks to the header
binary.BigEndian.PutUint16(flexFecHeader[10:12], mask1)

Expand All @@ -163,7 +173,7 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
return flexFecHeader
}

func (flex *FlexEncoder) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
func (flex *FlexEncoder20) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
flexFecPayload := make([]byte, len(mediaPackets.First().Payload))

for mediaPackets.HasNext() {
Expand Down
Loading

0 comments on commit a621baf

Please sign in to comment.