Skip to content

Commit

Permalink
Fix overuse detection and some review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 14, 2022
1 parent dce3ab5 commit c1f02ed
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 204 deletions.
17 changes: 6 additions & 11 deletions pkg/cc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package cc

import (
"errors"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/logging"
Expand All @@ -13,10 +11,7 @@ import (

const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"

// ErrUnknownSession indicates that a session ID was not assigned
var ErrUnknownSession = errors.New("unknown session ID")

// Option can be used to set initial options on GCC interceptors
// Option can be used to set initial options on CC interceptors
type Option func(*Interceptor) error

// BandwidthEstimatorFactory creates new BandwidthEstimators
Expand All @@ -38,14 +33,14 @@ type BandwidthEstimator interface {
// PeerConnection with id
type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator)

// InterceptorFactory is a factory for GCC interceptors
// InterceptorFactory is a factory for CC interceptors
type InterceptorFactory struct {
opts []Option
bweFactory func() (BandwidthEstimator, error)
addPeerConnection NewPeerConnectionCallback
}

// NewInterceptor returns a new GCC interceptor factory
// NewInterceptor returns a new CC interceptor factory
func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*InterceptorFactory, error) {
if factory == nil {
factory = func() (BandwidthEstimator, error) {
Expand All @@ -59,21 +54,21 @@ func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*Interce
}, nil
}

// OnNewPeerConnection sets a callback that is called when a new GCC interceptor
// OnNewPeerConnection sets a callback that is called when a new CC interceptor
// is created.
func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
f.addPeerConnection = cb
}

// NewInterceptor returns a new GCC interceptor
// NewInterceptor returns a new CC interceptor
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
bwe, err := f.bweFactory()
if err != nil {
return nil, err
}
i := &Interceptor{
NoOp: interceptor.NoOp{},
log: logging.NewDefaultLoggerFactory().NewLogger("gcc_interceptor"),
log: logging.NewDefaultLoggerFactory().NewLogger("cc_interceptor"),
estimator: bwe,
feedback: make(chan []rtcp.Packet),
close: make(chan struct{}),
Expand Down
94 changes: 66 additions & 28 deletions pkg/gcc/adaptive_threshold.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package gcc

import (
"math"
"time"
)

const (
maxDeltas = 60
)

type adaptiveThresholdOption func(*adaptiveThreshold)

func setInitialThreshold(t time.Duration) adaptiveThresholdOption {
Expand All @@ -12,49 +17,82 @@ func setInitialThreshold(t time.Duration) adaptiveThresholdOption {
}
}

// adaptiveThreshold implements a threshold that continuously adapts depending on
// the current measurements/estimates. This is necessary to avoid starving GCC
// in the presence of concurrent TCP flows by allowing larger Queueing delays,
// when measurements/estimates increase. overuseCoefficientU and
// overuseCoefficientD define by how much the threshold adapts. We basically
// want the threshold to increase fast, if the measurement is outside [-thresh,
// thresh] and decrease slowly if it is within.
//
// See https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-5.4
// or [Analysis and Design of the Google Congestion Control for Web Real-time
// Communication (WebRTC)](https://c3lab.poliba.it/images/6/65/Gcc-analysis.pdf)
// for a more detailed description
type adaptiveThreshold struct {
thresh time.Duration
overuseCoefficientU float64
overuseCoefficientD float64
min time.Duration
max time.Duration
thresh time.Duration
overuseCoefficientUp float64
overuseCoefficientDown float64
min time.Duration
max time.Duration
lastUpdate time.Time
numDeltas int
}

// newAdaptiveThreshold initializes a new adaptiveThreshold with default
// values taken from draft-ietf-rmcat-gcc-02
func newAdaptiveThreshold(opts ...adaptiveThresholdOption) *adaptiveThreshold {
at := &adaptiveThreshold{
thresh: time.Duration(125 * float64(time.Microsecond)),
overuseCoefficientU: 0.01,
overuseCoefficientD: 0.00018,
min: 600 * time.Microsecond,
max: 600 * time.Millisecond,
thresh: time.Duration(12500 * float64(time.Microsecond)),
overuseCoefficientUp: 0.01,
overuseCoefficientDown: 0.00018,
min: 6 * time.Millisecond,
max: 600 * time.Millisecond,
lastUpdate: time.Time{},
numDeltas: 0,
}
for _, opt := range opts {
opt(at)
}
return at
}

func (a *adaptiveThreshold) compare(estimate, dt time.Duration) (usage, time.Duration) {
absEstimate := estimate
if absEstimate < 0 {
absEstimate = -absEstimate
func (a *adaptiveThreshold) compare(estimate, dt time.Duration) (usage, time.Duration, time.Duration) {
a.numDeltas++
if a.numDeltas < 2 {
return usageNormal, estimate, a.max
}
k := a.overuseCoefficientU
if absEstimate < a.thresh {
k = a.overuseCoefficientD
t := time.Duration(minInt(a.numDeltas, maxDeltas)) * estimate
use := usageNormal
if t > a.thresh {
use = usageOver
} else if t < -a.thresh {
use = usageUnder
}
if absEstimate-a.thresh <= 15*time.Millisecond {
factor := k * float64(dt.Microseconds()) / 1000.0
add := factor * float64((absEstimate - a.thresh).Microseconds()) / 1000.0
a.thresh += time.Duration(add * float64(time.Millisecond))
}
a.thresh = clampDuration(a.thresh, a.min, a.max)
thresh := a.thresh
a.update(t)
return use, t, thresh
}

if estimate > a.thresh {
return over, a.thresh
func (a *adaptiveThreshold) update(estimate time.Duration) {
now := time.Now()
if a.lastUpdate.IsZero() {
a.lastUpdate = now
}
absEstimate := time.Duration(math.Abs(float64(estimate.Microseconds()))) * time.Microsecond
if absEstimate > a.thresh+15*time.Millisecond {
a.lastUpdate = now
return
}
if estimate < -a.thresh {
return under, a.thresh
k := a.overuseCoefficientUp
if absEstimate < a.thresh {
k = a.overuseCoefficientDown
}
return normal, a.thresh
maxTimeDelta := 100 * time.Millisecond
timeDelta := time.Duration(minInt(int(now.Sub(a.lastUpdate).Milliseconds()), int(maxTimeDelta.Milliseconds()))) * time.Millisecond
d := absEstimate - a.thresh
add := k * float64(d.Milliseconds()) * float64(timeDelta.Milliseconds())
a.thresh += time.Duration(add) * 1000 * time.Microsecond
a.thresh = clampDuration(a.thresh, a.min, a.max)
a.lastUpdate = now
}
43 changes: 36 additions & 7 deletions pkg/gcc/adaptive_threshold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,73 @@ func TestAdaptiveThreshold(t *testing.T) {
expected: []usage{},
options: []adaptiveThresholdOption{},
},
{
name: "firstInputIsAlwaysNormal",
in: []input{{
estimate: 1 * time.Second,
delta: 0,
}},
expected: []usage{usageNormal},
options: []adaptiveThresholdOption{},
},
{
name: "singleOver",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 20 * time.Millisecond,
delta: 0,
},
},
expected: []usage{over},
expected: []usage{usageNormal, usageOver},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "singleNormal",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 5 * time.Millisecond,
delta: 0,
},
},
expected: []usage{normal},
expected: []usage{usageNormal, usageNormal},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "singleUnder",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: -20 * time.Millisecond,
delta: 0,
},
},
expected: []usage{under},
expected: []usage{usageNormal, usageUnder},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "increaseThresholdOnOveruse",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 25 * time.Millisecond,
delta: 30 * time.Millisecond,
Expand All @@ -74,14 +99,18 @@ func TestAdaptiveThreshold(t *testing.T) {
delta: 30 * time.Millisecond,
},
},
expected: []usage{over, normal},
expected: []usage{usageNormal, usageOver, usageNormal},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
setInitialThreshold(40 * time.Millisecond),
},
},
{
name: "overuseAfterOveruse",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 20 * time.Millisecond,
delta: 30 * time.Millisecond,
Expand All @@ -91,7 +120,7 @@ func TestAdaptiveThreshold(t *testing.T) {
delta: 30 * time.Millisecond,
},
},
expected: []usage{over, over},
expected: []usage{usageNormal, usageOver, usageOver},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
Expand All @@ -104,7 +133,7 @@ func TestAdaptiveThreshold(t *testing.T) {
threshold := newAdaptiveThreshold(tc.options...)
usages := []usage{}
for _, in := range tc.in {
use, _ := threshold.compare(in.estimate, in.delta)
use, _, _ := threshold.compare(in.estimate, in.delta)
usages = append(usages, use)
}
assert.Equal(t, tc.expected, usages, "%v != %v", tc.expected, usages)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gcc/delay_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDelayController(c delayControllerConfig) *delayController {

arrivalGroupAccumulator := newArrivalGroupAccumulator()
slopeEstimator := newSlopeEstimator(newKalman())
overuseDetector := newOveruseDetector(newAdaptiveThreshold(setInitialThreshold(12*time.Millisecond)), 10*time.Millisecond)
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond)
rateController := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate)

arrival := arrivalGroupAccumulator.run(ackPipe)
Expand Down
18 changes: 15 additions & 3 deletions pkg/gcc/loss_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ import (
"github.com/pion/logging"
)

const (
// constants from
// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6

increaseLossThreshold = 0.02
increaseTimeThreshold = 200 * time.Millisecond
increaseFactor = 1.05

decreaseLossThreshold = 0.1
decreaseTimeThreshold = 200 * time.Millisecond
)

// LossStats contains internal statistics of the loss based controller
type LossStats struct {
TargetBitrate int
Expand Down Expand Up @@ -77,11 +89,11 @@ func (e *lossBasedBandwidthEstimator) updateLossEstimate(results []Acknowledgmen
e.lock.Lock()
defer e.lock.Unlock()

if increaseLoss < 0.02 && time.Since(e.lastIncrease) > 200*time.Millisecond {
if increaseLoss < increaseLossThreshold && time.Since(e.lastIncrease) > increaseTimeThreshold {
e.log.Infof("loss controller increasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastIncrease = time.Now()
e.bitrate = clampInt(int(1.05*float64(e.bitrate)), e.minBitrate, e.maxBitrate)
} else if decreaseLoss > 0.1 && time.Since(e.lastDecrease) > 200*time.Millisecond {
e.bitrate = clampInt(int(increaseFactor*float64(e.bitrate)), e.minBitrate, e.maxBitrate)
} else if decreaseLoss > decreaseLossThreshold && time.Since(e.lastDecrease) > decreaseTimeThreshold {
e.log.Infof("loss controller decreasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastDecrease = time.Now()
e.bitrate = clampInt(int(float64(e.bitrate)*(1-0.5*decreaseLoss)), e.minBitrate, e.maxBitrate)
Expand Down
4 changes: 2 additions & 2 deletions pkg/gcc/minmax.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func clampInt(b, min, max int) int {
}

func clampDuration(d, min, max time.Duration) time.Duration {
if min < d && d < max {
if min <= d && d <= max {
return d
}
if d < min {
if d <= min {
return min
}
return max
Expand Down
Loading

0 comments on commit c1f02ed

Please sign in to comment.