diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 02f7db115..2bcfb2cdc 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -268,6 +268,10 @@ func (b *Buffer) onFeedback(fn func(fb []rtcp.Packet)) { b.feedbackCB = fn } +func (b *Buffer) onNack(fn func(fb *rtcp.TransportLayerNack)) { + b.pktQueue.onLost = fn +} + func (b *Buffer) GetMediaSSRC() uint32 { return b.mediaSSRC } diff --git a/pkg/buffer/interceptor.go b/pkg/buffer/interceptor.go index b2262ba46..7b6ea0e4f 100644 --- a/pkg/buffer/interceptor.go +++ b/pkg/buffer/interceptor.go @@ -4,6 +4,8 @@ import ( "sync" "sync/atomic" + log "github.com/pion/ion-log" + "github.com/pion/interceptor" "github.com/pion/rtcp" "github.com/pion/rtp" @@ -84,7 +86,9 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor. func (i *Interceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { i.twcc.onFeedback = func(pkts []rtcp.Packet) { - writer.Write(pkts, nil) + if _, err := writer.Write(pkts, nil); err != nil { + log.Errorf("Writing buffer twcc rtcp err: %v", err) + } } i.rtcpWriter.Store(writer) return writer @@ -127,12 +131,21 @@ func (i *Interceptor) newBuffer(info *interceptor.StreamInfo) *Buffer { buffer := NewBuffer(info, Options{}) buffer.onFeedback(func(pkts []rtcp.Packet) { if p, ok := i.rtcpWriter.Load().(interceptor.RTCPWriter); ok { - p.Write(pkts, nil) + if _, err := p.Write(pkts, nil); err != nil { + log.Errorf("Writing buffer rtcp err: %v", err) + } } }) buffer.onTransportWideCC(func(sn uint16, timeNS int64, marker bool) { i.twcc.push(sn, timeNS, marker) }) + buffer.onNack(func(fb *rtcp.TransportLayerNack) { + if p, ok := i.rtcpWriter.Load().(interceptor.RTCPWriter); ok { + if _, err := p.Write([]rtcp.Packet{fb}, nil); err != nil { + log.Errorf("Writing buffer rtcp err: %v", err) + } + } + }) i.Lock() i.buffers = append(i.buffers, buffer) i.twcc.mSSRC = info.SSRC diff --git a/pkg/sfu/nack.go b/pkg/sfu/nack.go index 4b5c068f3..b70deb5d7 100644 --- a/pkg/sfu/nack.go +++ b/pkg/sfu/nack.go @@ -27,9 +27,9 @@ func newNACKList() *nackList { } } -func (n *nackList) getNACKSeqNo(seqno []uint16) []uint16 { +func (n *nackList) getNACKSeqNo(seqNo []uint16) []uint16 { packets := make([]uint16, 0, 17) - for _, sn := range seqno { + for _, sn := range seqNo { if nack, ok := n.nacks[sn]; !ok { n.nacks[sn] = n.ll.PushBack(NACK{sn, time.Now().UnixNano()}) packets = append(packets, sn) diff --git a/pkg/sfu/subscriber.go b/pkg/sfu/subscriber.go index 8fe81920b..130f00806 100644 --- a/pkg/sfu/subscriber.go +++ b/pkg/sfu/subscriber.go @@ -2,6 +2,7 @@ package sfu import ( "io" + "math" "sync" "sync/atomic" "time" @@ -173,6 +174,7 @@ func (s *Subscriber) Close() error { func (s *Subscriber) downTracksReports() { for { time.Sleep(5 * time.Second) + var r []rtcp.Packet var sd []rtcp.SourceDescriptionChunk s.RLock() @@ -210,7 +212,14 @@ func (s *Subscriber) downTracksReports() { } } s.RUnlock() - if len(r) > 0 { + i := math.Ceil(float64(len(sd)) / float64(20)) + j := 0 + for i > 0 { + if i > 1 { + sd = sd[j*20 : (j+1)*20-1] + } else { + sd = sd[j*20 : cap(sd)] + } r = append(r, &rtcp.SourceDescription{Chunks: sd}) if err := s.pc.WriteRTCP(r); err != nil { if err == io.EOF || err == io.ErrClosedPipe { @@ -218,6 +227,9 @@ func (s *Subscriber) downTracksReports() { } log.Errorf("Sending downtrack reports err: %v", err) } + r = r[:0] + i-- + j++ } } }