From f271b1aadb4af330f5b1f11211545cc5a32098ae Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 16 Dec 2021 12:59:57 +0100 Subject: [PATCH] Add callback to get bandwith estimator on new PC --- pkg/gcc/interceptor.go | 67 +++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/pkg/gcc/interceptor.go b/pkg/gcc/interceptor.go index 1127410c..0ddc8f4f 100644 --- a/pkg/gcc/interceptor.go +++ b/pkg/gcc/interceptor.go @@ -2,7 +2,6 @@ package gcc import ( "errors" - "fmt" "sync" "time" @@ -18,6 +17,14 @@ const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide // ErrUnknownSession indicates that a session ID was not assigned var ErrUnknownSession = errors.New("unknown session ID") +// Pacer is the interface implemented by packet pacers +type Pacer interface { + interceptor.RTPWriter + AddStream(ssrc uint32, writer interceptor.RTPWriter) + SetTargetBitrate(int) + Close() error +} + // Option can be used to set initial options on GCC interceptors type Option func(*Interceptor) error @@ -36,25 +43,32 @@ func SetPacer(pacer Pacer) Option { } } +type BandwidthEstimator interface { + GetTargetBitrate() int + GetStats() map[string]interface{} +} + +type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator) + // InterceptorFactory is a factory for GCC interceptors type InterceptorFactory struct { - opts []Option - interceptors map[string]*Interceptor + opts []Option + addPeerConnection NewPeerConnectionCallback } // NewInterceptor returns a new GCC interceptor factory func NewInterceptor(opts ...Option) (*InterceptorFactory, error) { return &InterceptorFactory{ - opts: opts, - interceptors: map[string]*Interceptor{}, + opts: opts, }, nil } +func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) { + f.addPeerConnection = cb +} + // NewInterceptor returns a new GCC interceptor func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { - if i, ok := f.interceptors[id]; ok { - return i, nil - } i := &Interceptor{ NoOp: interceptor.NoOp{}, lock: sync.Mutex{}, @@ -82,17 +96,11 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, i.loss = newLossBasedBWE(i.bitrate) i.delay = newDelayBasedBWE(i.bitrate) - f.interceptors[id] = i go i.loop() - return i, nil -} - -// GetTargetBitrate returns the target bitrate for the connection with id -func (f *InterceptorFactory) GetTargetBitrate(id string) (int, error) { - if i, ok := f.interceptors[id]; ok { - return i.getTargetBitrate(), nil + if f.addPeerConnection != nil { + f.addPeerConnection(id, i) } - return 0, fmt.Errorf("%w: %v", ErrUnknownSession, id) + return i, nil } // Stats contains internal statistics of the bandwidth estimator @@ -101,23 +109,6 @@ type Stats struct { DelayStats } -// GetStats returns a sample of the internal statistics of the interceptor -// running on the connection with id -func (f *InterceptorFactory) GetStats(id string) (*Stats, error) { - if i, ok := f.interceptors[id]; ok { - return i.getStats(), nil - } - return nil, fmt.Errorf("%w: %v", ErrUnknownSession, id) -} - -// Pacer is the interface implemented by packet pacers -type Pacer interface { - interceptor.RTPWriter - AddStream(ssrc uint32, writer interceptor.RTPWriter) - SetTargetBitrate(int) - Close() error -} - type packetAndAttributes struct { header rtp.Header payload []byte @@ -145,14 +136,16 @@ type Interceptor struct { close chan struct{} } -func (c *Interceptor) getTargetBitrate() int { +func (c *Interceptor) GetTargetBitrate() int { c.lock.Lock() defer c.lock.Unlock() return c.bitrate } -func (c *Interceptor) getStats() *Stats { - return c.latestStats +func (c *Interceptor) GetStats() map[string]interface{} { + return map[string]interface{}{ + "gcc": c.latestStats, + } } // BindRTCPReader lets you modify any incoming RTCP packets. It is called once