Skip to content

Commit

Permalink
Add callback to get bandwith estimator on new PC
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Dec 16, 2021
1 parent baad2da commit f271b1a
Showing 1 changed file with 30 additions and 37 deletions.
67 changes: 30 additions & 37 deletions pkg/gcc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gcc

import (
"errors"
"fmt"
"sync"
"time"

Expand All @@ -18,6 +17,14 @@ const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide
// ErrUnknownSession indicates that a session ID was not assigned
var ErrUnknownSession = errors.New("unknown session ID")

// Pacer is the interface implemented by packet pacers
type Pacer interface {
interceptor.RTPWriter
AddStream(ssrc uint32, writer interceptor.RTPWriter)
SetTargetBitrate(int)
Close() error
}

// Option can be used to set initial options on GCC interceptors
type Option func(*Interceptor) error

Expand All @@ -36,25 +43,32 @@ func SetPacer(pacer Pacer) Option {
}
}

type BandwidthEstimator interface {
GetTargetBitrate() int
GetStats() map[string]interface{}
}

type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator)

// InterceptorFactory is a factory for GCC interceptors
type InterceptorFactory struct {
opts []Option
interceptors map[string]*Interceptor
opts []Option
addPeerConnection NewPeerConnectionCallback
}

// NewInterceptor returns a new GCC interceptor factory
func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
return &InterceptorFactory{
opts: opts,
interceptors: map[string]*Interceptor{},
opts: opts,
}, nil
}

func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
f.addPeerConnection = cb
}

// NewInterceptor returns a new GCC interceptor
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
if i, ok := f.interceptors[id]; ok {
return i, nil
}
i := &Interceptor{
NoOp: interceptor.NoOp{},
lock: sync.Mutex{},
Expand Down Expand Up @@ -82,17 +96,11 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor,
i.loss = newLossBasedBWE(i.bitrate)
i.delay = newDelayBasedBWE(i.bitrate)

f.interceptors[id] = i
go i.loop()
return i, nil
}

// GetTargetBitrate returns the target bitrate for the connection with id
func (f *InterceptorFactory) GetTargetBitrate(id string) (int, error) {
if i, ok := f.interceptors[id]; ok {
return i.getTargetBitrate(), nil
if f.addPeerConnection != nil {
f.addPeerConnection(id, i)
}
return 0, fmt.Errorf("%w: %v", ErrUnknownSession, id)
return i, nil
}

// Stats contains internal statistics of the bandwidth estimator
Expand All @@ -101,23 +109,6 @@ type Stats struct {
DelayStats
}

// GetStats returns a sample of the internal statistics of the interceptor
// running on the connection with id
func (f *InterceptorFactory) GetStats(id string) (*Stats, error) {
if i, ok := f.interceptors[id]; ok {
return i.getStats(), nil
}
return nil, fmt.Errorf("%w: %v", ErrUnknownSession, id)
}

// Pacer is the interface implemented by packet pacers
type Pacer interface {
interceptor.RTPWriter
AddStream(ssrc uint32, writer interceptor.RTPWriter)
SetTargetBitrate(int)
Close() error
}

type packetAndAttributes struct {
header rtp.Header
payload []byte
Expand Down Expand Up @@ -145,14 +136,16 @@ type Interceptor struct {
close chan struct{}
}

func (c *Interceptor) getTargetBitrate() int {
func (c *Interceptor) GetTargetBitrate() int {
c.lock.Lock()
defer c.lock.Unlock()
return c.bitrate
}

func (c *Interceptor) getStats() *Stats {
return c.latestStats
func (c *Interceptor) GetStats() map[string]interface{} {
return map[string]interface{}{
"gcc": c.latestStats,
}
}

// BindRTCPReader lets you modify any incoming RTCP packets. It is called once
Expand Down

0 comments on commit f271b1a

Please sign in to comment.