diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index 372b1732..e84bfbea 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -54,6 +54,8 @@ pub mod env { value_for("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS"); pub static ref REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS: Option<&'static str> = value_for("REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS"); + pub static ref REDIS_LOCAL_CACHE_BATCH_SIZE: Option<&'static str> = + value_for("REDIS_LOCAL_CACHE_BATCH_SIZE"); pub static ref REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS: Option<&'static str> = value_for("REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS"); pub static ref RATE_LIMIT_HEADERS: Option<&'static str> = value_for("RATE_LIMIT_HEADERS"); @@ -162,6 +164,7 @@ pub struct RedisStorageConfiguration { #[derive(PartialEq, Eq, Debug)] pub struct RedisStorageCacheConfiguration { + pub batch_size: usize, pub flushing_period: i64, pub max_counters: usize, pub response_timeout: u64, diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 226f1aa9..818d7da7 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -24,8 +24,8 @@ use limitador::storage::disk::DiskStorage; #[cfg(feature = "infinispan")] use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder}; use limitador::storage::redis::{ - AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC, - DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, + AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE, + DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, }; use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage}; use limitador::{ @@ -132,6 +132,7 @@ impl Limiter { // TODO: Not all the options are configurable via ENV. Add them as needed. let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url) + .batch_size(cache_cfg.batch_size) .flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64)) .max_cached_counters(cache_cfg.max_counters) .response_timeout(Duration::from_millis(cache_cfg.response_timeout)); @@ -588,6 +589,18 @@ fn create_config() -> (Configuration, &'static str) { .about("Uses Redis to store counters, with an in-memory cache") .display_order(4) .arg(redis_url_arg) + .arg( + Arg::new("batch") + .long("batch-size") + .action(ArgAction::Set) + .value_parser(clap::value_parser!(u64)) + .default_value( + config::env::REDIS_LOCAL_CACHE_BATCH_SIZE + .unwrap_or(leak(DEFAULT_BATCH_SIZE)), + ) + .display_order(3) + .help("Size of entries to flush in as single flush"), + ) .arg( Arg::new("flush") .long("flush-period") @@ -720,6 +733,7 @@ fn create_config() -> (Configuration, &'static str) { Some(("redis_cached", sub)) => StorageConfiguration::Redis(RedisStorageConfiguration { url: sub.get_one::("URL").unwrap().to_owned(), cache: Some(RedisStorageCacheConfiguration { + batch_size: *sub.get_one("batch").unwrap(), flushing_period: *sub.get_one("flush").unwrap(), max_counters: *sub.get_one("max").unwrap(), response_timeout: *sub.get_one("timeout").unwrap(), @@ -799,6 +813,10 @@ fn storage_config_from_env() -> Result { url, cache: if env_option_is_enabled("REDIS_LOCAL_CACHE_ENABLED") { Some(RedisStorageCacheConfiguration { + batch_size: env::var("REDIS_LOCAL_CACHE_BATCH_SIZE") + .unwrap_or_else(|_| (DEFAULT_BATCH_SIZE).to_string()) + .parse() + .expect("Expected an usize"), flushing_period: env::var("REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS") .unwrap_or_else(|_| (DEFAULT_FLUSHING_PERIOD_SEC * 1000).to_string()) .parse() diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index 785c13f8..180d8dd6 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -8,6 +8,7 @@ mod redis_sync; mod scripts; pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1; +pub const DEFAULT_BATCH_SIZE: usize = 100; pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000; pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index f8282859..698d8151 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -7,7 +7,8 @@ use crate::storage::redis::counters_cache::{ use crate::storage::redis::redis_async::AsyncRedisStorage; use crate::storage::redis::scripts::BATCH_UPDATE_COUNTERS; use crate::storage::redis::{ - DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, + DEFAULT_BATCH_SIZE, DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, + DEFAULT_RESPONSE_TIMEOUT_MS, }; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; @@ -146,6 +147,7 @@ impl CachedRedisStorage { pub async fn new(redis_url: &str) -> Result { Self::new_with_options( redis_url, + DEFAULT_BATCH_SIZE, Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), DEFAULT_MAX_CACHED_COUNTERS, Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS), @@ -155,6 +157,7 @@ impl CachedRedisStorage { async fn new_with_options( redis_url: &str, + batch_size: usize, flushing_period: Duration, max_cached_counters: usize, response_timeout: Duration, @@ -193,6 +196,7 @@ impl CachedRedisStorage { storage.is_alive().await, counters_cache_clone.clone(), p.clone(), + batch_size, ) .await; } @@ -224,6 +228,7 @@ fn flip_partitioned(storage: &AtomicBool, partition: bool) -> bool { pub struct CachedRedisStorageBuilder { redis_url: String, + batch_size: usize, flushing_period: Duration, max_cached_counters: usize, response_timeout: Duration, @@ -233,12 +238,18 @@ impl CachedRedisStorageBuilder { pub fn new(redis_url: &str) -> Self { Self { redis_url: redis_url.to_string(), + batch_size: DEFAULT_BATCH_SIZE, flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS, response_timeout: Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS), } } + pub fn batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + pub fn flushing_period(mut self, flushing_period: Duration) -> Self { self.flushing_period = flushing_period; self @@ -257,6 +268,7 @@ impl CachedRedisStorageBuilder { pub async fn build(self) -> Result { CachedRedisStorage::new_with_options( &self.redis_url, + self.batch_size, self.flushing_period, self.max_cached_counters, self.response_timeout, @@ -319,6 +331,7 @@ async fn flush_batcher_and_update_counters( storage_is_alive: bool, cached_counters: Arc, partitioned: Arc, + batch_size: usize, ) { if partitioned.load(Ordering::Acquire) || !storage_is_alive { if !cached_counters.batcher().is_empty() { @@ -327,7 +340,9 @@ async fn flush_batcher_and_update_counters( } else { let updated_counters = cached_counters .batcher() - .consume(100, |counters| update_counters(&mut redis_conn, counters)) + .consume(batch_size, |counters| { + update_counters(&mut redis_conn, counters) + }) .await .or_else(|err| { if err.is_transient() { @@ -486,8 +501,14 @@ mod tests { assert_eq!(c.hits(&counter), 2); } - flush_batcher_and_update_counters(mock_client, true, cached_counters.clone(), partitioned) - .await; + flush_batcher_and_update_counters( + mock_client, + true, + cached_counters.clone(), + partitioned, + 100, + ) + .await; if let Some(c) = cached_counters.get(&counter) { assert_eq!(c.hits(&counter), 8);