-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathadaptive.go
414 lines (365 loc) · 14.5 KB
/
adaptive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
package bulwark
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/deixis/faults"
)
const (
// K is the default accept multiplier, which is used to determine the number
// of requests that are allowed to reach the backend.
//
// A value of 2 means that the throttle will allow twice as many requests to
// actually reach the backend as it believes will succeed.
K = 2
// MinRPS is the minimum number of requests per second that the adaptive
// throttle will allow (approximately) through to the upstream, even if every
// request is failing.
MinRPS = 1
)
// AdaptiveThrottle is used in a client to throttle requests to a backend as it becomes unhealthy to
// help it recover from overload more quickly. Because backends must expend resources to reject
// requests over their capacity it is vital for clients to ease off on sending load when they are
// in trouble, lest the backend spend all of its resources on rejecting requests and have none left
// over to actually serve any.
//
// The adaptive throttle works by tracking the success rate of requests over some time interval
// (usually a minute or so), and randomly rejecting requests without sending them to avoid sending
// too much more than the rate that are expected to actually be successful. Some slop is included,
// because even if the backend is serving zero requests successfully, we do need to occasionally
// send it requests to learn when it becomes healthy again.
//
// More on adaptive throttles in https://sre.google/sre-book/handling-overload/
type AdaptiveThrottle struct {
m sync.Mutex
k float64
minPerWindow float64
requests []windowedCounter
accepts []windowedCounter
}
// NewAdaptiveThrottle returns an AdaptiveThrottle.
//
// priorities is the number of priorities that the throttle will accept. Giving a priority outside
// of `[0, priorities)` will panic.
func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *AdaptiveThrottle {
opts := adaptiveThrottleOptions{
d: time.Minute,
k: K,
minRate: MinRPS,
}
for _, option := range options {
option.f(&opts)
}
now := Now()
requests := make([]windowedCounter, priorities)
accepts := make([]windowedCounter, priorities)
for i := range requests {
requests[i] = newWindowedCounter(now, opts.d/10, 10)
accepts[i] = newWindowedCounter(now, opts.d/10, 10)
}
return &AdaptiveThrottle{
k: opts.k,
requests: requests,
accepts: accepts,
minPerWindow: opts.minRate * opts.d.Seconds(),
}
}
// Throttle sends a request to the backend when the adaptive throttle allows it.
// The request is throttled based on the priority of the request.
//
// The default priority is used when the given `ctx` does not have a priority set.
// The `ctx` can set the priority using `WithPriority`.
//
// When `throttledFn` returns an error, the error is considered as a rejection
// when `isErrorAccepted` returns false or when the error is wrapped in a
// `RejectedError`.
//
// If there are enough rejections within a given time window, further calls to
// `Throttle` may begin returning `ClientSideRejectionError` immediately
// without invoking `throttledFn`. Lower-priority requests are preferred to be
// rejected first.
func (t *AdaptiveThrottle) Throttle(
ctx context.Context, defaultPriority Priority, fn throttledFn, fallbackFn ...fallbackFn,
) error {
priority := PriorityFromContext(ctx, defaultPriority)
now := Now()
rejectionProbability := t.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
// rate at which the application attempts requests to Bulwark grows
// (relative to the rate at which the backend accepts them), we want to
// increase the probability of dropping new requests.
t.reject(priority, now)
if len(fallbackFn) > 0 {
return fallbackFn[0](ctx, ClientSideRejectionError, true)
}
return ClientSideRejectionError
}
err := fn(ctx)
now = Now()
switch {
case err == nil:
t.accept(priority, now)
case errors.Is(err, errRejected{}):
// Unwrap error to return the original error to the caller
err = err.(errRejected).inner
fallthrough
case IsRejectedError(err):
t.reject(priority, now)
default:
t.accept(priority, now)
}
if err != nil && len(fallbackFn) > 0 {
return fallbackFn[0](ctx, err, false)
}
return err
}
// rejectionProbability returns the probability that a request of the given
// priority will be rejected. The result is clamped to the range [0, 1].
//
// It uses the formula from https://sre.google/sre-book/handling-overload/ to
// calculate the probability that a request will be rejected. The formula is:
//
// clamp(0, (requests - k * accepts) / (requests + minPerWindow), 1)
//
// Where:
// - requests is the number of requests of the given priority in the last d time window.
// - accepts is the number of requests of the given priority that were accepted in the last d time
// window.
// - k is the ratio of the measured success rate and the rate that the throttle will admit.
// - minPerWindow is the minimum number of requests per second that the adaptive throttle will allow
// (approximately) through to the upstream, even if every request is failing.
func (t *AdaptiveThrottle) rejectionProbability(p Priority, now time.Time) float64 {
t.m.Lock()
requests := float64(t.requests[int(p)].get(now))
accepts := float64(t.accepts[int(p)].get(now))
for i := 0; i < int(p); i++ {
// Also count non-accepted requests for every higher priority as
// non-accepted for this priority.
requests += float64(t.requests[i].get(now) - t.accepts[i].get(now))
}
t.m.Unlock()
return clamp(0, (requests-t.k*accepts)/(requests+t.minPerWindow), 1)
}
// accept records that a request of the given priority was accepted.
func (t *AdaptiveThrottle) accept(p Priority, now time.Time) {
t.m.Lock()
t.requests[int(p)].add(now, 1)
t.accepts[int(p)].add(now, 1)
t.m.Unlock()
}
// reject records that a request of the given priority was rejected.
func (t *AdaptiveThrottle) reject(p Priority, now time.Time) {
t.m.Lock()
t.requests[int(p)].add(now, 1)
t.m.Unlock()
}
// Additional options for the AdaptiveThrottle type. These options do not frequently need to be
// tuned as the defaults work in a majority of cases.
type AdaptiveThrottleOption struct {
f func(*adaptiveThrottleOptions)
}
type adaptiveThrottleOptions struct {
k float64
minRate float64
d time.Duration
isErrorAccepted func(err error) bool
}
// WithAdaptiveThrottleRatio sets the ratio of the measured success rate and the rate that the throttle
// will admit. For example, when k is 2 the throttle will allow twice as many requests to actually
// reach the backend as it believes will succeed. Higher values of k mean that the throttle will
// react more slowly when a backend becomes unhealthy, but react more quickly when it becomes
// healthy again, and will allow more load to an unhealthy backend. k=2 is usually a good place to
// start, but backends that serve "cheap" requests (e.g. in-memory caches) may need a lower value.
func WithAdaptiveThrottleRatio(k float64) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.k = k
}}
}
// WithAdaptiveThrottleMinimumRate sets the minimum number of requests per second that the adaptive
// throttle will allow (approximately) through to the upstream, even if every request is failing.
// This is important because this is how the adaptive throttle 'learns' when the upstream becomes
// healthy again.
func WithAdaptiveThrottleMinimumRate(x float64) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.minRate = x
}}
}
// WithAdaptiveThrottleWindow sets the time window over which the throttle remembers requests for use in
// figuring out the success rate.
func WithAdaptiveThrottleWindow(d time.Duration) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.d = d
}}
}
// Deprecated: Wrap errors with RejectedError instead and use the global DefaultRejectedErrors.
//
// WithAcceptedErrors sets the function that determines whether an error should
// be considered for the throttling. When the call to fn returns true, the error
// is not counted towards the throttling.
func WithAcceptedErrors(fn func(err error) bool) AdaptiveThrottleOption {
return AdaptiveThrottleOption{func(opts *adaptiveThrottleOptions) {
opts.isErrorAccepted = fn
}}
}
func Throttle[T any](
ctx context.Context,
at *AdaptiveThrottle,
defaultPriority Priority,
throttledFn throttledArgsFn[T],
fallbackFn ...fallbackArgsFn[T],
) (T, error) {
priority := PriorityFromContext(ctx, defaultPriority)
now := Now()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
// rate at which the application attempts requests to Bulwark grows
// (relative to the rate at which the backend accepts them), we want to
// increase the probability of dropping new requests.
at.reject(priority, now)
var zero T
if len(fallbackFn) > 0 {
return fallbackFn[0](ctx, ClientSideRejectionError, true)
}
return zero, ClientSideRejectionError
}
t, err := throttledFn(ctx)
now = Now()
switch {
case err == nil:
at.accept(priority, now)
case errors.Is(err, errRejected{}):
// Unwrap error to return the original error to the caller
err = err.(errRejected).inner
fallthrough
case IsRejectedError(err):
at.reject(priority, now)
default:
at.accept(priority, now)
}
if err != nil && len(fallbackFn) > 0 {
return fallbackFn[0](ctx, err, false)
}
return t, err
}
// WithAdaptiveThrottle is used to send a request to a backend using the given AdaptiveThrottle for
// client-rejections.
//
// If f returns an error, at considers this to be a rejection unless it is wrapped with
// AcceptedError(). If there are enough rejections within a given time window, further calls to
// WithAdaptiveThrottle may begin returning ErrClientRejection immediately without invoking f. The
// rate at which this happens depends on the error rate of f.
//
// WithAdaptiveThrottle will prefer to reject lower-priority requests if it can.
func WithAdaptiveThrottle[T any](
at *AdaptiveThrottle,
priority Priority,
throttledFn func() (T, error),
) (T, error) {
now := Now()
rejectionProbability := at.rejectionProbability(priority, now)
if rand.Float64() < rejectionProbability {
// As Bulwark starts rejecting requests, requests will continue to exceed
// accepts. While it may seem counterintuitive, given that locally rejected
// requests aren't actually propagated, this is the preferred behavior. As the
// rate at which the application attempts requests to Bulwark grows
// (relative to the rate at which the backend accepts them), we want to
// increase the probability of dropping new requests.
at.reject(priority, now)
var zero T
return zero, ClientSideRejectionError
}
t, err := throttledFn()
now = Now()
switch {
case err == nil:
at.accept(priority, now)
case errors.Is(err, errRejected{}):
at.reject(priority, now)
// Unwrap error to return the original error to the caller
err = err.(errRejected).inner
case IsRejectedError(err):
at.reject(priority, now)
default:
at.accept(priority, now)
}
return t, err
}
// RejectedError wraps an error to indicate that the error should be considered
// for the throttling.
//
// Any error that indicates that the backend is unhealthy should be wrapped with
// `RejectedError`. But other errors, such as bad requests, authentication failures,
// pre-condition failures, etc., should not be wrapped with `RejectedError`.
func RejectedError(err error) error { return errRejected{inner: err} }
type errRejected struct{ inner error }
func (err errRejected) Error() string { return err.inner.Error() }
func (err errRejected) Unwrap() error { return err.inner }
func (err errRejected) Is(target error) bool {
_, ok := target.(errRejected)
return ok
}
// clamp clamps x to the range [min, max].
func clamp(min, x, max float64) float64 {
if x < min {
return min
}
if x > max {
return max
}
return x
}
type (
throttledFn func(ctx context.Context) error
fallbackFn func(ctx context.Context, err error, local bool) error
throttledArgsFn[T any] func(ctx context.Context) (T, error)
fallbackArgsFn[T any] func(ctx context.Context, err error, local bool) (T, error)
)
var (
// Deprecated: Override `IsRejectedError` instead and/or wrap errors with `RejectedError`.
//
// DefaultAcceptedErrors is the default function used to determine whether
// an error should be considered for the throttling.
DefaultAcceptedErrors = func(err error) bool {
return errors.Is(err, context.Canceled) ||
faults.IsUnauthenticated(err) ||
faults.IsPermissionDenied(err) ||
faults.IsBad(err) ||
faults.IsAborted(err) ||
faults.IsNotFound(err) ||
faults.IsFailedPrecondition(err) ||
faults.IsUnimplemented(err)
}
// DefaultRejectedError is the default function used to determine whether
// an error should be considered for the throttling.
DefaultRejectedError = func(err error) bool {
return faults.IsUnavailable(err) ||
faults.IsResourceExhausted(err)
}
// Deprecated: Use `ClientSideRejectionError` instead.
//
// DefaultClientSideRejectionError is the default error returned when the
// client rejects the request due to the adaptive throttle.
DefaultClientSideRejectionError = faults.Unavailable(time.Second)
// ClientSideRejectionError is the error returned when the client rejects the
// request due to the adaptive throttle.
ClientSideRejectionError = DefaultClientSideRejectionError
// IsRejectedError is a global function that determines whether an error
// should be considered for the throttling. Any error that indicates that the
// backend is unhealthy should be considered for the throttling.
//
// This function can be overridden to customise the behaviour of Bulwark.
// For example, it is possible to use a whitelist of errors that should be
// accepted and reject the rest.
IsRejectedError = DefaultRejectedError
// Now returns the current time. It is a variable to allow tests to override
// the current time.
Now = time.Now
)