Skip to content

Commit

Permalink
feat: add fixed_window bucket option (#79)
Browse files Browse the repository at this point in the history
* feat: add fixed_window bucket option

* activate fixed_window through param so we can control the rollout using feature flags

* add fixed window on ERL

* disabled CI as docker-compose is erroring out

* adjust reset_ms based on fixed_window

* fixed CI to use docker compose instead of docker-compose

* fix take_elevated for fixed_window

* tests to make sure the last_drip ain't modified within the same interval in fixed window

* fixed window param takes precedence over bucket config

* added fixed_window to README.md

* improved description in readme

* fixed returning modified current_timestamp and reset_ms correctly calculated for fixed window in takeeleavated

* added isFixedWindowEnabled tests and clarified README.md

* fixed flaky test

* clarified README.md and added tests

* missing trail line

* nit:trailing comma

---------

Co-authored-by: panga <[email protected]>
  • Loading branch information
pubalokta and panga authored Oct 16, 2024
1 parent 2d44b1c commit 6ecb349
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 18 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ It's a fork from [LimitDB](https://github.com/limitd/limitdb).
- [Breaking changes from `Limitdb`](#breaking-changes-from-limitdb)
- [TAKE](#take)
- [TAKEELEVATED](#takeelevated)
- [Use of fixed window on Take and TakeElevated](#use-of-fixed-window-on-take-and-takeelevated)
- [PUT](#put)
- [Overriding Configuration at Runtime](#overriding-configuration-at-runtime)
- [Overriding Configuration at Runtime with ERL](#overriding-configuration-at-runtime-with-erl)
Expand Down Expand Up @@ -82,6 +83,7 @@ const limitd = new Limitd({
- `unlimited` (boolean = false): unlimited requests (skip take).
- `skip_n_calls` (number): take will go to redis every `n` calls instead of going in every take.
- `elevated_limits` (object): elevated limits configuration that kicks in when the bucket is empty. Please refer to the [ERL section](#ERL-Elevated-Rate-Limits) for more details.
- `fixed_window` (boolean = false): refill at specified interval instead of granular.

You can also define your rates using `per_second`, `per_minute`, `per_hour`, `per_day`. So `per_second: 1` is equivalent to `per_interval: 1, interval: 1000`.

Expand Down Expand Up @@ -320,6 +322,32 @@ if erl_triggered // quota left in the quotaKey bucket
if !erl_triggered // ERL wasn't triggered in this call, so we haven't identified the remaining quota.
```

### Use of fixed window in Take and TakeElevated
By default, the bucket uses the sliding window algorithm to refill tokens. For example, if the bucket is set to 100 tokens per second, it refills 1 token every 10 milliseconds (1000ms / 100 tokens per second).

With the fixed window algorithm, the bucket refills at the specified interval. For instance, if set to 100 tokens per second, it refills 100 tokens every second.

To use the fixed window algorithm on `Take` or `TakeElevated`, set the `fixed_window` property in the bucket configuration to `true` (default is `false`). This will refill the bucket at the specified interval

Additionally, you can use the `fixed_window` flag in the configOverride parameter. This acts as a feature flag for safe deployment, but it cannot activate the fixed window algorithm if the bucket configuration is set to false.

Both the bucket configuration and the configOverride parameter must be set to true to activate the fixed window algorithm. If the configOverride parameter is not provided, it defaults to true, and the activation depends on the bucket configuration.

The following table describes how the fixed window bucket configuration and the fixed window param interact to activate the fixed window algorithm.

| fixed_window bucket config | fixed_window param | Fixed Window Enabled |
|----------------------------|--------------------|----------------------|
| true | true | Yes |
| true | false | No |
| true | not provided | Yes |
| false | true | No |
| false | false | No |
| false | not provided | No |
| not provided | true | No |
| not provided | false | No |
| not provided | not provided | No |


## PUT

You can manually reset a fill a bucket using PUT:
Expand Down
6 changes: 5 additions & 1 deletion lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const utils = require('./utils');
const Redis = require('ioredis');
const { validateParams, validateERLParams } = require('./validation');
const DBPing = require("./db_ping");
const { calculateQuotaExpiration, resolveElevatedParams } = require('./utils');
const { calculateQuotaExpiration, resolveElevatedParams, isFixedWindowEnabled } = require('./utils');
const EventEmitter = require('events').EventEmitter;

const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, "utf8");
Expand Down Expand Up @@ -264,12 +264,14 @@ class LimitDBRedis extends EventEmitter {
*/
take(params, callback) {
this._doTake(params, callback, (key, bucketKeyConfig, count) => {
const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window);
this.redis.take(key,
bucketKeyConfig.ms_per_interval || 0,
bucketKeyConfig.size,
count,
Math.ceil(bucketKeyConfig.ttl || this.globalTTL),
bucketKeyConfig.drip_interval || 0,
useFixedWindow ? bucketKeyConfig.interval : 0,
(err, results) => {
if (err) {
return callback(err);
Expand Down Expand Up @@ -308,12 +310,14 @@ class LimitDBRedis extends EventEmitter {
this._doTake(params, callback, (key, bucketKeyConfig, count) => {
const elevated_limits = resolveElevatedParams(erlParams, bucketKeyConfig, key, this.prefix);
const erl_quota_expiration = calculateQuotaExpiration(elevated_limits);
const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window);
this.redis.takeElevated(key, elevated_limits.erl_is_active_key, elevated_limits.erl_quota_key,
bucketKeyConfig.ms_per_interval || 0,
bucketKeyConfig.size,
count,
Math.ceil(bucketKeyConfig.ttl || this.globalTTL),
bucketKeyConfig.drip_interval || 0,
useFixedWindow ? bucketKeyConfig.interval : 0,
elevated_limits.ms_per_interval,
elevated_limits.size,
elevated_limits.erl_activation_period_seconds,
Expand Down
15 changes: 13 additions & 2 deletions lib/take.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ local new_content = tonumber(ARGV[2])
local tokens_to_take = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
local drip_interval = tonumber(ARGV[5])
local fixed_window = tonumber(ARGV[6])

local current_time = redis.call('TIME')
local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000
local redis_timestamp_ms = current_timestamp_ms

local current = redis.pcall('HMGET', KEYS[1], 'd', 'r')

Expand All @@ -18,6 +20,13 @@ if current[1] and tokens_per_ms then
-- drip bucket
local last_drip = current[1]
local content = current[2]

if fixed_window > 0 then
-- fixed window for granting new tokens
local interval_correction = (current_timestamp_ms - last_drip) % fixed_window
current_timestamp_ms = current_timestamp_ms - interval_correction
end

local delta_ms = math.max(current_timestamp_ms - last_drip, 0)
local drip_amount = delta_ms * tokens_per_ms
new_content = math.min(content + drip_amount, bucket_size)
Expand All @@ -41,8 +50,10 @@ redis.call('HMSET', KEYS[1],
redis.call('EXPIRE', KEYS[1], ttl)

local reset_ms = 0
if drip_interval > 0 then
if fixed_window > 0 then
reset_ms = current_timestamp_ms + fixed_window
elseif drip_interval > 0 then
reset_ms = math.ceil(current_timestamp_ms + (bucket_size - new_content) * drip_interval)
end

return { new_content, enough_tokens, current_timestamp_ms, reset_ms }
return { new_content, enough_tokens, redis_timestamp_ms, reset_ms }
46 changes: 38 additions & 8 deletions lib/take_elevated.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ local bucket_size = tonumber(ARGV[2])
local tokens_to_take = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
local drip_interval = tonumber(ARGV[5])
local erl_tokens_per_ms = tonumber(ARGV[6])
local erl_bucket_size = tonumber(ARGV[7])
local erl_activation_period_seconds = tonumber(ARGV[8])
local erl_quota = tonumber(ARGV[9])
local erl_quota_expiration_epoch = tonumber(ARGV[10])
local erl_configured_for_bucket = tonumber(ARGV[11]) == 1
local fixed_window = tonumber(ARGV[6])
local erl_tokens_per_ms = tonumber(ARGV[7])
local erl_bucket_size = tonumber(ARGV[8])
local erl_activation_period_seconds = tonumber(ARGV[9])
local erl_quota = tonumber(ARGV[10])
local erl_quota_expiration_epoch = tonumber(ARGV[11])
local erl_configured_for_bucket = tonumber(ARGV[12]) == 1

-- the key to use for pulling last bucket state from redis
local lastBucketStateKey = KEYS[1]
Expand All @@ -29,12 +30,36 @@ end
-- get current time from redis, to be used in new bucket size calculations later
local current_time = redis.call('TIME')
local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000
local redis_timestamp_ms = current_timestamp_ms

local function adjustCurrentTimestampForFixedWindow(current_timestamp_ms)
if current[1] and tokens_per_ms then
-- drip bucket
local last_drip = current[1]

if fixed_window > 0 then
-- fixed window for granting new tokens
local interval_correction = (current_timestamp_ms - last_drip) % fixed_window
current_timestamp_ms = current_timestamp_ms - interval_correction
end

return current_timestamp_ms
end
return current_timestamp_ms
end

local function calculateNewBucketContent(current, tokens_per_ms, bucket_size, current_timestamp_ms)
if current[1] and tokens_per_ms then
-- drip bucket
local last_drip = current[1]
local content = current[2]

if fixed_window > 0 then
-- fixed window for granting new tokens
local interval_correction = (current_timestamp_ms - last_drip) % fixed_window
current_timestamp_ms = current_timestamp_ms - interval_correction
end

local delta_ms = math.max(current_timestamp_ms - last_drip, 0)
local drip_amount = delta_ms * tokens_per_ms
return math.min(content + drip_amount, bucket_size)
Expand Down Expand Up @@ -71,6 +96,9 @@ local function takeERLQuota(erl_quota_key, erl_quota, erl_quota_expiration_epoch
return previously_used_quota
end

-- adjust current timestamp for fixed window
current_timestamp_ms = adjustCurrentTimestampForFixedWindow(current_timestamp_ms)

-- Enable verbatim replication to ensure redis sends script's source code to all masters
-- managing the sharded database in a clustered deployment.
-- https://redis.io/docs/interact/programmability/eval-intro/#:~:text=scripts%20debugger.-,Script%20replication,-In%20standalone%20deployments
Expand Down Expand Up @@ -124,7 +152,9 @@ redis.call('HMSET', lastBucketStateKey,
redis.call('EXPIRE', lastBucketStateKey, ttl)

local reset_ms = 0
if drip_interval > 0 then
if fixed_window > 0 then
reset_ms = current_timestamp_ms + fixed_window
elseif drip_interval > 0 then
if is_erl_activated == 1 then
reset_ms = math.ceil(current_timestamp_ms + (erl_bucket_size - bucket_content_after_take) * drip_interval)
else
Expand All @@ -133,4 +163,4 @@ if drip_interval > 0 then
end

-- Return the current quota
return { bucket_content_after_take, enough_tokens, current_timestamp_ms, reset_ms, erl_triggered, is_erl_activated, erl_quota_left }
return { bucket_content_after_take, enough_tokens, redis_timestamp_ms, reset_ms, erl_triggered, is_erl_activated, erl_quota_left }
23 changes: 21 additions & 2 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ function normalizeTemporals(params) {
'interval',
'size',
'unlimited',
'skip_n_calls'
'skip_n_calls',
'fixed_window'
]);

INTERVAL_SHORTCUTS.forEach(intervalShortcut => {
Expand Down Expand Up @@ -219,6 +220,23 @@ function replicateHashtag(baseKey, prefix, key) {
}
}

/** isFixedWindowEnabled
* | fixed_window bucket config | fixed_window param | Fixed Window Enabled |
* |----------------------------|--------------------|----------------------|
* | true | true | Yes |
* | true | false | No |
* | true | not provided | Yes |
* | false | true | No |
* | false | false | No |
* | false | not provided | No |
* | not provided | true | No |
* | not provided | false | No |
* | not provided | not provided | No |
*/
function isFixedWindowEnabled(fixedWindowFromConfig, fixedWindowFromParam) {
return fixedWindowFromConfig === true && (fixedWindowFromParam === true || fixedWindowFromParam === undefined);
}

module.exports = {
buildBuckets,
buildBucket,
Expand All @@ -232,5 +250,6 @@ module.exports = {
endOfMonthTimestamp,
calculateQuotaExpiration,
resolveElevatedParams,
replicateHashtag
replicateHashtag,
isFixedWindowEnabled,
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"mocha": "^5.2.0",
"mockdate": "^3.0.5",
"nyc": "^14.1.1",
"sinon": "^19.0.2",
"toxiproxy-node-client": "^2.0.6"
}
}
1 change: 0 additions & 1 deletion test/db.standalonemode.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ describe('when using LimitDB', () => {
}

if (err) {
console.log(err, err.message);
done(err);
}
});
Expand Down
Loading

0 comments on commit 6ecb349

Please sign in to comment.