From 2e5d7c2c23cb48485e280a1428116bf9dd052ffa Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 11:37:48 -0400
Subject: [PATCH 1/7] Fix write behind

---
 Cargo.lock                                    |  16 ---
 limitador/Cargo.toml                          |   1 -
 limitador/src/storage/redis/counters_cache.rs |  73 +++++++++---
 limitador/src/storage/redis/redis_cached.rs   | 111 ++++++++----------
 4 files changed, 108 insertions(+), 93 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1d8e070f..ee9d8ceb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1571,7 +1571,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tracing",
- "ttl_cache",
 ]
 
 [[package]]
@@ -1610,12 +1609,6 @@ dependencies = [
  "url",
 ]
 
-[[package]]
-name = "linked-hash-map"
-version = "0.5.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
-
 [[package]]
 name = "linux-raw-sys"
 version = "0.4.11"
@@ -3434,15 +3427,6 @@ version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
 
-[[package]]
-name = "ttl_cache"
-version = "0.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a"
-dependencies = [
- "linked-hash-map",
-]
-
 [[package]]
 name = "typenum"
 version = "1.17.0"
diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index c1349091..1bac654a 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -23,7 +23,6 @@ lenient_conditions = []
 [dependencies]
 moka = "0.11.2"
 getrandom = { version = "0.2", features = ["js"] }
-ttl_cache = "0.5"
 serde = { version = "1", features = ["derive"] }
 postcard = { version = "1.0.4", features = ["use-std"] }
 serde_json = "1"
diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index 01f8a146..de6fc679 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -1,15 +1,47 @@
 use crate::counter::Counter;
+use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 use crate::storage::redis::{
     DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
     DEFAULT_TTL_RATIO_CACHED_COUNTERS,
 };
-use std::time::Duration;
-use ttl_cache::TtlCache;
+use moka::sync::Cache;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime};
+
+pub struct CachedCounterValue {
+    value: AtomicExpiringValue,
+}
 
 pub struct CountersCache {
     max_ttl_cached_counters: Duration,
     pub ttl_ratio_cached_counters: u64,
-    cache: TtlCache<Counter, i64>,
+    cache: Cache<Counter, Arc<CachedCounterValue>>,
+}
+
+impl CachedCounterValue {
+    pub fn from(counter: &Counter, value: i64) -> Self {
+        let now = SystemTime::now();
+        Self {
+            value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
+        }
+    }
+
+    pub fn delta(&self, counter: &Counter, delta: i64) -> i64 {
+        self.value
+            .update(delta, counter.seconds(), SystemTime::now())
+    }
+
+    pub fn hits(&self, _: &Counter) -> i64 {
+        self.value.value_at(SystemTime::now())
+    }
+
+    pub fn remaining(&self, counter: &Counter) -> i64 {
+        counter.max_value() - self.hits(counter)
+    }
+
+    pub fn is_limited(&self, counter: &Counter, delta: i64) -> bool {
+        self.hits(counter) as i128 + delta as i128 > counter.max_value() as i128
+    }
 }
 
 pub struct CountersCacheBuilder {
@@ -46,18 +78,18 @@ impl CountersCacheBuilder {
         CountersCache {
             max_ttl_cached_counters: self.max_ttl_cached_counters,
             ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
-            cache: TtlCache::new(self.max_cached_counters),
+            cache: Cache::new(self.max_cached_counters as u64),
         }
     }
 }
 
 impl CountersCache {
-    pub fn get(&self, counter: &Counter) -> Option<i64> {
-        self.cache.get(counter).copied()
+    pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
+        self.cache.get(counter)
     }
 
     pub fn insert(
-        &mut self,
+        &self,
         counter: Counter,
         redis_val: Option<i64>,
         redis_ttl_ms: i64,
@@ -72,14 +104,15 @@ impl CountersCache {
         );
         if let Some(ttl) = counter_ttl.checked_sub(ttl_margin) {
             if ttl > Duration::from_secs(0) {
-                self.cache.insert(counter, counter_val, ttl);
+                let value = CachedCounterValue::from(&counter, counter_val);
+                self.cache.insert(counter, Arc::new(value));
             }
         }
     }
 
-    pub fn increase_by(&mut self, counter: &Counter, delta: i64) {
-        if let Some(val) = self.cache.get_mut(counter) {
-            *val += delta
+    pub fn increase_by(&self, counter: &Counter, delta: i64) {
+        if let Some(val) = self.cache.get(counter) {
+            val.delta(counter, delta);
         };
     }
 
@@ -149,7 +182,7 @@ mod tests {
             values,
         );
 
-        let mut cache = CountersCacheBuilder::new().build();
+        let cache = CountersCacheBuilder::new().build();
         cache.insert(counter.clone(), Some(10), 10, Duration::from_secs(0));
 
         assert!(cache.get(&counter).is_some());
@@ -192,7 +225,7 @@ mod tests {
             values,
         );
 
-        let mut cache = CountersCacheBuilder::new().build();
+        let cache = CountersCacheBuilder::new().build();
         cache.insert(
            counter.clone(),
            Some(current_value),
            10,
            Duration::from_secs(0),
        );
 
-        assert_eq!(cache.get(&counter).unwrap(), current_value);
+        assert_eq!(
+            cache.get(&counter).map(|e| e.hits(&counter)).unwrap(),
+            current_value
+        );
     }
 
     #[test]
@@ -219,10 +255,10 @@ mod tests {
             values,
         );
 
-        let mut cache = CountersCacheBuilder::new().build();
+        let cache = CountersCacheBuilder::new().build();
         cache.insert(counter.clone(), None, 10, Duration::from_secs(0));
 
-        assert_eq!(cache.get(&counter).unwrap(), 0);
+        assert_eq!(cache.get(&counter).map(|e| e.hits(&counter)).unwrap(), 0);
     }
 
     #[test]
@@ -251,6 +287,9 @@ mod tests {
         );
 
         cache.increase_by(&counter, increase_by);
-        assert_eq!(cache.get(&counter).unwrap(), current_val + increase_by);
+        assert_eq!(
+            cache.get(&counter).map(|e| e.hits(&counter)).unwrap(),
+            (current_val + increase_by)
+        );
     }
 }
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index d5731ee0..09fff51c 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -1,7 +1,9 @@
 use crate::counter::Counter;
 use crate::limit::Limit;
 use crate::storage::keys::*;
-use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
+use crate::storage::redis::counters_cache::{
+    CachedCounterValue, CountersCache, CountersCacheBuilder,
+};
 use crate::storage::redis::redis_async::AsyncRedisStorage;
 use crate::storage::redis::scripts::VALUES_AND_TTLS;
 use crate::storage::redis::{
@@ -38,8 +40,8 @@ use tracing::{error, warn};
 // multiple times when it is not cached.
 
 pub struct CachedRedisStorage {
-    cached_counters: Mutex<CountersCache>,
-    batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
+    cached_counters: CountersCache,
+    batcher_counter_updates: Arc<Mutex<HashMap<Counter, CachedCounterValue>>>,
     async_redis_storage: AsyncRedisStorage,
     redis_conn_manager: ConnectionManager,
     partitioned: Arc<AtomicBool>,
@@ -76,30 +78,26 @@ impl AsyncCounterStorage for CachedRedisStorage {
         let mut first_limited = None;
 
         // Check cached counters
-        {
-            let cached_counters = self.cached_counters.lock().unwrap();
-            for counter in counters.iter_mut() {
-                match cached_counters.get(counter) {
-                    Some(val) => {
-                        if first_limited.is_none() && val + delta > counter.max_value() {
-                            let a = Authorization::Limited(
-                                counter.limit().name().map(|n| n.to_owned()),
-                            );
-                            if !load_counters {
-                                return Ok(a);
-                            }
-                            first_limited = Some(a);
-                        }
-                        if load_counters {
-                            counter.set_remaining(counter.max_value() - val - delta);
-                            // todo: how do we get the ttl for this entry?
-                            // counter.set_expires_in(Duration::from_secs(counter.seconds()));
+        for counter in counters.iter_mut() {
+            match self.cached_counters.get(counter) {
+                Some(val) => {
+                    if first_limited.is_none() && val.is_limited(counter, delta) {
+                        let a =
+                            Authorization::Limited(counter.limit().name().map(|n| n.to_owned()));
+                        if !load_counters {
+                            return Ok(a);
                         }
+                        first_limited = Some(a);
                     }
-                    None => {
-                        not_cached.push(counter);
+                    if load_counters {
+                        counter.set_remaining(val.remaining(counter) - delta);
+                        // todo: how do we get the ttl for this entry?
+                        // counter.set_expires_in(Duration::from_secs(counter.seconds()));
                     }
                 }
+                None => {
+                    not_cached.push(counter);
+                }
             }
         }
 
@@ -127,31 +125,28 @@ impl AsyncCounterStorage for CachedRedisStorage {
             Duration::from_millis((Instant::now() - time_start_get_ttl).as_millis() as u64);
 
-        {
-            let mut cached_counters = self.cached_counters.lock().unwrap();
-            for (i, counter) in not_cached.iter_mut().enumerate() {
-                cached_counters.insert(
-                    counter.clone(),
-                    counter_vals[i],
-                    counter_ttls_msecs[i],
-                    ttl_margin,
-                );
-                let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta;
-                if first_limited.is_none() && remaining < 0 {
-                    first_limited = Some(Authorization::Limited(
-                        counter.limit().name().map(|n| n.to_owned()),
-                    ));
-                }
-                if load_counters {
-                    counter.set_remaining(remaining);
-                    let counter_ttl = if counter_ttls_msecs[i] >= 0 {
-                        Duration::from_millis(counter_ttls_msecs[i] as u64)
-                    } else {
-                        Duration::from_secs(counter.max_value() as u64)
-                    };
-
-                    counter.set_expires_in(counter_ttl);
-                }
+        for (i, counter) in not_cached.iter_mut().enumerate() {
+            self.cached_counters.insert(
+                counter.clone(),
+                counter_vals[i],
+                counter_ttls_msecs[i],
+                ttl_margin,
+            );
+            let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta;
+            if first_limited.is_none() && remaining < 0 {
+                first_limited = Some(Authorization::Limited(
+                    counter.limit().name().map(|n| n.to_owned()),
+                ));
+            }
+            if load_counters {
+                counter.set_remaining(remaining);
+                let counter_ttl = if counter_ttls_msecs[i] >= 0 {
+                    Duration::from_millis(counter_ttls_msecs[i] as u64)
+                } else {
+                    Duration::from_secs(counter.max_value() as u64)
+                };
+
+                counter.set_expires_in(counter_ttl);
             }
         }
 
@@ -161,11 +156,8 @@ impl AsyncCounterStorage for CachedRedisStorage {
         }
 
         // Update cached values
-        {
-            let mut cached_counters = self.cached_counters.lock().unwrap();
-            for counter in counters.iter() {
-                cached_counters.increase_by(counter, delta);
-            }
+        for counter in counters.iter() {
+            self.cached_counters.increase_by(counter, delta);
         }
 
         // Batch or update depending on configuration
@@ -231,7 +223,8 @@ impl CachedRedisStorage {
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
         let storage = async_redis_storage.clone();
 
-        let batcher: Arc<Mutex<HashMap<Counter, i64>>> = Arc::new(Mutex::new(Default::default()));
+        let batcher: Arc<Mutex<HashMap<Counter, CachedCounterValue>>> =
+            Arc::new(Mutex::new(Default::default()));
         let p = Arc::clone(&partitioned);
         let batcher_flusher = batcher.clone();
         let mut interval = tokio::time::interval(flushing_period);
@@ -249,7 +242,7 @@ impl CachedRedisStorage {
                         };
                         for (counter, delta) in counters {
                             storage
-                                .update_counter(&counter, delta)
+                                .update_counter(&counter, delta.hits(&counter))
                                 .await
                                 .or_else(|err| {
                                     if err.is_transient() {
@@ -273,7 +266,7 @@ impl CachedRedisStorage {
             .build();
 
         Ok(Self {
-            cached_counters: Mutex::new(cached_counters),
+            cached_counters,
             batcher_counter_updates: batcher,
             redis_conn_manager,
             async_redis_storage,
@@ -339,15 +332,15 @@ impl CachedRedisStorage {
 
     fn batch_counter(
         delta: i64,
-        batcher: &mut MutexGuard<HashMap<Counter, i64>>,
+        batcher: &mut MutexGuard<HashMap<Counter, CachedCounterValue>>,
         counter: &Counter,
     ) {
         match batcher.get_mut(counter) {
             Some(val) => {
-                *val += delta;
+                val.delta(counter, delta);
             }
             None => {
-                batcher.insert(counter.clone(), delta);
+                batcher.insert(counter.clone(), CachedCounterValue::from(counter, delta));
             }
         }
     }

From 21fed88db97bc1b0565b1b553e99f990ae862182 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 11:55:26 -0400
Subject: [PATCH 2/7] Avoid replacing previously installed mapping

---
 limitador/src/storage/redis/counters_cache.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index de6fc679..bb9cfa57 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -105,7 +105,7 @@ impl CountersCache {
         if let Some(ttl) = counter_ttl.checked_sub(ttl_margin) {
             if ttl > Duration::from_secs(0) {
                 let value = CachedCounterValue::from(&counter, counter_val);
-                self.cache.insert(counter, Arc::new(value));
+                self.cache.get_with(counter.clone(), || Arc::new(value));
             }
         }
     }

From 3ffad44ea08b4f0fbdc39dcb349dcc966f51ed51 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 13:44:08 -0400
Subject: [PATCH 3/7] Refactored AtomicExpiringValue

---
 .../src/storage/atomic_expiring_value.rs | 98 ++++++++++++++-----
 1 file changed, 73 insertions(+), 25 deletions(-)

diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs
index 634b9e35..e13b7c69 100644
--- a/limitador/src/storage/atomic_expiring_value.rs
+++ b/limitador/src/storage/atomic_expiring_value.rs
@@ -5,22 +5,19 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 #[derive(Debug)]
 pub(crate) struct AtomicExpiringValue {
     value: AtomicI64,
-    expiry: AtomicU64, // in microseconds
+    expiry: AtomicExpiryTime,
 }
 
 impl AtomicExpiringValue {
     pub fn new(value: i64, expiry: SystemTime) -> Self {
-        let expiry = Self::get_duration_micros(expiry);
         Self {
             value: AtomicI64::new(value),
-            expiry: AtomicU64::new(expiry),
+            expiry: AtomicExpiryTime::new(expiry),
         }
     }
 
     pub fn value_at(&self, when: SystemTime) -> i64 {
-        let when = Self::get_duration_micros(when);
-        let expiry = self.expiry.load(Ordering::SeqCst);
-        if expiry <= when {
+        if self.expiry.expired_at(when) {
             return 0;
         }
         self.value.load(Ordering::SeqCst)
@@ -31,25 +28,49 @@ impl AtomicExpiringValue {
     }
 
     pub fn update(&self, delta: i64, ttl: u64, when: SystemTime) -> i64 {
-        let ttl_micros = ttl * 1_000_000;
-        let when_micros = Self::get_duration_micros(when);
-
-        let expiry = self.expiry.load(Ordering::SeqCst);
-        if expiry <= when_micros {
-            let new_expiry = when_micros + ttl_micros;
-            if self
-                .expiry
-                .compare_exchange(expiry, new_expiry, Ordering::SeqCst, Ordering::SeqCst)
-                .is_ok()
-            {
-                self.value.store(delta, Ordering::SeqCst);
-            }
+        if self.expiry.update_if_expired(ttl, when) {
+            self.value.store(delta, Ordering::SeqCst);
             return delta;
         }
         self.value.fetch_add(delta, Ordering::SeqCst) + delta
     }
 
     pub fn ttl(&self) -> Duration {
+        self.expiry.duration()
+    }
+
+    #[allow(dead_code)]
+    pub fn set(&self, value: i64, ttl: Duration) {
+        self.expiry.update(ttl);
+        self.value.store(value, Ordering::SeqCst);
+    }
+}
+
+#[derive(Debug)]
+pub struct AtomicExpiryTime {
+    expiry: AtomicU64, // in microseconds
+}
+
+impl AtomicExpiryTime {
+    pub fn new(when: SystemTime) -> Self {
+        let expiry = Self::since_epoch(when);
+        Self {
+            expiry: AtomicU64::new(expiry),
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn from_now(ttl: Duration) -> Self {
+        Self::new(SystemTime::now() + ttl)
+    }
+
+    fn since_epoch(when: SystemTime) -> u64 {
+        when.duration_since(UNIX_EPOCH)
+            .expect("SystemTime before UNIX EPOCH!")
+            .as_micros() as u64
+    }
+
+    pub fn duration(&self) -> Duration {
         let expiry =
             SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst));
         expiry
@@ -57,10 +78,37 @@ impl AtomicExpiringValue {
             .unwrap_or(Duration::ZERO)
     }
 
-    fn get_duration_micros(when: SystemTime) -> u64 {
-        when.duration_since(UNIX_EPOCH)
-            .expect("SystemTime before UNIX EPOCH!")
-            .as_micros() as u64
+    pub fn expired_at(&self, when: SystemTime) -> bool {
+        let when = Self::since_epoch(when);
+        self.expiry.load(Ordering::SeqCst) <= when
+    }
+
+    #[allow(dead_code)]
+    pub fn update(&self, ttl: Duration) {
+        self.expiry
+            .store(Self::since_epoch(SystemTime::now() + ttl), Ordering::SeqCst);
+    }
+
+    pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
+        let ttl_micros = ttl * 1_000_000;
+        let when_micros = Self::since_epoch(when);
+        let expiry = self.expiry.load(Ordering::SeqCst);
+        if expiry <= when_micros {
+            let new_expiry = when_micros + ttl_micros;
+            return self
+                .expiry
+                .compare_exchange(expiry, new_expiry, Ordering::SeqCst, Ordering::SeqCst)
+                .is_ok();
+        }
+        false
+    }
+}
+
+impl Clone for AtomicExpiryTime {
+    fn clone(&self) -> Self {
+        Self {
+            expiry: AtomicU64::new(self.expiry.load(Ordering::SeqCst)),
+        }
     }
 }
 
@@ -68,7 +116,7 @@ impl Default for AtomicExpiringValue {
     fn default() -> Self {
         AtomicExpiringValue {
             value: AtomicI64::new(0),
-            expiry: AtomicU64::new(0),
+            expiry: AtomicExpiryTime::new(UNIX_EPOCH),
         }
     }
 }
@@ -77,7 +125,7 @@ impl Clone for AtomicExpiringValue {
     fn clone(&self) -> Self {
         AtomicExpiringValue {
             value: AtomicI64::new(self.value.load(Ordering::SeqCst)),
-            expiry: AtomicU64::new(self.expiry.load(Ordering::SeqCst)),
+            expiry: self.expiry.clone(),
         }
     }
 }

From f64f44fb146ef5d62d3c2e6271315e0846ea39b5 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 13:44:31 -0400
Subject: [PATCH 4/7] Encapsulate cache value ttl and value properly

---
 limitador/src/storage/redis/counters_cache.rs | 62 +++++++++++++---
 limitador/src/storage/redis/redis_cached.rs   | 72 +++++++++----------
 2 files changed, 84 insertions(+), 50 deletions(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index bb9cfa57..603fc3ac 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -1,5 +1,5 @@
 use crate::counter::Counter;
-use crate::storage::atomic_expiring_value::AtomicExpiringValue;
+use crate::storage::atomic_expiring_value::{AtomicExpiringValue, AtomicExpiryTime};
 use crate::storage::redis::{
     DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
     DEFAULT_TTL_RATIO_CACHED_COUNTERS,
@@ -10,6 +10,7 @@ use std::time::{Duration, SystemTime};
 
 pub struct CachedCounterValue {
     value: AtomicExpiringValue,
+    expiry: AtomicExpiryTime,
 }
 
 pub struct CountersCache {
@@ -19,13 +20,24 @@ pub struct CountersCache {
 }
 
 impl CachedCounterValue {
-    pub fn from(counter: &Counter, value: i64) -> Self {
+    pub fn from(counter: &Counter, value: i64, ttl: Duration) -> Self {
         let now = SystemTime::now();
         Self {
             value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
+            expiry: AtomicExpiryTime::from_now(ttl),
         }
     }
 
+    pub fn expired_at(&self, now: SystemTime) -> bool {
+        self.expiry.expired_at(now)
+    }
+
+    pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) {
+        let time_window = Duration::from_secs(counter.seconds());
+        self.value.set(value, time_window);
+        self.expiry.update(expiry);
+    }
+
     pub fn delta(&self, counter: &Counter, delta: i64) -> i64 {
         self.value
             .update(delta, counter.seconds(), SystemTime::now())
@@ -42,6 +54,10 @@ impl CachedCounterValue {
     pub fn is_limited(&self, counter: &Counter, delta: i64) -> bool {
         self.hits(counter) as i128 + delta as i128 > counter.max_value() as i128
     }
+
+    pub fn to_next_window(&self) -> Duration {
+        self.value.ttl()
+    }
 }
 
 pub struct CountersCacheBuilder {
@@ -94,20 +110,30 @@ impl CountersCache {
         redis_val: Option<i64>,
         redis_ttl_ms: i64,
         ttl_margin: Duration,
-    ) {
+        now: SystemTime,
+    ) -> Arc<CachedCounterValue> {
         let counter_val = redis_val.unwrap_or(0);
-        let counter_ttl = self.ttl_from_redis_ttl(
+        let cache_ttl = self.ttl_from_redis_ttl(
             redis_ttl_ms,
             counter.seconds(),
             counter_val,
             counter.max_value(),
         );
-        if let Some(ttl) = counter_ttl.checked_sub(ttl_margin) {
-            if ttl > Duration::from_secs(0) {
-                let value = CachedCounterValue::from(&counter, counter_val);
-                self.cache.get_with(counter.clone(), || Arc::new(value));
+        if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
+            if ttl > Duration::ZERO {
+                let value = CachedCounterValue::from(&counter, counter_val, cache_ttl);
+                let previous = self.cache.get_with(counter.clone(), || Arc::new(value));
+                if previous.expired_at(now) {
+                    previous.set_from_authority(&counter, counter_val, cache_ttl);
+                }
+                return previous;
             }
         }
+        Arc::new(CachedCounterValue::from(
+            &counter,
+            counter_val,
+            Duration::ZERO,
+        ))
     }
 
     pub fn increase_by(&self, counter: &Counter, delta: i64) {
@@ -209,7 +235,13 @@ mod tests {
         );
 
         let cache = CountersCacheBuilder::new().build();
-        cache.insert(counter.clone(), Some(10), 10, Duration::from_secs(0));
+        cache.insert(
+            counter.clone(),
+            Some(10),
+            10,
+            Duration::from_secs(0),
+            SystemTime::now(),
+        );
 
         assert!(cache.get(&counter).is_some());
     }
@@ -263,6 +295,7 @@ mod tests {
             Some(current_value),
             10,
             Duration::from_secs(0),
+            SystemTime::now(),
         );
 
         assert_eq!(
@@ -289,7 +322,13 @@ mod tests {
         );
 
         let cache = CountersCacheBuilder::new().build();
-        cache.insert(counter.clone(), None, 10, Duration::from_secs(0));
+        cache.insert(
+            counter.clone(),
+            None,
+            10,
+            Duration::from_secs(0),
+            SystemTime::now(),
+        );
 
         assert_eq!(cache.get(&counter).map(|e| e.hits(&counter)).unwrap(), 0);
     }
@@ -317,12 +356,13 @@ mod tests {
             values,
         );
 
-        let mut cache = CountersCacheBuilder::new().build();
+        let cache = CountersCacheBuilder::new().build();
         cache.insert(
             counter.clone(),
             Some(current_val),
             10,
             Duration::from_secs(0),
+            SystemTime::now(),
         );
 
         cache.increase_by(&counter, increase_by);
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 09fff51c..847063b2 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -1,9 +1,8 @@
 use crate::counter::Counter;
 use crate::limit::Limit;
+use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 use crate::storage::keys::*;
-use crate::storage::redis::counters_cache::{
-    CachedCounterValue, CountersCache, CountersCacheBuilder,
-};
+use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
 use crate::storage::redis::redis_async::AsyncRedisStorage;
 use crate::storage::redis::scripts::VALUES_AND_TTLS;
 use crate::storage::redis::{
@@ -17,8 +16,8 @@ use redis::{ConnectionInfo, RedisError};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::{Duration, Instant};
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant, SystemTime};
 use tracing::{error, warn};
 
 // This is just a first version.
@@ -41,7 +40,7 @@ use tracing::{error, warn};
 
 pub struct CachedRedisStorage {
     cached_counters: CountersCache,
-    batcher_counter_updates: Arc<Mutex<HashMap<Counter, CachedCounterValue>>>,
+    batcher_counter_updates: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
     async_redis_storage: AsyncRedisStorage,
     redis_conn_manager: ConnectionManager,
     partitioned: Arc<AtomicBool>,
@@ -77,10 +76,11 @@ impl AsyncCounterStorage for CachedRedisStorage {
         let mut not_cached: Vec<&mut Counter> = vec![];
         let mut first_limited = None;
+        let now = SystemTime::now();
 
         // Check cached counters
         for counter in counters.iter_mut() {
             match self.cached_counters.get(counter) {
-                Some(val) => {
+                Some(val) if !val.expired_at(now) => {
                     if first_limited.is_none() && val.is_limited(counter, delta) {
                         let a =
                             Authorization::Limited(counter.limit().name().map(|n| n.to_owned()));
@@ -91,11 +91,10 @@ impl AsyncCounterStorage for CachedRedisStorage {
                     }
                     if load_counters {
                         counter.set_remaining(val.remaining(counter) - delta);
-                        // todo: how do we get the ttl for this entry?
-                        // counter.set_expires_in(Duration::from_secs(counter.seconds()));
+                        counter.set_expires_in(val.to_next_window());
                     }
                 }
-                None => {
+                _ => {
                     not_cached.push(counter);
                 }
             }
@@ -126,27 +125,22 @@ impl AsyncCounterStorage for CachedRedisStorage {
             Duration::from_millis((Instant::now() - time_start_get_ttl).as_millis() as u64);
 
         for (i, counter) in not_cached.iter_mut().enumerate() {
-            self.cached_counters.insert(
+            let cached_value = self.cached_counters.insert(
                 counter.clone(),
                 counter_vals[i],
                 counter_ttls_msecs[i],
                 ttl_margin,
+                now,
             );
-            let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta;
-            if first_limited.is_none() && remaining < 0 {
+            let remaining = cached_value.remaining(counter);
+            if first_limited.is_none() && remaining <= 0 {
                 first_limited = Some(Authorization::Limited(
                     counter.limit().name().map(|n| n.to_owned()),
                 ));
             }
             if load_counters {
-                counter.set_remaining(remaining);
-                let counter_ttl = if counter_ttls_msecs[i] >= 0 {
-                    Duration::from_millis(counter_ttls_msecs[i] as u64)
-                } else {
-                    Duration::from_secs(counter.max_value() as u64)
-                };
-
-                counter.set_expires_in(counter_ttl);
+                counter.set_remaining(remaining - delta);
+                counter.set_expires_in(cached_value.to_next_window());
             }
         }
     }
@@ -162,8 +156,22 @@ impl AsyncCounterStorage for CachedRedisStorage {
 
         // Batch or update depending on configuration
         let mut batcher = self.batcher_counter_updates.lock().unwrap();
+        let now = SystemTime::now();
         for counter in counters.iter() {
-            Self::batch_counter(delta, &mut batcher, counter);
+            match batcher.get_mut(counter) {
+                Some(val) => {
+                    val.update(delta, counter.seconds(), now);
+                }
+                None => {
+                    batcher.insert(
+                        counter.clone(),
+                        AtomicExpiringValue::new(
+                            delta,
+                            now + Duration::from_secs(counter.seconds()),
+                        ),
+                    );
+                }
+            }
         }
 
         Ok(Authorization::Ok)
@@ -223,7 +231,7 @@ impl CachedRedisStorage {
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
         let storage = async_redis_storage.clone();
 
-        let batcher: Arc<Mutex<HashMap<Counter, CachedCounterValue>>> =
+        let batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>> =
             Arc::new(Mutex::new(Default::default()));
         let p = Arc::clone(&partitioned);
         let batcher_flusher = batcher.clone();
@@ -240,9 +248,10 @@ impl CachedRedisStorage {
                             let mut batch = batcher_flusher.lock().unwrap();
                             std::mem::take(&mut *batch)
                         };
+                        let now = SystemTime::now();
                         for (counter, delta) in counters {
                             storage
-                                .update_counter(&counter, delta.hits(&counter))
+                                .update_counter(&counter, delta.value_at(now))
                                 .await
                                 .or_else(|err| {
                                     if err.is_transient() {
@@ -329,21 +338,6 @@ impl CachedRedisStorage {
 
         Ok((counter_vals, counter_ttls_msecs))
     }
-
-    fn batch_counter(
-        delta: i64,
-        batcher: &mut MutexGuard<HashMap<Counter, CachedCounterValue>>,
-        counter: &Counter,
-    ) {
-        match batcher.get_mut(counter) {
-            Some(val) => {
-                val.delta(counter, delta);
-            }
-            None => {
-                batcher.insert(counter.clone(), CachedCounterValue::from(counter, delta));
-            }
-        }
-    }
 }
 
 pub struct CachedRedisStorageBuilder {

From e4f54b2b8fb00d7df51aded74e40272eee041d60 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 13:56:08 -0400
Subject: [PATCH 5/7] Do not flip partition bit to true in write behind

---
 limitador-server/examples/limits.yaml       | 2 +-
 limitador/src/storage/redis/redis_cached.rs | 9 +--------
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/limitador-server/examples/limits.yaml b/limitador-server/examples/limits.yaml
index 4178b413..f0ea815b 100644
--- a/limitador-server/examples/limits.yaml
+++ b/limitador-server/examples/limits.yaml
@@ -14,4 +14,4 @@
   conditions:
     - "req.method == 'POST'"
   variables:
-    - user_id
\ No newline at end of file
+    - user_id
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 847063b2..6fa0d6cd 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -253,14 +253,7 @@ impl CachedRedisStorage {
                             storage
                                 .update_counter(&counter, delta.value_at(now))
                                 .await
-                                .or_else(|err| {
-                                    if err.is_transient() {
-                                        p.store(true, Ordering::Release);
-                                        Ok(())
-                                    } else {
-                                        Err(err)
-                                    }
-                                })
+                                .or_else(|err| if err.is_transient() { Ok(()) } else { Err(err) })
                                 .expect("Unrecoverable Redis error!");
                         }
                     }

From 26447a3411ce418c7fdda1528338fccf89e1ad91 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 5 Apr 2024 17:02:54 -0400
Subject: [PATCH 6/7] Only write out meaningful deltas

---
 limitador/src/storage/redis/redis_cached.rs | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 6fa0d6cd..092dc1d0 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -203,7 +203,7 @@ impl CachedRedisStorage {
             DEFAULT_TTL_RATIO_CACHED_COUNTERS,
             Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
         )
-        .await
+            .await
     }
 
     async fn new_with_options(
@@ -224,7 +224,7 @@ impl CachedRedisStorage {
             response_timeout,
             Duration::from_secs(5),
         )
-        .await?;
+            .await?;
 
         let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
@@ -250,11 +250,14 @@ impl CachedRedisStorage {
                         };
                         let now = SystemTime::now();
                         for (counter, delta) in counters {
-                            storage
-                                .update_counter(&counter, delta.value_at(now))
-                                .await
-                                .or_else(|err| if err.is_transient() { Ok(()) } else { Err(err) })
-                                .expect("Unrecoverable Redis error!");
+                            let delta = delta.value_at(now);
+                            if delta > 0 {
+                                storage
+                                    .update_counter(&counter, delta)
+                                    .await
+                                    .or_else(|err| if err.is_transient() { Ok(()) } else { Err(err) })
+                                    .expect("Unrecoverable Redis error!");
+                            }
                         }
                     }
                     interval.tick().await;
@@ -388,7 +391,7 @@ impl CachedRedisStorageBuilder {
             self.ttl_ratio_cached_counters,
             self.response_timeout,
         )
-        .await
+            .await
     }
 }

From 954c530b4425709d9ca5ef6b5a5baf0135208642 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Mon, 8 Apr 2024 08:46:19 -0400
Subject: [PATCH 7/7] Fix fmt

---
 limitador/src/storage/redis/redis_cached.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 092dc1d0..c08989b5 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -203,7 +203,7 @@ impl CachedRedisStorage {
             DEFAULT_TTL_RATIO_CACHED_COUNTERS,
             Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
         )
-            .await
+        .await
     }
 
     async fn new_with_options(
@@ -224,7 +224,7 @@ impl CachedRedisStorage {
             response_timeout,
             Duration::from_secs(5),
         )
-            .await?;
+        .await?;
 
         let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
@@ -391,7 +391,7 @@ impl CachedRedisStorageBuilder {
             self.ttl_ratio_cached_counters,
             self.response_timeout,
         )
-            .await
+        .await
     }
 }
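
The series above boils down to a write-behind pattern: on the hot path, increments land in an in-memory batch keyed by counter (backed by expiring atomic values), and a background task periodically drains that batch and flushes only meaningful (> 0) deltas to Redis, tolerating transient errors. What follows is a minimal, self-contained sketch of that flow using only the standard library; the names (CounterKey, Batch, the INCRBY print-out) are simplified stand-ins for illustration and not the crate's actual API.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Hash, PartialEq, Eq)]
struct CounterKey(String);

#[derive(Default)]
struct Batch {
    // Pending per-counter deltas, accumulated between flushes.
    pending: HashMap<CounterKey, i64>,
}

fn main() {
    let batch = Arc::new(Mutex::new(Batch::default()));

    // Hot path: bump the in-memory delta, never touching the backing store.
    {
        let mut b = batch.lock().unwrap();
        *b.pending.entry(CounterKey("user_42".into())).or_insert(0) += 1;
    }

    // Flusher: drain the batch and write out only positive deltas.
    let flusher = Arc::clone(&batch);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        let drained = {
            let mut b = flusher.lock().unwrap();
            std::mem::take(&mut b.pending)
        };
        for (key, delta) in drained {
            if delta > 0 {
                // In the real storage this is an async Redis update with an expiry,
                // where transient errors are swallowed instead of panicking.
                println!("INCRBY {} {}", key.0, delta);
            }
        }
    });
    handle.join().unwrap();
}

The design choice the patches converge on is the same as in this sketch: the batch holds expiring values rather than bare integers, so a delta that has outlived its counter's time window evaluates to zero at flush time and is simply skipped.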