From c74445c5adabe303562e7a7c1b3dd35be46e7cb8 Mon Sep 17 00:00:00 2001 From: ypothoma Date: Tue, 24 Oct 2023 11:23:53 -0700 Subject: [PATCH] Adding FlexFec encoder implementation --- AUTHORS.txt | 1 + pkg/flexfec/flexfec_coverage.go | 111 +++++++++++++ pkg/flexfec/flexfec_encoder.go | 184 ++++++++++++++++++++++ pkg/flexfec/util/bitarray.go | 78 +++++++++ pkg/flexfec/util/media_packet_iterator.go | 50 ++++++ 5 files changed, 424 insertions(+) create mode 100644 pkg/flexfec/flexfec_coverage.go create mode 100644 pkg/flexfec/flexfec_encoder.go create mode 100644 pkg/flexfec/util/bitarray.go create mode 100644 pkg/flexfec/util/media_packet_iterator.go diff --git a/AUTHORS.txt b/AUTHORS.txt index b50ce3ba..54863555 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -27,6 +27,7 @@ Sean DuBois Steffen Vogel treyhakanson XLPolar +ypothoma ziminghua <565209960@qq.com> # List of contributors not appearing in Git history diff --git a/pkg/flexfec/flexfec_coverage.go b/pkg/flexfec/flexfec_coverage.go new file mode 100644 index 00000000..580bdf7e --- /dev/null +++ b/pkg/flexfec/flexfec_coverage.go @@ -0,0 +1,111 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package flexfec + +import ( + "github.com/pion/interceptor/pkg/flexfec/util" + "github.com/pion/rtp" +) + +// Maximum number of media packets that can be protected by a single FEC packet. +// We are not supporting the possibility of having an FEC packet protect multiple +// SSRC source packets for now. +// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1 +const ( + MaxMediaPackets uint32 = 110 + MaxFecPackets uint32 = MaxMediaPackets +) + +// ProtectionCoverage defines the map of RTP packets that individual Fec packets protect. +type ProtectionCoverage struct { + // Array of masks, each mask capable of covering up to maxMediaPkts = 110. + // A mask is represented as a grouping of bytes where each individual bit + // represents the coverage for the media packet at the corresponding index. + packetMasks [MaxFecPackets]util.BitArray + numFecPackets uint32 + numMediaPackets uint32 + mediaPackets []rtp.Packet +} + +// NewCoverage returns a new ProtectionCoverage object. numFecPackets represents the number of +// Fec packets that we will be generating to cover the list of mediaPackets. This allows us to know +// how big the underlying map should be. +func NewCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) *ProtectionCoverage { + numMediaPackets := uint32(len(mediaPackets)) + + // Basic sanity checks + if numMediaPackets <= 0 || numMediaPackets > MaxMediaPackets { + return nil + } + + // We allocate the biggest array of bitmasks that respects the max constraints. + var packetMasks [MaxFecPackets]util.BitArray + for i := 0; i < int(MaxFecPackets); i++ { + packetMasks[i] = util.NewBitArray(MaxMediaPackets) + } + + // Generate FEC bit mask where numFecPackets FEC packets are covering numMediaPackets Media packets. + // In the packetMasks array, each FEC packet is represented by a single BitArray, each bit in a given BitArray + // corresponds to a specific Media packet. + // Ex: Row I, Col J is set to 1 -> FEC packet I will protect media packet J. + for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ { + // We use an interleaved method to determine coverage. Given N FEC packets, Media packet X will be + // covered by FEC packet X % N. + for mediaPacketIndex := uint32(0); mediaPacketIndex < numMediaPackets; mediaPacketIndex++ { + coveringFecPktIndex := mediaPacketIndex % numFecPackets + packetMasks[coveringFecPktIndex].SetBit(mediaPacketIndex, 1) + } + } + + return &ProtectionCoverage{ + packetMasks: packetMasks, + numFecPackets: numFecPackets, + numMediaPackets: numMediaPackets, + mediaPackets: mediaPackets, + } +} + +// ResetCoverage clears the underlying map so that we can reuse it for new batches of RTP packets. +func (p *ProtectionCoverage) ResetCoverage() { + for i := uint32(0); i < MaxFecPackets; i++ { + for j := uint32(0); j < MaxMediaPackets; j++ { + p.packetMasks[i].SetBit(j, 0) + } + } +} + +// GetCoveredBy returns an iterator over RTP packets that are protected by the specified Fec packet index. +func (p *ProtectionCoverage) GetCoveredBy(fecPacketIndex uint32) *util.MediaPacketIterator { + coverage := make([]uint32, 0, p.numMediaPackets) + for mediaPacketIndex := uint32(0); mediaPacketIndex < p.numMediaPackets; mediaPacketIndex++ { + if p.packetMasks[fecPacketIndex].GetBit(mediaPacketIndex) == 1 { + coverage = append(coverage, mediaPacketIndex) + } + } + return util.NewMediaPacketIterator(p.mediaPackets, coverage) +} + +// MarshalBitmasks returns the underlying bitmask that defines which media packets are protected by the +// specified fecPacketIndex. +func (p *ProtectionCoverage) MarshalBitmasks(fecPacketIndex uint32) []byte { + return p.packetMasks[fecPacketIndex].Marshal() +} + +// ExtractMask1 returns the first section of the bitmask as defined by the FEC header. +// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1 +func (p *ProtectionCoverage) ExtractMask1(fecPacketIndex uint32) uint16 { + return uint16(p.packetMasks[fecPacketIndex].GetBitValue(0, 14)) +} + +// ExtractMask2 returns the second section of the bitmask as defined by the FEC header. +// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1 +func (p *ProtectionCoverage) ExtractMask2(fecPacketIndex uint32) uint32 { + return uint32(p.packetMasks[fecPacketIndex].GetBitValue(15, 45)) +} + +// ExtractMask3 returns the third section of the bitmask as defined by the FEC header. +// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1 +func (p *ProtectionCoverage) ExtractMask3(fecPacketIndex uint32) uint64 { + return p.packetMasks[fecPacketIndex].GetBitValue(46, 109) +} diff --git a/pkg/flexfec/flexfec_encoder.go b/pkg/flexfec/flexfec_encoder.go new file mode 100644 index 00000000..e1531a47 --- /dev/null +++ b/pkg/flexfec/flexfec_encoder.go @@ -0,0 +1,184 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// Package flexfec implements FlexFEC to recover missing RTP packets due to packet loss. +// https://datatracker.ietf.org/doc/html/rfc8627 +package flexfec + +import ( + "encoding/binary" + + "github.com/pion/interceptor/pkg/flexfec/util" + "github.com/pion/rtp" +) + +const ( + // BaseRTPHeaderSize represents the minium RTP packet header size in bytes. + BaseRTPHeaderSize = 12 + // BaseFecHeaderSize represents the minium FEC payload's header size including the + // required first mask. + BaseFecHeaderSize = 12 +) + +// FlexEncoder implements the Fec encoding mechanism for the "Flex" variant of FlexFec. +type FlexEncoder struct { + baseSN uint16 + payloadType uint8 + ssrc uint32 + coverage *ProtectionCoverage +} + +// NewFlexEncoder returns a new FlexFecEncer. +func NewFlexEncoder(baseSN uint16, payloadType uint8, ssrc uint32) *FlexEncoder { + return &FlexEncoder{ + baseSN: baseSN, + payloadType: payloadType, + ssrc: ssrc, + coverage: nil, + } +} + +// EncodeFec returns a list of generated RTP packets with FEC payloads that protect the specified mediaPackets. +// This method does not account for missing RTP packets in the mediaPackets array nor does it account for +// them being passed out of order. +func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet { + // Start by defining which FEC packets cover which media packets + if flex.coverage == nil { + flex.coverage = NewCoverage(mediaPackets, numFecPackets) + } else { + flex.coverage.ResetCoverage() + } + + if flex.coverage == nil { + return nil + } + + // Generate FEC payloads + fecPackets := make([]rtp.Packet, numFecPackets) + for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ { + fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex) + } + + return fecPackets +} + +func (flex *FlexEncoder) encodeFlexFecPacket(fecPacketIndex uint32) rtp.Packet { + mediaPacketsIt := flex.coverage.GetCoveredBy(fecPacketIndex) + flexFecHeader := flex.encodeFlexFecHeader( + mediaPacketsIt, + flex.coverage.ExtractMask1(fecPacketIndex), + flex.coverage.ExtractMask2(fecPacketIndex), + flex.coverage.ExtractMask3(fecPacketIndex), + ) + flexFecRepairPayload := flex.encodeFlexFecRepairPayload(mediaPacketsIt.Reset()) + + return rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Padding: false, + Extension: false, + Marker: false, + PayloadType: flex.payloadType, + SequenceNumber: flex.baseSN, + Timestamp: 54243243, + SSRC: flex.ssrc, + CSRC: []uint32{}, + }, + Payload: append(flexFecHeader, flexFecRepairPayload...), + } +} + +func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64) []byte { + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |0|0|P|X| CC |M| PT recovery | length recovery | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | TS recovery | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SN base_i |k| Mask [0-14] | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |k| Mask [15-45] (optional) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Mask [46-109] (optional) | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | ... next SN base and Mask for CSRC_i in CSRC list ... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : Repair "Payload" follows FEC Header : + : : + */ + + // Get header size - This depends on the size of the bitmask. + headerSize := BaseFecHeaderSize + if optionalMask2 > 0 { + headerSize += 4 + } + if optionalMask3 > 0 { + headerSize += 8 + } + + // Allocate a the FlexFec header + flexFecHeader := make([]byte, headerSize) + + // XOR the relevant fields for the header + // TO DO - CHECK TO SEE IF THE MARSHALTO() call works with this. + tmpMediaPacketBuf := make([]byte, headerSize) + for mediaPackets.HasNext() { + mediaPacket := mediaPackets.Next() + n, err := mediaPacket.MarshalTo(tmpMediaPacketBuf) + + if n == 0 || err != nil { + return nil + } + + // XOR the first 2 bytes of the header: V, P, X, CC, M, PT fields + flexFecHeader[0] ^= tmpMediaPacketBuf[0] + flexFecHeader[1] ^= tmpMediaPacketBuf[1] + + // XOR the length recovery field + lengthRecoveryVal := uint16(mediaPacket.MarshalSize() - BaseRTPHeaderSize) + flexFecHeader[2] ^= uint8(lengthRecoveryVal >> 8) + flexFecHeader[3] ^= uint8(lengthRecoveryVal) + + // XOR the 5th to 8th bytes of the header: the timestamp field + flexFecHeader[4] ^= flexFecHeader[4] + flexFecHeader[5] ^= flexFecHeader[5] + flexFecHeader[6] ^= flexFecHeader[6] + flexFecHeader[7] ^= flexFecHeader[7] + } + + // Write the bitmasks to the header + binary.BigEndian.PutUint16(flexFecHeader[10:12], mask1) + + if optionalMask2 > 0 { + binary.BigEndian.PutUint32(flexFecHeader[12:16], optionalMask2) + flexFecHeader[10] |= 0b10000000 + } + if optionalMask3 > 0 { + binary.BigEndian.PutUint64(flexFecHeader[16:24], optionalMask3) + flexFecHeader[12] |= 0b10000000 + } + return flexFecHeader +} + +func (flex *FlexEncoder) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte { + flexFecPayload := make([]byte, len(mediaPackets.First().Payload)) + + for mediaPackets.HasNext() { + mediaPacketPayload := mediaPackets.Next().Payload + + if len(flexFecPayload) < len(mediaPacketPayload) { + // Expected FEC packet payload is bigger that what we can currently store, + // we need to resize. + flexFecPayloadTmp := make([]byte, len(mediaPacketPayload)) + copy(flexFecPayloadTmp, flexFecPayload) + flexFecPayload = flexFecPayloadTmp + } + for byteIndex := 0; byteIndex < len(mediaPacketPayload); byteIndex++ { + flexFecPayload[byteIndex] ^= mediaPacketPayload[byteIndex] + } + } + return flexFecPayload +} diff --git a/pkg/flexfec/util/bitarray.go b/pkg/flexfec/util/bitarray.go new file mode 100644 index 00000000..d24efc20 --- /dev/null +++ b/pkg/flexfec/util/bitarray.go @@ -0,0 +1,78 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// Package util implements utilities to better support Fec decoding / encoding. +package util + +// BitArray provides support for bitmask manipulations. +type BitArray struct { + bytes []byte +} + +// NewBitArray returns a new BitArray. It takes sizeBits as parameter which represents +// the size of the underlying bitmask. +func NewBitArray(sizeBits uint32) BitArray { + var sizeBytes uint32 + if sizeBits%8 == 0 { + sizeBytes = sizeBits / 8 + } else { + sizeBytes = sizeBits/8 + 1 + } + + return BitArray{ + bytes: make([]byte, sizeBytes), + } +} + +// SetBit sets a bit to the specified bit value on the bitmask. +func (b *BitArray) SetBit(bitIndex uint32, bitValue uint32) { + byteIndex := bitIndex / 8 + bitOffset := uint(bitIndex % 8) + + // Set the specific bit to 1 using bitwise OR + if bitValue == 1 { + b.bytes[byteIndex] |= 1 << bitOffset + } else { + b.bytes[byteIndex] |= 0 << bitOffset + } +} + +// GetBit returns the bit value at a specified index of the bitmask. +func (b *BitArray) GetBit(bitIndex uint32) uint8 { + return b.bytes[bitIndex/8] +} + +// Marshal returns the underlying bitmask. +func (b *BitArray) Marshal() []byte { + return b.bytes +} + +// GetBitValue returns a subsection of the bitmask. +func (b *BitArray) GetBitValue(i int, j int) uint64 { + if i < 0 || i >= len(b.bytes)*8 || j < 0 || j >= len(b.bytes)*8 || i > j { + return 0 + } + + startByte := i / 8 + startBit := i % 8 + endByte := j / 8 + + // Create a slice containing the bytes to extract + subArray := b.bytes[startByte : endByte+1] + + // Initialize the result value + var result uint64 + + // Loop through the bytes and concatenate the bits + for idx, b := range subArray { + if idx == 0 { + b <<= uint(startBit) + } + result |= uint64(b) + } + + // Mask the bits that are not part of the desired range + result &= (1< +// SPDX-License-Identifier: MIT + +package util + +import "github.com/pion/rtp" + +// MediaPacketIterator supports iterating through a list of media packets protected by +// a specific Fec packet. +type MediaPacketIterator struct { + mediaPackets []rtp.Packet + coveredIndices []uint32 + nextIndex int +} + +// NewMediaPacketIterator returns a new MediaPacketIterator. +func NewMediaPacketIterator(mediaPackets []rtp.Packet, coveredIndices []uint32) *MediaPacketIterator { + return &MediaPacketIterator{ + mediaPackets: mediaPackets, + coveredIndices: coveredIndices, + nextIndex: 0, + } +} + +// Reset sets the starting iterating index back to 0. +func (m *MediaPacketIterator) Reset() *MediaPacketIterator { + m.nextIndex = 0 + return m +} + +// HasNext indicates whether or not there are more media packets +// that can be iterated through. +func (m *MediaPacketIterator) HasNext() bool { + return m.nextIndex < len(m.coveredIndices) +} + +// Next returns the next media packet to iterate through. +func (m *MediaPacketIterator) Next() *rtp.Packet { + if m.nextIndex == len(m.coveredIndices) { + return nil + } + packet := m.mediaPackets[m.nextIndex] + m.nextIndex++ + return &packet +} + +// First returns the first media packet to iterate through. +func (m *MediaPacketIterator) First() *rtp.Packet { + return &m.mediaPackets[0] +}