Skip to content

Commit

Permalink
fix: rate intermittent flaky tests (livekit#721)
Browse files Browse the repository at this point in the history

changes:
  * add retries to advance clock to unblock rate limiters
  * rewrite some tests to be more robust and simpler
  * add assertCount with noise to consider oversubscribed CI machines
  * remove parallel tests, reduce CPU pressure on CI machines
  * replaced assert with require
  • Loading branch information
lherman-cs authored May 24, 2024
1 parent 6410d00 commit 2ec622e
Showing 1 changed file with 65 additions and 40 deletions.
105 changes: 65 additions & 40 deletions utils/rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.uber.org/atomic"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -132,6 +131,7 @@ type testRunner interface {
// assertCountAt asserts the limiters have Taken() a number of times at the given time.
// It's a thin wrapper around afterFunc to reduce boilerplate code.
assertCountAt(d time.Duration, count int)
assertCountAtWithNoise(d time.Duration, count int, noise int)
// afterFunc executes a func at a given time.
// not using clock.AfterFunc because andres-erbsen/clock misses a nap there.
afterFunc(d time.Duration, fn func())
Expand Down Expand Up @@ -191,7 +191,22 @@ func runTest(t *testing.T, fn func(testRunner)) {
defer r.wg.Wait()

fn(&r)
r.clock.Add(r.maxDuration)

// it's possible that there are some goroutines still waiting
// in taking the bandwidth. We need to keep moving the clock forward
// until all goroutines are finished
go func() {
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
r.clock.Add(r.maxDuration)
case <-r.doneCh:
}
}
}()
})
}
}
Expand Down Expand Up @@ -237,7 +252,17 @@ func (r *runnerImpl) takeOnceAfter(d time.Duration, rl Limiter) {
func (r *runnerImpl) assertCountAt(d time.Duration, count int) {
r.wg.Add(1)
r.afterFunc(d, func() {
assert.Equal(r.t, int32(count), r.count.Load(), "count not as expected")
require.Equal(r.t, int32(count), r.count.Load(), "count not as expected")
r.wg.Done()
})
}

// assertCountAtWithNoise like assertCountAt but also considers possible noise in CI
func (r *runnerImpl) assertCountAtWithNoise(d time.Duration, count int, noise int) {
r.wg.Add(1)
r.afterFunc(d, func() {
require.InDelta(r.t, count, int(r.count.Load()), float64(noise),
"expected count to be within noise tolerance")
r.wg.Done()
})
}
Expand Down Expand Up @@ -270,7 +295,6 @@ func (r *runnerImpl) goWait(fn func()) {
}

func TestRateLimiter(t *testing.T) {
t.Parallel()
runTest(t, func(r testRunner) {
rl := r.createLimiter(100, WithoutSlack)

Expand All @@ -280,15 +304,14 @@ func TestRateLimiter(t *testing.T) {
r.startTaking(rl)
r.startTaking(rl)

r.assertCountAt(1*time.Second, 100)
r.assertCountAt(2*time.Second, 200)
r.assertCountAt(3*time.Second, 300)
r.assertCountAtWithNoise(1*time.Second, 100, 2)
r.assertCountAtWithNoise(2*time.Second, 200, 2)
r.assertCountAtWithNoise(3*time.Second, 300, 2)
})
}

func TestDelayedRateLimiter(t *testing.T) {
t.Skip(UnstableTest)
t.Parallel()
runTest(t, func(r testRunner) {
slow := r.createLimiter(10, WithoutSlack)
fast := r.createLimiter(100, WithoutSlack)
Expand All @@ -307,7 +330,6 @@ func TestDelayedRateLimiter(t *testing.T) {
}

func TestPer(t *testing.T) {
t.Parallel()
runTest(t, func(r testRunner) {
rl := r.createLimiter(7, WithoutSlack, Per(time.Minute))

Expand All @@ -322,7 +344,6 @@ func TestPer(t *testing.T) {

// TestInitial verifies that the initial sequence is scheduled as expected.
func TestInitial(t *testing.T) {
t.Parallel()
tests := []struct {
msg string
opts []Option
Expand All @@ -339,39 +360,37 @@ func TestInitial(t *testing.T) {
for _, tt := range tests {
t.Run(tt.msg, func(t *testing.T) {
runTest(t, func(r testRunner) {
perRequest := 100 * time.Millisecond
rl := r.createLimiter(10, tt.opts...)

var (
clk = r.getClock()
prev = clk.Now()

results = make(chan time.Time)
results = make(chan time.Time, 3)
have []time.Duration
startWg sync.WaitGroup
)
startWg.Add(3)

for i := 0; i < 3; i++ {
go func() {
startWg.Done()
results <- rl.Take()
}()
}
results <- rl.Take()
clk.Add(perRequest)

startWg.Wait()
clk.Add(time.Second)
results <- rl.Take()
clk.Add(perRequest)

results <- rl.Take()
clk.Add(perRequest)

for i := 0; i < 3; i++ {
ts := <-results
have = append(have, ts.Sub(prev))
prev = ts
}

assert.Equal(t,
require.Equal(t,
[]time.Duration{
0,
time.Millisecond * 100,
time.Millisecond * 100,
perRequest,
perRequest,
},
have,
"bad timestamps for inital takes",
Expand All @@ -382,23 +401,34 @@ func TestInitial(t *testing.T) {
}

func TestMaxSlack(t *testing.T) {
t.Parallel()
runTest(t, func(r testRunner) {
clock := r.getClock()
rl := r.createLimiter(1, WithSlack(1))
rl.Take()
clock.Add(time.Second)
rl.Take()
clock.Add(time.Second)
rl.Take()

r.takeOnceAfter(time.Nanosecond, rl)
r.takeOnceAfter(2*time.Second+1*time.Nanosecond, rl)
r.takeOnceAfter(2*time.Second+2*time.Nanosecond, rl)
r.takeOnceAfter(2*time.Second+3*time.Nanosecond, rl)
r.takeOnceAfter(2*time.Second+4*time.Nanosecond, rl)
doneCh := make(chan struct{})
go func() {
rl.Take()
close(doneCh)
}()

r.assertCountAt(3*time.Second, 3)
r.assertCountAt(10*time.Second, 5)
select {
case <-doneCh:
require.Fail(t, "expect rate limiter to be waiting")
case <-time.After(time.Millisecond):
// clean up ratelimiter waiting for take
clock.Add(time.Second)
}
})
}

func TestSlack(t *testing.T) {
t.Parallel()
t.Skip(UnstableTest)

// To simulate slack, we combine two limiters.
// - First, we start a single goroutine with both of them,
// during this time the slow limiter will dominate,
Expand Down Expand Up @@ -469,11 +499,6 @@ func TestSlack(t *testing.T) {
}

for _, tt := range tests {
cfg := buildConfig(tt.opt)
if cfg.slack >= 100 {
t.Skip(UnstableTest)
}

t.Run(tt.msg, func(t *testing.T) {
runTest(t, func(r testRunner) {
slow := r.createLimiter(10, WithoutSlack)
Expand All @@ -487,8 +512,8 @@ func TestSlack(t *testing.T) {
})

// limiter with 10hz dominates here - we're always at 10.
r.assertCountAt(1*time.Second, 10)
r.assertCountAt(3*time.Second, tt.want)
r.assertCountAtWithNoise(1*time.Second, 10, 2)
r.assertCountAtWithNoise(3*time.Second, tt.want, 2)
})
})
}
Expand Down

0 comments on commit 2ec622e

Please sign in to comment.