Skip to content

Commit

Permalink
fix(buffer): Fix buckets pools (#369)
Browse files Browse the repository at this point in the history
* fix(buffer): Fix buckets pools

* Inline writing packets to avoid MTU limit

* Remove downtracks from subs
  • Loading branch information
OrlandoCo authored Jan 7, 2021
1 parent b263d30 commit cfd4eaf
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 49 deletions.
15 changes: 3 additions & 12 deletions pkg/buffer/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type Bucket struct {
onLost func(nack []rtcp.NackPair, askKeyframe bool)
}

func NewBucket(size int, nack bool) *Bucket {
func NewBucket(buf []byte, nack bool) *Bucket {
b := &Bucket{
buf: make([]byte, size),
maxSteps: int(math.Floor(float64(size)/float64(maxPktSize))) - 1,
buf: buf,
maxSteps: int(math.Floor(float64(len(buf))/float64(maxPktSize))) - 1,
}
if nack {
b.nacker = newNACKQueue()
Expand Down Expand Up @@ -117,12 +117,3 @@ func (b *Bucket) set(sn uint16, pkt []byte) []byte {
copy(b.buf[off+2:], pkt)
return b.buf[off+2 : off+2+len(pkt)]
}

func (b *Bucket) reset() {
b.headSN = 0
b.step = 0
b.onLost = nil
if b.nacker != nil {
b.nacker.reset()
}
}
4 changes: 2 additions & 2 deletions pkg/buffer/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var TestPackets = []*rtp.Packet{
}

func Test_queue(t *testing.T) {
q := NewBucket(2*1000*1000, true)
q := NewBucket(make([]byte, 25000), true)
q.onLost = func(_ []rtcp.NackPair, _ bool) {
}

Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_queue_edges(t *testing.T) {
},
},
}
q := NewBucket(2*1000*1000, true)
q := NewBucket(make([]byte, 25000), true)
q.onLost = func(_ []rtcp.NackPair, _ bool) {
}
q.headSN = 65532
Expand Down
10 changes: 4 additions & 6 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,10 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) {
switch {
case strings.HasPrefix(codec.MimeType, "audio/"):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = b.audioPool.Get().(*Bucket)
b.bucket.reset()
b.bucket = NewBucket(b.audioPool.Get().([]byte), false)
case strings.HasPrefix(codec.MimeType, "video/"):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = b.videoPool.Get().(*Bucket)
b.bucket.reset()
b.bucket = NewBucket(b.videoPool.Get().([]byte), true)
default:
b.codecType = webrtc.RTPCodecType(0)
}
Expand Down Expand Up @@ -224,10 +222,10 @@ func (b *Buffer) Close() error {

b.closed = true
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeVideo {
b.videoPool.Put(b.bucket)
b.videoPool.Put(b.bucket.buf)
}
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeAudio {
b.audioPool.Put(b.bucket)
b.audioPool.Put(b.bucket.buf)
}
b.onClose()
close(b.packetChan)
Expand Down
2 changes: 1 addition & 1 deletion pkg/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestNewBuffer(t *testing.T) {
}
pool := &sync.Pool{
New: func() interface{} {
return NewBucket(2*1000*1000, true)
return make([]byte, 30000)
},
}
buff := NewBuffer(123, pool, pool)
Expand Down
4 changes: 2 additions & 2 deletions pkg/buffer/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func NewBufferFactory() *Factory {
videoPool: &sync.Pool{
New: func() interface{} {
// Make a 2MB buffer for video
return NewBucket(2*1000*1000, true)
return make([]byte, 2*1000*1000)
},
},
audioPool: &sync.Pool{
New: func() interface{} {
// Make a max 25 packets buffer for audio
return NewBucket(maxPktSize*25, false)
return make([]byte, maxPktSize*25)
},
},
rtpBuffers: make(map[uint32]*Buffer),
Expand Down
8 changes: 0 additions & 8 deletions pkg/buffer/nack.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ func newNACKQueue() *nackQueue {
}
}

func (n *nackQueue) reset() {
n.maxSN = 0
n.counter = 0
n.cycles = 0
n.kfSN = 0
n.nacks = n.nacks[:0]
}

func (n *nackQueue) remove(sn uint16) {
var extSN uint32
if sn > n.maxSN && sn&0x8000 == 1 && n.maxSN&0x8000 == 0 {
Expand Down
16 changes: 15 additions & 1 deletion pkg/buffer/nack_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package buffer

import (
"math/rand"
"reflect"
"testing"
"time"

"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -113,7 +115,7 @@ func Test_nackQueue_push(t *testing.T) {
}
}

func Test_nackQueue_pushAndNack(t *testing.T) {
func Test_nackQueue(t *testing.T) {
type fields struct {
nacks []nack
cycles uint32
Expand All @@ -140,7 +142,19 @@ func Test_nackQueue_pushAndNack(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
n := nackQueue{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 100; i++ {
assert.NotPanics(t, func() {
n.push(uint16(r.Intn(60000)))
n.remove(uint16(r.Intn(60000)))
n.pairs()
})
}

for _, sn := range n.nacks {
print(sn.sn, ",")
}
})
}
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/sfu/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {
return err
}

outTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
Expand All @@ -209,27 +209,28 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {
return err
}
// Create webrtc sender for the peer we are sending track to
if outTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(outTrack, webrtc.RTPTransceiverInit{
if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
}); err != nil {
return err
}

// nolint:scopelint
outTrack.OnCloseHandler(func() {
if err := sub.pc.RemoveTrack(outTrack.transceiver.Sender()); err != nil {
downTrack.OnCloseHandler(func() {
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
log.Errorf("Error closing down track: %v", err)
} else {
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
}
})

outTrack.OnBind(func() {
downTrack.OnBind(func() {
go sub.sendStreamDownTracksReports(recv.StreamID())
})

sub.AddDownTrack(recv.StreamID(), outTrack)
recv.AddDownTrack(outTrack, r.config.Simulcast.BestQualityFirst)
sub.AddDownTrack(recv.StreamID(), downTrack)
recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)
return nil
}

Expand Down
39 changes: 29 additions & 10 deletions pkg/sfu/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sfu

import (
"io"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -126,6 +125,23 @@ func (s *Subscriber) AddDownTrack(streamID string, downTrack *DownTrack) {
}
}

func (s *Subscriber) RemoveDownTrack(streamID string, downTrack *DownTrack) {
s.Lock()
defer s.Unlock()
if dts, ok := s.tracks[streamID]; ok {
idx := -1
for i, dt := range dts {
if dt == downTrack {
idx = i
}
}
dts[idx] = dts[len(dts)-1]
dts[len(dts)-1] = nil
dts = dts[:len(dts)-1]
s.tracks[streamID] = dts
}
}

func (s *Subscriber) AddDataChannel(label string) (*webrtc.DataChannel, error) {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -177,6 +193,10 @@ func (s *Subscriber) downTracksReports() {
for {
time.Sleep(5 * time.Second)

if s.pc.ConnectionState() == webrtc.ICETransportStateClosed {
return
}

var r []rtcp.Packet
var sd []rtcp.SourceDescriptionChunk
s.RLock()
Expand Down Expand Up @@ -214,24 +234,23 @@ func (s *Subscriber) downTracksReports() {
}
}
s.RUnlock()
i := math.Ceil(float64(len(sd)) / float64(20))
i := 0
j := 0
for i > 0 {
if i > 1 {
sd = sd[j*20 : (j+1)*20-1]
} else {
sd = sd[j*20 : cap(sd)]
for i < len(sd) {
i = (j + 1) * 15
if i >= len(sd) {
i = len(sd)
}
r = append(r, &rtcp.SourceDescription{Chunks: sd})
nsd := sd[j*15 : i]
r = append(r, &rtcp.SourceDescription{Chunks: nsd})
j++
if err := s.pc.WriteRTCP(r); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
log.Errorf("Sending downtrack reports err: %v", err)
}
r = r[:0]
i--
j++
}
}
}
Expand Down

0 comments on commit cfd4eaf

Please sign in to comment.