diff --git a/AUTHORS.txt b/AUTHORS.txt index de281684..aebbe978 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -8,6 +8,7 @@ Adam Kiss adamroach Aditya Kumar aler9 <46489434+aler9@users.noreply.github.com> +Antoine Antoine Baché Atsushi Watanabe Bobby Peck @@ -18,8 +19,8 @@ Kevin Caffrey Maksim Nesterov Mathis Engelbart Sean DuBois -ziminghua <565209960@qq.com> Steffen Vogel +ziminghua <565209960@qq.com> # List of contributors not appearing in Git history diff --git a/README.md b/README.md index c12b3c23..dcc856f1 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,14 @@ by anyone. With the following tenets in mind. * [Transport Wide Congestion Control Feedback](https://github.com/pion/interceptor/tree/master/pkg/twcc) * [Packet Dump](https://github.com/pion/interceptor/tree/master/pkg/packetdump) * [Google Congestion Control](https://github.com/pion/interceptor/tree/master/pkg/gcc) +* [Stats](https://github.com/pion/interceptor/tree/master/pkg/stats) A [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation +* [Interval PLI](https://github.com/pion/interceptor/tree/master/pkg/intervalpli) Generate PLI on a interval. Useful when no decoder is available. ### Planned Interceptors * Bandwidth Estimation - [NADA](https://tools.ietf.org/html/rfc8698) * JitterBuffer, re-order packets and wait for arrival * [FlexFec](https://tools.ietf.org/html/draft-ietf-payload-flexible-fec-scheme-20) -* [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation * [RTCP Feedback for Congestion Control](https://datatracker.ietf.org/doc/html/rfc8888) the standardized alternative to TWCC. ### Interceptor Public API diff --git a/pkg/intervalpli/generator_interceptor.go b/pkg/intervalpli/generator_interceptor.go new file mode 100644 index 00000000..7caa844e --- /dev/null +++ b/pkg/intervalpli/generator_interceptor.go @@ -0,0 +1,175 @@ +package intervalpli + +import ( + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" +) + +// GeneratorInterceptor interceptor sends PLI packets. +// Implements PLI in a naive way: sends a PLI for each new track that support PLI, periodically. +type GeneratorInterceptor struct { + interceptor.NoOp + + interval time.Duration + streams sync.Map + immediatePLINeeded chan []uint32 + + log logging.LeveledLogger + m sync.Mutex + wg sync.WaitGroup + + close chan struct{} +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor. +func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) { + r := &GeneratorInterceptor{ + interval: 3 * time.Second, + log: logging.NewDefaultLoggerFactory().NewLogger("pli_generator"), + immediatePLINeeded: make(chan []uint32, 1), + close: make(chan struct{}), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + return r, nil +} + +func (r *GeneratorInterceptor) isClosed() bool { + select { + case <-r.close: + return true + default: + return false + } +} + +// Close closes the interceptor. +func (r *GeneratorInterceptor) Close() error { + defer r.wg.Wait() + r.m.Lock() + defer r.m.Unlock() + + if !r.isClosed() { + close(r.close) + } + + return nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (r *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + r.m.Lock() + defer r.m.Unlock() + + if r.isClosed() { + return writer + } + + r.wg.Add(1) + + go r.loop(writer) + + return writer +} + +func (r *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer r.wg.Done() + + ticker, tickerChan := r.createLoopTicker() + + defer func() { + if ticker != nil { + ticker.Stop() + } + }() + + for { + select { + case ssrcs := <-r.immediatePLINeeded: + r.writePLIs(rtcpWriter, ssrcs) + + case <-tickerChan: + ssrcs := make([]uint32, 0) + + r.streams.Range(func(k, value interface{}) bool { + key, ok := k.(uint32) + if !ok { + return false + } + + ssrcs = append(ssrcs, key) + return true + }) + + r.writePLIs(rtcpWriter, ssrcs) + + case <-r.close: + return + } + } +} + +func (r *GeneratorInterceptor) createLoopTicker() (*time.Ticker, <-chan time.Time) { + if r.interval > 0 { + ticker := time.NewTicker(r.interval) + return ticker, ticker.C + } + + return nil, make(chan time.Time) +} + +func (r *GeneratorInterceptor) writePLIs(rtcpWriter interceptor.RTCPWriter, ssrcs []uint32) { + if len(ssrcs) == 0 { + return + } + + pkts := []rtcp.Packet{} + + for _, ssrc := range ssrcs { + pkts = append(pkts, &rtcp.PictureLossIndication{MediaSSRC: ssrc}) + } + + if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil { + r.log.Warnf("failed sending: %+v", err) + } +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (r *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + if !streamSupportPli(info) { + return reader + } + + r.streams.Store(info.SSRC, nil) + // New streams need to receive a PLI as soon as possible. + r.ForcePLI(info.SSRC) + + return reader +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (r *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + r.streams.Delete(info.SSRC) +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (r *GeneratorInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return reader +} + +// ForcePLI sends a PLI request to the tracks matching the provided SSRCs. +func (r *GeneratorInterceptor) ForcePLI(ssrc ...uint32) { + r.immediatePLINeeded <- ssrc +} diff --git a/pkg/intervalpli/generator_interceptor_test.go b/pkg/intervalpli/generator_interceptor_test.go new file mode 100644 index 00000000..96a28f5f --- /dev/null +++ b/pkg/intervalpli/generator_interceptor_test.go @@ -0,0 +1,84 @@ +package intervalpli + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/stretchr/testify/assert" +) + +func TestPLIGeneratorInterceptor_Unsupported(t *testing.T) { + i, err := NewGeneratorInterceptor( + GeneratorInterval(time.Millisecond*10), + GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.Nil(t, err) + + streamSSRC := uint32(123456) + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: streamSSRC, + MimeType: "video/h264", + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + timeout := time.NewTimer(100 * time.Millisecond) + defer timeout.Stop() + select { + case <-timeout.C: + return + case <-stream.WrittenRTCP(): + assert.FailNow(t, "should not receive any PIL") + } +} + +func TestPLIGeneratorInterceptor(t *testing.T) { + i, err := NewGeneratorInterceptor( + GeneratorInterval(time.Second*1), + GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.Nil(t, err) + + streamSSRC := uint32(123456) + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: streamSSRC, + ClockRate: 90000, + MimeType: "video/h264", + RTCPFeedback: []interceptor.RTCPFeedback{ + {Type: "nack", Parameter: "pli"}, + }, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok := pkts[0].(*rtcp.PictureLossIndication) + assert.True(t, ok) + assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr) + + // Should not have another packet immediately... + func() { + timeout := time.NewTimer(100 * time.Millisecond) + defer timeout.Stop() + select { + case <-timeout.C: + return + case <-stream.WrittenRTCP(): + assert.FailNow(t, "should not receive any PIL") + } + }() + + // ... but should receive one 1sec later. + pkts = <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok = pkts[0].(*rtcp.PictureLossIndication) + assert.True(t, ok) + assert.Equal(t, &rtcp.PictureLossIndication{MediaSSRC: streamSSRC}, sr) +} diff --git a/pkg/intervalpli/generator_option.go b/pkg/intervalpli/generator_option.go new file mode 100644 index 00000000..d717ad2e --- /dev/null +++ b/pkg/intervalpli/generator_option.go @@ -0,0 +1,26 @@ +package intervalpli + +import ( + "time" + + "github.com/pion/logging" +) + +// GeneratorOption can be used to configure GeneratorInterceptor. +type GeneratorOption func(r *GeneratorInterceptor) error + +// GeneratorLog sets a logger for the interceptor. +func GeneratorLog(log logging.LeveledLogger) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.log = log + return nil + } +} + +// GeneratorInterval sets send interval for the interceptor. +func GeneratorInterval(interval time.Duration) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.interval = interval + return nil + } +} diff --git a/pkg/intervalpli/pli.go b/pkg/intervalpli/pli.go new file mode 100644 index 00000000..10ea6a3b --- /dev/null +++ b/pkg/intervalpli/pli.go @@ -0,0 +1,14 @@ +// Package intervalpli is an interceptor that requests PLI on a static interval. Useful when bridging protocols that don't have receiver feedback +package intervalpli + +import "github.com/pion/interceptor" + +func streamSupportPli(info *interceptor.StreamInfo) bool { + for _, fb := range info.RTCPFeedback { + if fb.Type == "nack" && fb.Parameter == "pli" { + return true + } + } + + return false +}