Skip to content

Commit

Permalink
Add DedupingBucketRateLimiter
Browse files Browse the repository at this point in the history
Our use of a `BucketRateLimiter` with `DelayingQueue` is problematic when repeatedly inserting the same items. The former continually moves out the rate limit token time with each new item (no deduping), while the latter dedupes insertions. The net effect is that under load newly inserted items end up with a token time way in the future while the queue happily drains to empty with items processed at a much lower than expected rate.

This PR adds a new `DedupingBucketRateLimiter` that maintains historic reservations per item and skips taking out a new reservation when a duplicate item is found with an outstanding reservation. Reservations are dropped from the map when `Forget` is called, which happens after the item is fully processed. This is similar to how the exponential backoff rate limiter works. See [here](https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go#L82-L149).

- [x] Run new rate limiter under high execution load and verify that queue is kept full but not overflown.

Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
andrewwdye committed Nov 18, 2024
1 parent 8e9616a commit 224c732
Show file tree
Hide file tree
Showing 6 changed files with 1,127 additions and 7 deletions.
36 changes: 36 additions & 0 deletions flytepropeller/pkg/controller/interfaces/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package interfaces

import (
"context"
"time"

"golang.org/x/time/rate"
)

//go:generate mockery-v2 --name Limiter --output ../mocks --case=snake --with-expecter
//go:generate mockery-v2 --name Reservation --output ../mocks --case=snake --with-expecter

type Limiter interface {
Allow() bool
AllowN(t time.Time, n int) bool
Burst() int
Limit() rate.Limit
Reserve() Reservation
ReserveN(t time.Time, n int) Reservation
SetBurst(newBurst int)
SetBurstAt(t time.Time, newBurst int)
SetLimit(newLimit rate.Limit)
SetLimitAt(t time.Time, newLimit rate.Limit)
Tokens() float64
TokensAt(t time.Time) float64
Wait(ctx context.Context) (err error)
WaitN(ctx context.Context, n int) (err error)
}

type Reservation interface {
Cancel()
CancelAt(t time.Time)
Delay() time.Duration
DelayFrom(t time.Time) time.Duration
OK() bool
}
Loading

0 comments on commit 224c732

Please sign in to comment.