From 9a580b11b62a1b7377d2d1eaa159576e428f9d8e Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Tue, 23 Apr 2024 20:53:32 -0400 Subject: [PATCH] feat: CachedRedisStorage will now default to flushing a batch ASAP to reduce the length of time the cache is out of sync with Redis counters. Setting a flushing_period is still supported to have a delay before flushing which can be useful to reduce the load on the Redis server in exchange for having staler counters in limitador. Signed-off-by: Hiram Chirino --- limitador/src/storage/redis/mod.rs | 2 +- limitador/src/storage/redis/redis_cached.rs | 25 ++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index a60ab99a..951c58c0 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -7,7 +7,7 @@ mod redis_cached; mod redis_sync; mod scripts; -pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1; +pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 0; pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000; pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5; pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 3f9e3da6..a7c40a9d 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -44,6 +44,7 @@ pub struct CachedRedisStorage { async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, partitioned: Arc, + flush_tx: tokio::sync::watch::Sender<()>, } #[async_trait] @@ -154,10 +155,10 @@ impl AsyncCounterStorage for CachedRedisStorage { self.cached_counters.increase_by(counter, delta); } - // Batch or update depending on configuration let mut batcher = self.batcher_counter_updates.lock().unwrap(); let now = SystemTime::now(); for counter in counters.iter() { + // Update an existing batch entry or add a new batch entry 
match batcher.get_mut(counter) { Some(val) => { val.update(delta, counter.seconds(), now); @@ -174,6 +175,9 @@ impl AsyncCounterStorage for CachedRedisStorage { } } + // ask the flusher to flush the batch + self.flush_tx.send(()).unwrap(); + Ok(Authorization::Ok) } @@ -240,15 +244,30 @@ impl CachedRedisStorage { let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); + let (flush_tx, mut flush_rx) = tokio::sync::watch::channel(()); + { let storage = async_redis_storage.clone(); let counters_cache_clone = counters_cache.clone(); let conn = redis_conn_manager.clone(); let p = Arc::clone(&partitioned); let batcher_flusher = batcher.clone(); - let mut interval = tokio::time::interval(flushing_period); + tokio::spawn(async move { loop { + // Wait for a new flush request. + flush_rx.changed().await.unwrap(); + + if flushing_period != Duration::ZERO { + // Set the flushing_period to reduce the load on Redis; the downside to + // setting it is that the cached counters will not be as accurate as when it's zero. + tokio::time::sleep(flushing_period).await + } + + // even if flushing_period is zero, the next batch will be growing while the + // current batch is being executed against Redis. When under load, the flush + // frequency will be proportional to batch execution latency. + flush_batcher_and_update_counters( conn.clone(), batcher_flusher.clone(), @@ -257,7 +276,6 @@ impl CachedRedisStorage { p.clone(), ) .await; - interval.tick().await; } }); } @@ -268,6 +286,7 @@ impl CachedRedisStorage { redis_conn_manager, async_redis_storage, partitioned, + flush_tx, }) }