Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Our use of a `BucketRateLimiter` with `DelayingQueue` is problematic when repeatedly inserting the same items. The former continually moves out the rate limit token time with each new item (no deduping), while the latter dedupes insertions. The net effect is that under load newly inserted items end up with a token time way in the future while the queue happily drains to empty with items processed at a much lower than expected rate. This PR adds a new `DedupingBucketRateLimiter` that maintains historic reservations per item and skips taking out a new reservation when a duplicate item is found with an outstanding reservation. Reservations are dropped from the map when `Forget` is called, which happens after the item is fully processed. This is similar to how the exponential backoff rate limiter works. See [here](https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go#L82-L149). - [x] Run new rate limiter under high execution load and verify that queue is kept full but not overflown.
- Loading branch information