From ee61355ef01aa8e2987e847ddf8920a630eedd48 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Thu, 28 Mar 2024 18:31:54 +0100
Subject: [PATCH 01/19] [wip][feat] Batch updating

---
 limitador/src/storage/keys.rs               |  8 ++++
 limitador/src/storage/redis/redis_async.rs  | 44 ++++++++++++++++++++-
 limitador/src/storage/redis/redis_cached.rs | 25 ++++++------
 limitador/src/storage/redis/scripts.rs      | 18 +++++++++
 4 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs
index 51b257c2..fb22666c 100644
--- a/limitador/src/storage/keys.rs
+++ b/limitador/src/storage/keys.rs
@@ -23,6 +23,14 @@ pub fn key_for_counter(counter: &Counter) -> String {
     )
 }
 
+pub fn keys_for_counters_of_limits(limits: Vec<&Limit>) -> String {
+    limits
+        .iter()
+        .map(|limit| key_for_counters_of_limit(limit))
+        .collect::<Vec<String>>()
+        .join(";")
+}
+
 pub fn key_for_counters_of_limit(limit: &Limit) -> String {
     format!(
         "namespace:{{{}}},counters_of_limit:{}",

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 9718da5d..a793ed93 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -6,14 +6,17 @@ use crate::counter::Counter;
 use crate::limit::Limit;
 use crate::storage::keys::*;
 use crate::storage::redis::is_limited;
-use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
+use crate::storage::redis::scripts::{
+    BATCH_UPDATE_COUNTERS, SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS,
+};
 use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
 use async_trait::async_trait;
 use redis::{AsyncCommands, RedisError};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
 use std::time::Duration;
 use tracing::{debug_span, Instrument};
+use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 
 // Note: this implementation does not guarantee exact limits. Ensuring that we
 // never go over the limits would hurt performance. This implementation
@@ -231,6 +234,43 @@ impl AsyncRedisStorage {
 
         Ok(())
     }
+
+    pub async fn update_counters(
+        &self,
+        counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
+    ) -> Result<(), StorageErr> {
+        let mut con = self.conn_manager.clone();
+        let span = trace_span!("datastore");
+
+        // TODO (didierofrivia): Fix this so we don't iterate twice
+        let (counters, limits): (Vec<&Counter>, Vec<&Limit>) =
+            counters_and_deltas.keys().map(|k| (k, k.limit())).unzip();
+        let (ttls, deltas): (Vec<u64>, Vec<&AtomicExpiringValue>) = counters_and_deltas
+            .iter()
+            .map(|(k, v)| (k.seconds(), v))
+            .unzip();
+
+        let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
+        let mut script_invocation = redis_script.prepare_invoke();
+
+        // populate the counter keys in the redis script
+        for counter_key in counters {
+            script_invocation.key(key_for_counter(counter_key));
+        }
+
+        async {
+            script_invocation
+                .arg(keys_for_counters_of_limits(limits))
+                .arg(ttls)
+                .arg(deltas)
+                .invoke_async::<_, _>(&mut con)
+                .await
+        }
+        .instrument(span)
+        .await?;
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index c08989b5..4e8de91e 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -248,17 +248,20 @@ impl CachedRedisStorage {
                         let mut batch = batcher_flusher.lock().unwrap();
                         std::mem::take(&mut *batch)
                     };
-                    let now = SystemTime::now();
-                    for (counter, delta) in counters {
-                        let delta = delta.value_at(now);
-                        if delta > 0 {
-                            storage
-                                .update_counter(&counter, delta)
-                                .await
-                                .or_else(|err| if err.is_transient() { Ok(()) } else { Err(err) })
-                                .expect("Unrecoverable Redis error!");
-                        }
-                    }
+
+                    // TODO: After rebase, the code needs to be refactored to use delta.value_at(SystemTime::now()) and compare delta is greater than 0 after adding the key to update in update_counters
+                    storage
+                        .update_counters(counters)
+                        .await
+                        .or_else(|err| {
+                            if err.is_transient() {
+                                p.store(true, Ordering::Release);
+                                Ok(())
+                            } else {
+                                Err(err)
+                            }
+                        })
+                        .expect("Unrecoverable Redis error!");
                 }
                 interval.tick().await;
             }

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index 6c2432c9..3eea10d6 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -19,6 +19,24 @@ pub const SCRIPT_UPDATE_COUNTER: &str = "
     end
     return c";
 
+// KEYS: List of counter keys
+// ARGV[1]: limit keys
+// ARGV[2]: TTLs
+// ARGV[3]: deltas
+pub const BATCH_UPDATE_COUNTERS: &str = "
+    for i, counter_key in ipairs(KEYS) do
+        local limit_key = ARGV[1][i]
+        local ttl = tonumber(ARGV[2][i])
+        local delta = tonumber(ARGV[3][i])
+
+        local c = redis.call('incrby', counter_key, delta)
+        if c == delta then
+            redis.call('expire', counter_key, ttl)
+            redis.call('sadd', limit_key, counter_key)
+        end
+    end
+";
+
 // KEYS: the function returns the value and TTL (in ms) for these keys
 // The first position of the list returned contains the value of KEYS[1], the
 // second position contains its TTL. The third position contains the value of
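A note on the argument layout this first version relies on: with the redis crate, each `.key()` call appends the next KEYS entry and each `.arg()` call appends ARGV entries, and (as far as I can tell) a `Vec` argument is flattened into consecutive ARGV entries rather than passed as a Lua table. The `ARGV[2][i]`-style indexing in the script above therefore never sees a list, which is plausibly what the follow-up commits address. A minimal sketch of that mapping, with placeholder keys and values, not real Limitador data:

    use redis::Script;

    // Sketch only: shows how redis-rs maps builder calls onto KEYS/ARGV.
    fn sketch_layout(script: &Script) {
        let mut invocation = script.prepare_invoke();
        invocation.key("namespace:{ns},counter:{...}"); // KEYS[1]
        invocation.arg("limit_key_a;limit_key_b");      // ARGV[1]
        invocation.arg(vec![60u64, 120]);               // flattened: ARGV[2], ARGV[3]
        invocation.arg(vec![1i64, 2]);                  // flattened: ARGV[4], ARGV[5]
    }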
From 167c3639b7e02e43d05a8523bdd1d10996af0dfb Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Wed, 3 Apr 2024 11:27:58 +0200
Subject: [PATCH 02/19] [feat] Simplified batch update

---
 limitador/src/storage/keys.rs              |  8 --------
 limitador/src/storage/redis/redis_async.rs | 19 +++++--------------
 limitador/src/storage/redis/scripts.rs     | 14 ++++++++------
 3 files changed, 13 insertions(+), 28 deletions(-)

diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs
index fb22666c..51b257c2 100644
--- a/limitador/src/storage/keys.rs
+++ b/limitador/src/storage/keys.rs
@@ -23,14 +23,6 @@ pub fn key_for_counter(counter: &Counter) -> String {
     )
 }
 
-pub fn keys_for_counters_of_limits(limits: Vec<&Limit>) -> String {
-    limits
-        .iter()
-        .map(|limit| key_for_counters_of_limit(limit))
-        .collect::<Vec<String>>()
-        .join(";")
-}
-
 pub fn key_for_counters_of_limit(limit: &Limit) -> String {
     format!(
         "namespace:{{{}}},counters_of_limit:{}",

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index a793ed93..4992b0de 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -242,27 +242,18 @@ impl AsyncRedisStorage {
         let mut con = self.conn_manager.clone();
         let span = trace_span!("datastore");
 
-        // TODO (didierofrivia): Fix this so we don't iterate twice
-        let (counters, limits): (Vec<&Counter>, Vec<&Limit>) =
-            counters_and_deltas.keys().map(|k| (k, k.limit())).unzip();
-        let (ttls, deltas): (Vec<u64>, Vec<&AtomicExpiringValue>) = counters_and_deltas
-            .iter()
-            .map(|(k, v)| (k.seconds(), v))
-            .unzip();
-
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
         let mut script_invocation = redis_script.prepare_invoke();
 
-        // populate the counter keys in the redis script
-        for counter_key in counters {
-            script_invocation.key(key_for_counter(counter_key));
+        for (counter, delta) in counters_and_deltas {
+            script_invocation.key(key_for_counter(&counter));
+            script_invocation.arg(key_for_counters_of_limit(&counter.limit()));
+            script_invocation.arg(&counter.seconds());
+            script_invocation.arg(delta);
         }
 
         async {
             script_invocation
-                .arg(keys_for_counters_of_limits(limits))
-                .arg(ttls)
-                .arg(deltas)
                 .invoke_async::<_, _>(&mut con)
                 .await
         }

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index 3eea10d6..8f5c5d67 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -20,20 +20,22 @@ pub const SCRIPT_UPDATE_COUNTER: &str = "
     return c";
 
 // KEYS: List of counter keys
-// ARGV[1]: limit keys
-// ARGV[2]: TTLs
-// ARGV[3]: deltas
+// ARGV[j]: Limit keys
+// ARGV[j+1]: TTLs
+// ARGV[j+2]: Deltas
 pub const BATCH_UPDATE_COUNTERS: &str = "
+    local j = 1
     for i, counter_key in ipairs(KEYS) do
-        local limit_key = ARGV[1][i]
-        local ttl = tonumber(ARGV[2][i])
-        local delta = tonumber(ARGV[3][i])
+        local limit_key = ARGV[j]
+        local ttl = tonumber(ARGV[j+1])
+        local delta = tonumber(ARGV[j+2])
 
         local c = redis.call('incrby', counter_key, delta)
         if c == delta then
             redis.call('expire', counter_key, ttl)
             redis.call('sadd', limit_key, counter_key)
         end
+        j = j + 3
     end
 ";

From a65a81fb3054d6f31f7b2510476f2d377b4baebf Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Wed, 3 Apr 2024 15:08:01 +0200
Subject: [PATCH 03/19] [refactor] Sending limit_keys as keys, simpler lua script

---
 limitador/src/storage/redis/redis_async.rs | 16 ++++++----------
 limitador/src/storage/redis/scripts.rs     | 21 ++++++++++-----------
 2 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 4992b0de..936fd12c 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -15,7 +15,7 @@ use redis::{AsyncCommands, RedisError};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
 use std::time::Duration;
-use tracing::{debug_span, Instrument};
+use tracing::{debug_span, trace_span, Instrument};
 use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 
 // Note: this implementation does not guarantee exact limits. Ensuring that we
@@ -247,18 +247,14 @@ impl AsyncRedisStorage {
 
         for (counter, delta) in counters_and_deltas {
             script_invocation.key(key_for_counter(&counter));
-            script_invocation.arg(key_for_counters_of_limit(&counter.limit()));
-            script_invocation.arg(&counter.seconds());
+            script_invocation.key(key_for_counters_of_limit(counter.limit()));
+            script_invocation.arg(counter.seconds());
             script_invocation.arg(delta);
         }
 
-        async {
-            script_invocation
-                .invoke_async::<_, _>(&mut con)
-                .await
-        }
-        .instrument(span)
-        .await?;
+        async { script_invocation.invoke_async::<_, _>(&mut con).await }
+            .instrument(span)
+            .await?;
 
         Ok(())
     }

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index 8f5c5d67..26871acd 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -19,23 +19,22 @@ pub const SCRIPT_UPDATE_COUNTER: &str = "
     end
     return c";
 
-// KEYS: List of counter keys
-// ARGV[j]: Limit keys
-// ARGV[j+1]: TTLs
-// ARGV[j+2]: Deltas
+// KEY[i]: Counter key
+// KEY[i+1]: Limit key
+// ARGV[i]: TTLs
+// ARGV[i+1]: Deltas
 pub const BATCH_UPDATE_COUNTERS: &str = "
-    local j = 1
-    for i, counter_key in ipairs(KEYS) do
-        local limit_key = ARGV[j]
-        local ttl = tonumber(ARGV[j+1])
-        local delta = tonumber(ARGV[j+2])
+    for i = 1, #KEYS, 2 do
+        local counter_key = KEYS[i]
+        local limit_key = KEYS[i+1]
+        local ttl = ARGV[i]
+        local delta = ARGV[i+1]
 
         local c = redis.call('incrby', counter_key, delta)
-        if c == delta then
+        if c == tonumber(delta) then
             redis.call('expire', counter_key, ttl)
             redis.call('sadd', limit_key, counter_key)
         end
-        j = j + 3
     end
 ";
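The resulting pair-wise layout is easiest to see with a concrete (hypothetical) pair of counters: KEYS = [counter_key_1, limit_key_1, counter_key_2, limit_key_2] and ARGV = [ttl_1, delta_1, ttl_2, delta_2], so the Lua loop `for i = 1, #KEYS, 2` keeps KEYS[i], KEYS[i+1], ARGV[i] and ARGV[i+1] aligned per counter. A sketch of a helper (not in the patch) that would enforce that invariant:

    use redis::ScriptInvocation;

    // Pushes one counter entry so that KEYS and ARGV always grow by two
    // entries each, keeping them index-aligned for the Lua loop.
    fn push_counter_entry(
        invocation: &mut ScriptInvocation<'_>,
        counter_key: String,
        limit_key: String,
        ttl: u64,
        delta: i64,
    ) {
        invocation.key(counter_key);
        invocation.key(limit_key);
        invocation.arg(ttl);
        invocation.arg(delta);
    }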
From 9bc7f5ba61a64c8efb6f88b32fcedfb95ab588b0 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Mon, 8 Apr 2024 13:32:59 +0200
Subject: [PATCH 04/19] [test] Changing flush period to 2 milis

---
 limitador/tests/integration_tests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index 5cdb6b88..8d15af57 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -59,7 +59,7 @@ macro_rules! test_with_all_storage_impls {
             #[serial]
             async fn [<$function _with_async_redis_and_local_cache>]() {
                 let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379").
-                    flushing_period(Duration::from_millis(200)).
+                    flushing_period(Duration::from_millis(2)).
                     max_ttl_cached_counters(Duration::from_secs(3600)).
                     ttl_ratio_cached_counters(1).
                     max_cached_counters(10000);
From dcffbf4870f9dc2609b877a7bba73dc925f72cc3 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Mon, 8 Apr 2024 13:33:21 +0200
Subject: [PATCH 05/19] [test] Testing multiple limits rate limit waiting for
 flushing period

---
 limitador/tests/integration_tests.rs | 71 ++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index 8d15af57..844a90ed 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -149,6 +149,7 @@ mod test {
     test_with_all_storage_impls!(delete_limits_of_a_namespace_also_deletes_counters);
     test_with_all_storage_impls!(delete_limits_of_an_empty_namespace_does_nothing);
     test_with_all_storage_impls!(rate_limited);
+    test_with_all_storage_impls!(multiple_limits_rate_limited);
     test_with_all_storage_impls!(rate_limited_with_delta_higher_than_one);
     test_with_all_storage_impls!(rate_limited_with_delta_higher_than_max);
     test_with_all_storage_impls!(takes_into_account_only_vars_of_the_limits);
@@ -478,6 +479,76 @@ mod test {
             .unwrap());
     }
 
+    async fn multiple_limits_rate_limited(rate_limiter: &mut TestsLimiter) {
+        let namespace = "test_namespace";
+        let max_hits = 3;
+        let limits = vec![
+            Limit::new(
+                namespace,
+                max_hits,
+                60,
+                vec!["req.method == 'GET'"],
+                vec!["app_id"],
+            ),
+            Limit::new(
+                namespace,
+                max_hits + 1,
+                60,
+                vec!["req.method == 'POST'"],
+                vec!["app_id"],
+            ),
+        ];
+
+        for limit in limits {
+            rate_limiter.add_limit(&limit).await;
+        }
+
+        let mut get_values: HashMap<String, String> = HashMap::new();
+        get_values.insert("req.method".to_string(), "GET".to_string());
+        get_values.insert("app_id".to_string(), "test_app_id".to_string());
+
+        let mut post_values: HashMap<String, String> = HashMap::new();
+        post_values.insert("req.method".to_string(), "POST".to_string());
+        post_values.insert("app_id".to_string(), "test_app_id".to_string());
+
+        for i in 0..max_hits {
+            assert!(
+                !rate_limiter
+                    .is_rate_limited(namespace, &get_values, 1)
+                    .await
+                    .unwrap(),
+                "Must not be limited after {i}"
+            );
+            assert!(
+                !rate_limiter
+                    .is_rate_limited(namespace, &post_values, 1)
+                    .await
+                    .unwrap(),
+                "Must not be limited after {i}"
+            );
+            rate_limiter
+                .check_rate_limited_and_update(namespace, &get_values, 1, false)
+                .await
+                .unwrap();
+            rate_limiter
+                .check_rate_limited_and_update(namespace, &post_values, 1, false)
+                .await
+                .unwrap();
+        }
+
+        // We wait for the flushing period to pass so the counters are flushed in the cached storage
+        tokio::time::sleep(Duration::from_millis(3)).await;
+
+        assert!(rate_limiter
+            .is_rate_limited(namespace, &get_values, 1)
+            .await
+            .unwrap());
+        assert!(!rate_limiter
+            .is_rate_limited(namespace, &post_values, 1)
+            .await
+            .unwrap());
+    }
+
     async fn rate_limited_with_delta_higher_than_one(rate_limiter: &mut TestsLimiter) {
         let namespace = "test_namespace";
         let limit = Limit::new(
From 18c2fcd6d6fcef7188a9ae298c0c06d585284713 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Tue, 9 Apr 2024 11:16:50 +0200
Subject: [PATCH 06/19] [refactor] function `partial_counter_from_counter_key`
 getting Counter from key only

---
 limitador/src/storage/keys.rs | 27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs
index 51b257c2..86110521 100644
--- a/limitador/src/storage/keys.rs
+++ b/limitador/src/storage/keys.rs
@@ -36,7 +36,7 @@ pub fn prefix_for_namespace(namespace: &str) -> String {
 }
 
 pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
-    let mut counter = partial_counter_from_counter_key(key, limit.namespace().as_ref());
+    let mut counter = partial_counter_from_counter_key(key);
     if !counter.update_to_limit(limit) {
         // this means some kind of data corruption _or_ most probably
         // an out of sync `impl PartialEq for Limit` vs `pub fn key_for_counter(counter: &Counter) -> String`
@@ -49,11 +49,24 @@ pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
     counter
 }
 
-pub fn partial_counter_from_counter_key(key: &str, namespace: &str) -> Counter {
-    let offset = ",counter:".len();
-    let start_pos_counter = prefix_for_namespace(namespace).len() + offset;
-
-    let counter: Counter = serde_json::from_str(&key[start_pos_counter..]).unwrap();
+pub fn partial_counter_from_counter_key(key: &str) -> Counter {
+    let namespace_prefix = "namespace:";
+    let counter_prefix = ",counter:";
+
+    // Find the start position of the counter portion
+    let start_pos_namespace = key
+        .find(namespace_prefix)
+        .expect("Namespace not found in the key");
+    let start_pos_counter = key[start_pos_namespace..]
+        .find(counter_prefix)
+        .expect("Counter not found in the key")
+        + start_pos_namespace
+        + counter_prefix.len();
+
+    // Extract counter JSON substring and deserialize it
+    let counter_str = &key[start_pos_counter..];
+    let counter: Counter =
+        serde_json::from_str(counter_str).expect("Failed to deserialize counter JSON");
 
     counter
 }
@@ -87,7 +100,7 @@ mod tests {
         let limit = Limit::new(namespace, 1, 1, vec!["req.method == 'GET'"], vec!["app_id"]);
         let counter = Counter::new(limit.clone(), HashMap::default());
         let raw = key_for_counter(&counter);
-        assert_eq!(counter, partial_counter_from_counter_key(&raw, namespace));
+        assert_eq!(counter, partial_counter_from_counter_key(&raw));
         let prefix = prefix_for_namespace(namespace);
         assert_eq!(&raw[0..prefix.len()], &prefix);
     }
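For reference, the keys being parsed here look roughly like `namespace:{some_namespace},counter:{...}`, where the payload after `,counter:` is whatever serde_json produces for a `Counter` (the exact JSON shape is not shown in the patch). The refactor boils down to locating the `,counter:` marker instead of precomputing an offset from a separately supplied namespace. A self-contained sketch of that search, with a hypothetical payload:

    // Sketch: `key` is assumed to follow the "namespace:{...},counter:{...}"
    // shape produced by key_for_counter; returns the serialized payload.
    fn counter_payload(key: &str) -> Option<&str> {
        let marker = ",counter:";
        key.find(marker).map(|idx| &key[idx + marker.len()..])
    }

    fn main() {
        let key = r#"namespace:{test_namespace},counter:{"vars":{}}"#;
        assert_eq!(counter_payload(key), Some(r#"{"vars":{}}"#));
    }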
From a9ec2474f180aaba97618e9b120420633d0f772f Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Tue, 9 Apr 2024 15:32:29 +0200
Subject: [PATCH 07/19] [feature] Getting updated counters from redis

---
 limitador/src/storage/redis/redis_async.rs  | 19 ++++++++++++++++---
 limitador/src/storage/redis/redis_cached.rs |  4 ++--
 limitador/src/storage/redis/scripts.rs      |  3 +++
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 936fd12c..1f8f5350 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -238,13 +238,14 @@ impl AsyncRedisStorage {
     pub async fn update_counters(
         &self,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
-    ) -> Result<(), StorageErr> {
+    ) -> Result<HashMap<Counter, i64>, StorageErr> {
         let mut con = self.conn_manager.clone();
         let span = trace_span!("datastore");
 
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
         let mut script_invocation = redis_script.prepare_invoke();
 
+        // TODO: Fix calculating delta value greater than zero at _now_ and return the right AtomicExpiringValue
         for (counter, delta) in counters_and_deltas {
             script_invocation.key(key_for_counter(&counter));
             script_invocation.key(key_for_counters_of_limit(counter.limit()));
             script_invocation.arg(counter.seconds());
             script_invocation.arg(delta);
         }
 
-        async { script_invocation.invoke_async::<_, _>(&mut con).await }
+        let script_res: Vec<Option<(String, i64)>> = script_invocation
+            .invoke_async::<_, _>(&mut con)
             .instrument(span)
             .await?;
 
-        Ok(())
+        let counter_value_map: HashMap<Counter, i64> = script_res
+            .iter()
+            .filter_map(|counter_value| match counter_value {
+                Some((raw_counter_key, val)) => {
+                    let counter = partial_counter_from_counter_key(raw_counter_key);
+                    Some((counter, *val))
+                }
+                None => None,
+            })
+            .collect();
+
+        Ok(counter_value_map)
     }
 }

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 4e8de91e..aef70827 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -250,13 +250,13 @@ impl CachedRedisStorage {
                     };
 
                     // TODO: After rebase, the code needs to be refactored to use delta.value_at(SystemTime::now()) and compare delta is greater than 0 after adding the key to update in update_counters
-                    storage
+                    let _updated_counters = storage
                         .update_counters(counters)
                         .await
                         .or_else(|err| {
                             if err.is_transient() {
                                 p.store(true, Ordering::Release);
-                                Ok(())
+                                Ok(HashMap::default())
                             } else {
                                 Err(err)
                             }
                         })
                         .expect("Unrecoverable Redis error!");

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index 26871acd..d7554772 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -24,6 +24,7 @@ pub const SCRIPT_UPDATE_COUNTER: &str = "
 // ARGV[i]: TTLs
 // ARGV[i+1]: Deltas
 pub const BATCH_UPDATE_COUNTERS: &str = "
+    local res = {}
     for i = 1, #KEYS, 2 do
         local counter_key = KEYS[i]
         local limit_key = KEYS[i+1]
@@ -35,7 +36,9 @@ pub const BATCH_UPDATE_COUNTERS: &str = "
             redis.call('expire', counter_key, ttl)
             redis.call('sadd', limit_key, counter_key)
         end
+        table.insert(res, { counter_key, c })
     end
+    return res
 ";
From cec9f140e10aafb50f4cb84225f0fa0ed10a27c6 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Wed, 10 Apr 2024 15:35:23 +0200
Subject: [PATCH 08/19] [refactor] Fix using `AtomicExpiringValue` when batch
 updating counters

---
 limitador/src/storage/redis/redis_async.rs  | 29 +++++++++++++--------
 limitador/src/storage/redis/redis_cached.rs |  1 -
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 1f8f5350..d7af5467 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -4,6 +4,7 @@ use self::redis::aio::ConnectionManager;
 use self::redis::ConnectionInfo;
 use crate::counter::Counter;
 use crate::limit::Limit;
+use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 use crate::storage::keys::*;
 use crate::storage::redis::is_limited;
 use crate::storage::redis::scripts::{
@@ -14,9 +15,8 @@ use async_trait::async_trait;
 use redis::{AsyncCommands, RedisError};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 use tracing::{debug_span, trace_span, Instrument};
-use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 
 // Note: this implementation does not guarantee exact limits. Ensuring that we
 // never go over the limits would hurt performance. This implementation
@@ -235,22 +235,25 @@ impl AsyncRedisStorage {
         Ok(())
     }
 
-    pub async fn update_counters(
+    pub(crate) async fn update_counters(
         &self,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
-    ) -> Result<HashMap<Counter, i64>, StorageErr> {
+    ) -> Result<HashMap<Counter, AtomicExpiringValue>, StorageErr> {
         let mut con = self.conn_manager.clone();
         let span = trace_span!("datastore");
 
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
         let mut script_invocation = redis_script.prepare_invoke();
 
-        // TODO: Fix calculating delta value greater than zero at _now_ and return the right AtomicExpiringValue
+        let now = SystemTime::now();
         for (counter, delta) in counters_and_deltas {
-            script_invocation.key(key_for_counter(&counter));
-            script_invocation.key(key_for_counters_of_limit(counter.limit()));
-            script_invocation.arg(counter.seconds());
-            script_invocation.arg(delta);
+            let delta = delta.value_at(now);
+            if delta > 0 {
+                script_invocation.key(key_for_counter(&counter));
+                script_invocation.key(key_for_counters_of_limit(counter.limit()));
+                script_invocation.arg(counter.seconds());
+                script_invocation.arg(delta);
+            }
         }
 
         let script_res: Vec<Option<(String, i64)>> = script_invocation
@@ -258,12 +261,16 @@ impl AsyncRedisStorage {
             .instrument(span)
             .await?;
 
-        let counter_value_map: HashMap<Counter, i64> = script_res
+        let counter_value_map: HashMap<Counter, AtomicExpiringValue> = script_res
             .iter()
             .filter_map(|counter_value| match counter_value {
                 Some((raw_counter_key, val)) => {
                     let counter = partial_counter_from_counter_key(raw_counter_key);
-                    Some((counter, *val))
+                    let seconds = counter.seconds();
+                    Some((
+                        counter,
+                        AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)),
+                    ))
                 }
                 None => None,
             })
             .collect();

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index aef70827..38e63223 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -249,7 +249,6 @@ impl CachedRedisStorage {
                         std::mem::take(&mut *batch)
                     };
 
-                    // TODO: After rebase, the code needs to be refactored to use delta.value_at(SystemTime::now()) and compare delta is greater than 0 after adding the key to update in update_counters
                     let _updated_counters = storage
                         .update_counters(counters)
                         .await

From 8cfb2e461024c981e8de83b60752bd76f76346fb Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Thu, 11 Apr 2024 10:24:08 +0200
Subject: [PATCH 09/19] [wip] Updating cached counters

---
 limitador/src/storage/redis/redis_async.rs  | 21 +-----
 limitador/src/storage/redis/redis_cached.rs | 82 +++++++++++++--------
 2 files changed, 54 insertions(+), 49 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index d7af5467..4bdebd99 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -238,7 +238,7 @@ impl AsyncRedisStorage {
     pub(crate) async fn update_counters(
         &self,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
-    ) -> Result<HashMap<Counter, AtomicExpiringValue>, StorageErr> {
+    ) -> Result<Vec<(String, i64)>, StorageErr> {
         let mut con = self.conn_manager.clone();
         let span = trace_span!("datastore");
 
@@ -256,27 +256,12 @@ impl AsyncRedisStorage {
             }
         }
 
-        let script_res: Vec<Option<(String, i64)>> = script_invocation
+        let script_res: Vec<Option<(String, i64)>> = script_invocation
             .invoke_async::<_, _>(&mut con)
             .instrument(span)
             .await?;
 
-        let counter_value_map: HashMap<Counter, AtomicExpiringValue> = script_res
-            .iter()
-            .filter_map(|counter_value| match counter_value {
-                Some((raw_counter_key, val)) => {
-                    let counter = partial_counter_from_counter_key(raw_counter_key);
-                    let seconds = counter.seconds();
-                    Some((
-                        counter,
-                        AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)),
-                    ))
-                }
-                None => None,
-            })
-            .collect();
-
-        Ok(counter_value_map)
+        Ok(script_res.into_iter().flatten().collect())
     }
 }

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 38e63223..ab55ddf5 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -39,7 +39,7 @@ use tracing::{error, warn};
 // multiple times when it is not cached.
 
 pub struct CachedRedisStorage {
-    cached_counters: CountersCache,
+    cached_counters: Arc<CountersCache>,
     batcher_counter_updates: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
     async_redis_storage: AsyncRedisStorage,
     redis_conn_manager: ConnectionManager,
@@ -226,6 +226,15 @@ impl CachedRedisStorage {
         )
         .await?;
 
+        let cached_counters = CountersCacheBuilder::new()
+            .max_cached_counters(max_cached_counters)
+            .max_ttl_cached_counter(ttl_cached_counters)
+            .ttl_ratio_cached_counter(ttl_ratio_cached_counters)
+            .build();
+
+        let cacher = Arc::new(cached_counters);
+        let cacher_clone = cacher.clone();
+
         let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
@@ -238,42 +247,16 @@ impl CachedRedisStorage {
         let mut interval = tokio::time::interval(flushing_period);
         tokio::spawn(async move {
             loop {
-                if p.load(Ordering::Acquire) {
-                    if storage.is_alive().await {
-                        warn!("Partition to Redis resolved!");
-                        p.store(false, Ordering::Release);
+                tokio::select! {
+                    _ = interval.tick() => {
+                        flush_batcher_and_update_counters(batcher_flusher.clone(), storage.clone(), cacher_clone.clone(), p.clone()).await;
                     }
-                } else {
-                    let counters = {
-                        let mut batch = batcher_flusher.lock().unwrap();
-                        std::mem::take(&mut *batch)
-                    };
-
-                    let _updated_counters = storage
-                        .update_counters(counters)
-                        .await
-                        .or_else(|err| {
-                            if err.is_transient() {
-                                p.store(true, Ordering::Release);
-                                Ok(HashMap::default())
-                            } else {
-                                Err(err)
-                            }
-                        })
-                        .expect("Unrecoverable Redis error!");
                 }
-                interval.tick().await;
             }
         });
 
-        let cached_counters = CountersCacheBuilder::new()
-            .max_cached_counters(max_cached_counters)
-            .max_ttl_cached_counter(ttl_cached_counters)
-            .ttl_ratio_cached_counter(ttl_ratio_cached_counters)
-            .build();
-
         Ok(Self {
-            cached_counters,
+            cached_counters: cacher,
             batcher_counter_updates: batcher,
             redis_conn_manager,
             async_redis_storage,
@@ -397,6 +380,43 @@ impl CachedRedisStorageBuilder {
     }
 }
 
+async fn flush_batcher_and_update_counters(
+    batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
+    storage: AsyncRedisStorage,
+    cached_counters: Arc<CountersCache>,
+    partitioned: Arc<AtomicBool>,
+) {
+    if partitioned.load(Ordering::Acquire) {
+        if storage.is_alive().await {
+            warn!("Partition to Redis resolved!");
+            partitioned.store(false, Ordering::Release);
+        }
+    } else {
+        let counters = {
+            let mut batch = batcher.lock().unwrap();
+            std::mem::take(&mut *batch)
+        };
+
+        let updated_counters = storage
+            .update_counters(counters)
+            .await
+            .or_else(|err| {
+                if err.is_transient() {
+                    partitioned.store(true, Ordering::Release);
+                    Ok(Vec::new())
+                } else {
+                    Err(err)
+                }
+            })
+            .expect("Unrecoverable Redis error!");
+
+        for (counter_key, value) in updated_counters {
+            let counter = partial_counter_from_counter_key(&counter_key);
+            cached_counters.increase_by(&counter, value);
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::storage::redis::CachedRedisStorage;
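Stripped of its captures, the background task introduced here has this shape (a sketch only, with `flush` standing in for `flush_batcher_and_update_counters` and its arguments). A `tokio::select!` with a single branch behaves the same as awaiting the tick directly, which is effectively the form a later commit in this series returns to:

    use std::time::Duration;

    // Sketch of the flush loop: tick on a fixed interval, flush on every tick.
    async fn flush_task(period: Duration, mut flush: impl FnMut()) {
        let mut interval = tokio::time::interval(period);
        loop {
            tokio::select! {
                _ = interval.tick() => flush(),
            }
        }
    }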
From 3be955d2eb52c87adfa19bcb7601e944d994f3a1 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Tue, 16 Apr 2024 18:07:25 +0200
Subject: [PATCH 10/19] [wip][refactor] Testing batch updates

---
 Cargo.lock                                  | 11 +++
 limitador/Cargo.toml                        |  1 +
 limitador/src/storage/redis/redis_async.rs  | 84 +++++++++++++++++++--
 limitador/src/storage/redis/redis_cached.rs |  5 +-
 4 files changed, 94 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bbbd89c1..301842c1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,6 +1486,7 @@ dependencies = [
  "r2d2",
  "rand",
  "redis",
+ "redis-test",
  "reqwest",
  "rmp-serde",
  "rocksdb",
@@ -2388,6 +2389,16 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "redis-test"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a948b3cec9e4b1fedbb0f0788e79029fb1f641b6cfefb7a15d044f803854427"
+dependencies = [
+ "futures",
+ "redis",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"

diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 103043df..87a2c772 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -41,6 +41,7 @@ redis = { version = "0.25", optional = true, features = [
   "tls-native-tls",
   "tokio-native-tls-comp",
 ] }
+redis-test = { version = "0.4.0", features = ["aio"] }
 r2d2 = { version = "0.8", optional = true }
 tokio = { version = "1", optional = true, features = [
   "rt-multi-thread",

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 4bdebd99..bc96a45f 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -12,6 +12,7 @@ use crate::storage::redis::scripts::{
 };
 use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
 use async_trait::async_trait;
+use redis::aio::ConnectionLike;
 use redis::{AsyncCommands, RedisError};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
@@ -29,7 +30,7 @@ use tracing::{debug_span, trace_span, Instrument};
 
 #[derive(Clone)]
 pub struct AsyncRedisStorage {
-    conn_manager: ConnectionManager,
+    pub(crate) conn_manager: ConnectionManager,
 }
 
 #[async_trait]
@@ -235,11 +236,10 @@ impl AsyncRedisStorage {
         Ok(())
     }
 
-    pub(crate) async fn update_counters(
-        &self,
+    pub(crate) async fn update_counters<C: ConnectionLike>(
+        redis_conn: &mut C,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
     ) -> Result<Vec<(String, i64)>, StorageErr> {
-        let mut con = self.conn_manager.clone();
         let span = trace_span!("datastore");
 
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
@@ -257,7 +257,7 @@ impl AsyncRedisStorage {
         }
 
         let script_res: Vec<Option<(String, i64)>> = script_invocation
-            .invoke_async::<_, _>(&mut con)
+            .invoke_async::<_, _>(redis_conn)
             .instrument(span)
             .await?;
 
@@ -269,6 +269,8 @@ impl AsyncRedisStorage {
 mod tests {
     use crate::storage::redis::AsyncRedisStorage;
     use redis::ErrorKind;
+    use redis_test::{MockCmd, MockRedisConnection, IntoRedisValue};
+    use crate::storage::keys::{key_for_counter, key_for_counters_of_limit};
 
     #[tokio::test]
     async fn errs_on_bad_url() {
@@ -285,4 +287,76 @@ impl AsyncRedisStorage {
         assert_eq!(error.kind(), ErrorKind::IoError);
         assert!(error.is_connection_refusal())
     }
+
+    #[tokio::test]
+    async fn batch_update_counters() {
+
+        let mut counters_and_deltas = std::collections::HashMap::new();
+        let counter = crate::counter::Counter::new(
+            crate::limit::Limit::new(
+                "test_namespace",
+                10,
+                60,
+                vec!["req.method == 'GET'"],
+                vec!["app_id"],
+            ),
+            std::collections::HashMap::new(),
+        );
+
+        let expiring_value = crate::storage::atomic_expiring_value::AtomicExpiringValue::new(
+            1,
+            std::time::SystemTime::now() + std::time::Duration::from_secs(60),
+        );
+
+        let counter_key = key_for_counter(&counter);
+        let key_for_counters_of_limit = key_for_counters_of_limit(counter.limit());
+
+        counters_and_deltas.insert(counter, expiring_value);
+
+        let mock_response = format!(
+            "{{{{ {},1}}}}",
+            counter_key.clone(),
+        );
+
+        let mut mock_client = MockRedisConnection::new(
+            vec![
+                MockCmd::new(
+                    redis::cmd("EVALSHA")
+                        .arg("13e042bb900a9a1104370208a300432bcdd45383")
+                        .arg("2")
+                        .arg(counter_key.clone())
+                        .arg(key_for_counters_of_limit.clone())
+                        .arg(60)
+                        .arg(1),
+                    Ok(IntoRedisValue::into_redis_value(mock_response)),
+                ),
+                MockCmd::new(
+                    redis::cmd("incrby")
+                        .arg(counter_key.clone())
+                        .arg(1),
+                    Ok("1"),
+                ),
+                MockCmd::new(
+                    redis::cmd("EXPIRE")
+                        .arg(counter_key.clone())
+                        .arg(60),
+                    Ok("1"),
+                ),
+                MockCmd::new(
+                    redis::cmd("SADD")
+                        .arg(key_for_counters_of_limit)
+                        .arg(counter_key.clone()),
+                    Ok("1"),
+                ),
+            ],
+        );
+
+        let result =
+            AsyncRedisStorage::update_counters(&mut mock_client, counters_and_deltas).await;
+
+        assert!(result.is_ok());
+        //assert!(result.unwrap(), "{}", vec![("test_namespace:app_id:GET:1", 1)]);
+    }
 }

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index ab55ddf5..0ae03bba 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -397,8 +397,9 @@ async fn flush_batcher_and_update_counters(
             std::mem::take(&mut *batch)
         };
 
-        let updated_counters = storage
-            .update_counters(counters)
+        let mut conn = storage.conn_manager.clone();
+
+        let updated_counters = AsyncRedisStorage::update_counters(&mut conn, counters)
             .await
             .or_else(|err| {
                 if err.is_transient() {

From 49ecd7b7515301b588ebb5c7531e0f0213bce559 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Thu, 18 Apr 2024 10:45:05 +0200
Subject: [PATCH 11/19] [wip] Testing, refactoring...

---
 limitador/src/storage/redis/redis_async.rs  | 107 +++++++++-----------
 limitador/src/storage/redis/redis_cached.rs |  12 ++-
 limitador/src/storage/redis/scripts.rs      |   3 +-
 3 files changed, 59 insertions(+), 63 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index bc96a45f..8bcda255 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -239,12 +239,13 @@ impl AsyncRedisStorage {
     pub(crate) async fn update_counters<C: ConnectionLike>(
         redis_conn: &mut C,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
-    ) -> Result<Vec<(String, i64)>, StorageErr> {
+    ) -> Result<Vec<(Counter, i64)>, StorageErr> {
         let span = trace_span!("datastore");
 
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
         let mut script_invocation = redis_script.prepare_invoke();
 
+        let mut res: Vec<(Counter, i64)> = Vec::new();
        let now = SystemTime::now();
         for (counter, delta) in counters_and_deltas {
             let delta = delta.value_at(now);
@@ -253,24 +254,39 @@ impl AsyncRedisStorage {
                 script_invocation.arg(counter.seconds());
                 script_invocation.arg(delta);
+                // We need to store the counter in the actual order we are sending it to the script
+                res.push((counter, 0));
             }
         }
 
-        let script_res: Vec<Option<(String, i64)>> = script_invocation
-            .invoke_async::<_, _>(&mut con)
+        // The redis crate is not working with tables, thus the response will be a Vec of counter values
+        let script_res: Vec<i64> = script_invocation
+            .invoke_async(redis_conn)
             .instrument(span)
             .await?;
 
-        Ok(script_res.into_iter().flatten().collect())
+        // We need to update the values of the counters with the values returned by redis
+        for (i, (_, value)) in res.iter_mut().enumerate() {
+            if let Some(new_value) = script_res.get(i) {
+                *value = *new_value;
+            }
+        }
+
+        Ok(res)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::storage::redis::AsyncRedisStorage;
-    use redis::ErrorKind;
-    use redis_test::{MockCmd, MockRedisConnection, IntoRedisValue};
+    use crate::counter::Counter;
+    use crate::limit::Limit;
+    use crate::storage::atomic_expiring_value::AtomicExpiringValue;
     use crate::storage::keys::{key_for_counter, key_for_counters_of_limit};
+    use crate::storage::redis::AsyncRedisStorage;
+    use redis::{ErrorKind, Value};
+    use redis_test::{MockCmd, MockRedisConnection};
+    use std::collections::HashMap;
+    use std::time::{Duration, SystemTime};
 
     #[tokio::test]
     async fn errs_on_bad_url() {
@@ -290,73 +306,46 @@ mod tests {
 
     #[tokio::test]
     async fn batch_update_counters() {
-
-        let mut counters_and_deltas = std::collections::HashMap::new();
-        let counter = crate::counter::Counter::new(
-            crate::limit::Limit::new(
+        let mut counters_and_deltas = HashMap::new();
+        let counter = Counter::new(
+            Limit::new(
                 "test_namespace",
                 10,
                 60,
                 vec!["req.method == 'GET'"],
                 vec!["app_id"],
             ),
-            std::collections::HashMap::new(),
+            Default::default(),
         );
 
-        let expiring_value = crate::storage::atomic_expiring_value::AtomicExpiringValue::new(
-            1,
-            std::time::SystemTime::now() + std::time::Duration::from_secs(60),
-        );
+        let expiring_value =
+            AtomicExpiringValue::new(1, SystemTime::now() + Duration::from_secs(60));
 
-        let counter_key = key_for_counter(&counter);
-        let key_for_counters_of_limit = key_for_counters_of_limit(counter.limit());
+        counters_and_deltas.insert(counter.clone(), expiring_value);
 
-        counters_and_deltas.insert(counter, expiring_value);
+        let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(20)]);
 
-        let mock_response = format!(
-            "{{{{ {},1}}}}",
-            counter_key.clone(),
-        );
-
-        let mut mock_client = MockRedisConnection::new(
-            vec![
-                MockCmd::new(
-                    redis::cmd("EVALSHA")
-                        .arg("13e042bb900a9a1104370208a300432bcdd45383")
-                        .arg("2")
-                        .arg(counter_key.clone())
-                        .arg(key_for_counters_of_limit.clone())
-                        .arg(60)
-                        .arg(1),
-                    Ok(IntoRedisValue::into_redis_value(mock_response)),
-                ),
-                MockCmd::new(
-                    redis::cmd("incrby")
-                        .arg(counter_key.clone())
-                        .arg(1),
-                    Ok("1"),
-                ),
-                MockCmd::new(
-                    redis::cmd("EXPIRE")
-                        .arg(counter_key.clone())
-                        .arg(60),
-                    Ok("1"),
-                ),
-                MockCmd::new(
-                    redis::cmd("SADD")
-                        .arg(key_for_counters_of_limit)
-                        .arg(counter_key.clone()),
-                    Ok("1"),
-                ),
-            ],
-        );
+        let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
+            redis::cmd("EVALSHA")
+                .arg("8ee7a63a239b1e196b6a557956da849c10ffefcf")
+                .arg("2")
+                .arg(key_for_counter(&counter))
+                .arg(key_for_counters_of_limit(counter.limit()))
+                .arg(60)
+                .arg(1),
+            Ok(mock_response.clone()),
+        )]);
 
         let result =
             AsyncRedisStorage::update_counters(&mut mock_client, counters_and_deltas).await;
 
         assert!(result.is_ok());
-        //assert!(result.unwrap(), "{}", vec![("test_namespace:app_id:GET:1", 1)]);
+
+        let (c, v) = result.unwrap()[0].clone();
+        assert_eq!(
+            "req.method == \"GET\"",
+            c.limit().conditions().iter().collect::<Vec<&String>>()[0]
+        );
+        assert_eq!(10, v);
     }
 }

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 0ae03bba..8381f5d8 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -411,9 +411,15 @@ async fn flush_batcher_and_update_counters(
             })
             .expect("Unrecoverable Redis error!");
 
-        for (counter_key, value) in updated_counters {
-            let counter = partial_counter_from_counter_key(&counter_key);
-            cached_counters.increase_by(&counter, value);
+        for (counter, value) in updated_counters {
+            //TODO: Populate the right ttls
+            cached_counters.insert(
+                counter,
+                Option::from(value),
+                0,
+                Duration::from_secs(0),
+                SystemTime::now(),
+            );
         }
     }
 }

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index d7554772..f75b3adb 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -36,7 +36,8 @@ pub const BATCH_UPDATE_COUNTERS: &str = "
             redis.call('expire', counter_key, ttl)
             redis.call('sadd', limit_key, counter_key)
         end
-        table.insert(res, { counter_key, c })
+
+        table.insert(res, c)
     end
     return res
 ";
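The EVALSHA argument asserted in the mocks above is the SHA-1 of the Lua source, which is why the literal changes every time BATCH_UPDATE_COUNTERS changes (compare the hash in this commit's test with the one in the previous commit). Rather than hard-coding the literal, it can be derived from the script itself; a sketch, assuming the same constant is in scope:

    use redis::Script;

    // Returns the hash that invoke_async sends via EVALSHA for this script.
    fn evalsha_hash(script_source: &str) -> String {
        Script::new(script_source).get_hash().to_string()
    }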
From 8fbbc357bce8788aefe14fc79d304183daa74082 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Fri, 19 Apr 2024 12:53:34 +0200
Subject: [PATCH 12/19] [refactor] Getting values and ttls back from redis and
 updating the cache

---
 limitador/src/storage/redis/redis_async.rs  | 26 ++++++++++++---------
 limitador/src/storage/redis/redis_cached.rs | 11 +++++----
 limitador/src/storage/redis/scripts.rs      |  3 +++
 3 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 8bcda255..0c35d32b 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -239,13 +239,13 @@ impl AsyncRedisStorage {
     pub(crate) async fn update_counters<C: ConnectionLike>(
         redis_conn: &mut C,
         counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
-    ) -> Result<Vec<(Counter, i64)>, StorageErr> {
+    ) -> Result<Vec<(Counter, i64, i64)>, StorageErr> {
         let span = trace_span!("datastore");
 
         let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
         let mut script_invocation = redis_script.prepare_invoke();
 
-        let mut res: Vec<(Counter, i64)> = Vec::new();
+        let mut res: Vec<(Counter, i64, i64)> = Vec::new();
         let now = SystemTime::now();
         for (counter, delta) in counters_and_deltas {
             let delta = delta.value_at(now);
@@ -255,7 +255,7 @@ impl AsyncRedisStorage {
                 script_invocation.arg(counter.seconds());
                 script_invocation.arg(delta);
                 // We need to store the counter in the actual order we are sending it to the script
-                res.push((counter, 0));
+                res.push((counter, 0, 0));
             }
         }
 
@@ -265,11 +265,14 @@ impl AsyncRedisStorage {
             .instrument(span)
             .await?;
 
-        // We need to update the values of the counters with the values returned by redis
-        for (i, (_, value)) in res.iter_mut().enumerate() {
-            if let Some(new_value) = script_res.get(i) {
-                *value = *new_value;
-            }
+        // We need to update the values and ttls returned by redis
+        let counters_range = 0..res.len();
+        let script_res_range = (0..script_res.len()).step_by(2);
+
+        for (i, j) in counters_range.zip(script_res_range) {
+            let (_, val, ttl) = &mut res[i];
+            *val = script_res[j];
+            *ttl = script_res[j + 1];
         }
 
         Ok(res)
@@ -323,11 +326,11 @@ mod tests {
 
         counters_and_deltas.insert(counter.clone(), expiring_value);
 
-        let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(20)]);
+        let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]);
 
         let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
             redis::cmd("EVALSHA")
-                .arg("8ee7a63a239b1e196b6a557956da849c10ffefcf")
+                .arg("8fbdbae84f16e71bcaef347c46f8887564b01213")
                 .arg("2")
                 .arg(key_for_counter(&counter))
                 .arg(key_for_counters_of_limit(counter.limit()))
@@ -341,11 +344,12 @@ mod tests {
 
         assert!(result.is_ok());
 
-        let (c, v) = result.unwrap()[0].clone();
+        let (c, v, t) = result.unwrap()[0].clone();
         assert_eq!(
             "req.method == \"GET\"",
             c.limit().conditions().iter().collect::<Vec<&String>>()[0]
         );
         assert_eq!(10, v);
+        assert_eq!(60, t);
     }
 }

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 8381f5d8..a724fe89 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -399,6 +399,8 @@ async fn flush_batcher_and_update_counters(
 
         let mut conn = storage.conn_manager.clone();
 
+        let time_start_update_counters = Instant::now();
+
         let updated_counters = AsyncRedisStorage::update_counters(&mut conn, counters)
             .await
             .or_else(|err| {
@@ -411,13 +413,14 @@ async fn flush_batcher_and_update_counters(
             })
             .expect("Unrecoverable Redis error!");
 
-        for (counter, value) in updated_counters {
-            //TODO: Populate the right ttls
+        for (counter, value, ttl) in updated_counters {
             cached_counters.insert(
                 counter,
                 Option::from(value),
-                0,
-                Duration::from_secs(0),
+                ttl,
+                Duration::from_millis(
+                    (Instant::now() - time_start_update_counters).as_millis() as u64
+                ),
                 SystemTime::now(),
             );
         }

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index f75b3adb..c81b2e89 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -23,6 +23,8 @@ pub const SCRIPT_UPDATE_COUNTER: &str = "
 // KEY[i+1]: Limit key
 // ARGV[i]: TTLs
 // ARGV[i+1]: Deltas
+// This function returns a list with the values and TTLs for the updated counter_keys,
+// the first position the counter value and the second the TTL
 pub const BATCH_UPDATE_COUNTERS: &str = "
     local res = {}
     for i = 1, #KEYS, 2 do
@@ -38,6 +40,7 @@ pub const BATCH_UPDATE_COUNTERS: &str = "
         end
 
         table.insert(res, c)
+        table.insert(res, redis.call('pttl', counter_key))
     end
     return res
 ";

From a5e92693ed3fbfa4dd815c822efe47ebd4bfc8fa Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Fri, 19 Apr 2024 16:28:02 +0200
Subject: [PATCH 13/19] [refactor] Update the cached counter if previous
 stored value is less than the updated count

---
 limitador/src/storage/redis/counters_cache.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index 603fc3ac..2f2b22ca 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -121,9 +121,10 @@ impl CountersCache {
         );
         if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
             if ttl > Duration::ZERO {
-                let value = CachedCounterValue::from(&counter, counter_val, cache_ttl);
-                let previous = self.cache.get_with(counter.clone(), || Arc::new(value));
-                if previous.expired_at(now) {
+                let previous = self.cache.get_with(counter.clone(), || {
+                    Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
+                });
+                if previous.expired_at(now) || previous.value.value() < counter_val {
                     previous.set_from_authority(&counter, counter_val, cache_ttl);
                 }
                 return previous;

From ea16dcceb32c8d634933ab3f90b619af63e189f2 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Fri, 19 Apr 2024 16:30:23 +0200
Subject: [PATCH 14/19] [script] A more efficient way of building the table
 with val and ttl

---
 limitador/src/storage/redis/scripts.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs
index c81b2e89..b241d88d 100644
--- a/limitador/src/storage/redis/scripts.rs
+++ b/limitador/src/storage/redis/scripts.rs
@@ -34,13 +34,14 @@ pub const BATCH_UPDATE_COUNTERS: &str = "
         local delta = ARGV[i+1]
 
         local c = redis.call('incrby', counter_key, delta)
+        table.insert(res, c)
         if c == tonumber(delta) then
             redis.call('expire', counter_key, ttl)
             redis.call('sadd', limit_key, counter_key)
+            table.insert(res, ttl*1000)
+        else
+            table.insert(res, redis.call('pttl', counter_key))
         end
-
-        table.insert(res, c)
-        table.insert(res, redis.call('pttl', counter_key))
     end
     return res
 ";

From 232da2c6d57b9f1af05f459a60013f3b0404cdc8 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Fri, 19 Apr 2024 16:32:06 +0200
Subject: [PATCH 15/19] [refactor] Fixing loop execution and variables scoped

---
 limitador/src/storage/redis/redis_cached.rs | 33 +++++++++++++--------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index a724fe89..805f99e5 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -238,22 +238,29 @@ impl CachedRedisStorage {
         let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
-
-        let storage = async_redis_storage.clone();
         let batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>> =
             Arc::new(Mutex::new(Default::default()));
-        let p = Arc::clone(&partitioned);
-        let batcher_flusher = batcher.clone();
-        let mut interval = tokio::time::interval(flushing_period);
-        tokio::spawn(async move {
-            loop {
-                tokio::select! {
-                    _ = interval.tick() => {
-                        flush_batcher_and_update_counters(batcher_flusher.clone(), storage.clone(), cacher_clone.clone(), p.clone()).await;
-                    }
+
+        {
+            let storage = async_redis_storage.clone();
+            let conn = redis_conn_manager.clone();
+            let p = Arc::clone(&partitioned);
+            let batcher_flusher = batcher.clone();
+            let mut interval = tokio::time::interval(flushing_period);
+            tokio::spawn(async move {
+                loop {
+                    flush_batcher_and_update_counters(
+                        conn.clone(),
+                        batcher_flusher.clone(),
+                        storage.clone(),
+                        cacher_clone.clone(),
+                        p.clone(),
+                    )
+                    .await;
+                    interval.tick().await;
                 }
-            }
-        });
+            });
+        }
 
         Ok(Self {
             cached_counters: cacher,
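One timing detail behind the reordered loop (flush first, then `interval.tick().await`): a tokio interval completes its first tick immediately, so the loop still flushes roughly once per period after the initial pass. A small sketch of that assumption:

    use std::time::Duration;

    // Sketch: the first tick of a tokio interval resolves at once.
    async fn tick_twice() {
        let mut interval = tokio::time::interval(Duration::from_millis(100));
        interval.tick().await; // completes immediately
        interval.tick().await; // ~100ms after the first
    }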
implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. This implementation @@ -30,7 +26,7 @@ use tracing::{debug_span, trace_span, Instrument}; #[derive(Clone)] pub struct AsyncRedisStorage { - pub(crate) conn_manager: ConnectionManager, + conn_manager: ConnectionManager, } #[async_trait] @@ -235,61 +231,12 @@ impl AsyncRedisStorage { Ok(()) } - - pub(crate) async fn update_counters( - redis_conn: &mut C, - counters_and_deltas: HashMap, - ) -> Result, StorageErr> { - let span = trace_span!("datastore"); - - let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS); - let mut script_invocation = redis_script.prepare_invoke(); - - let mut res: Vec<(Counter, i64, i64)> = Vec::new(); - let now = SystemTime::now(); - for (counter, delta) in counters_and_deltas { - let delta = delta.value_at(now); - if delta > 0 { - script_invocation.key(key_for_counter(&counter)); - script_invocation.key(key_for_counters_of_limit(counter.limit())); - script_invocation.arg(counter.seconds()); - script_invocation.arg(delta); - // We need to store the counter in the actual order we are sending it to the script - res.push((counter, 0, 0)); - } - } - - // The redis crate is not working with tables, thus the response will be a Vec of counter values - let script_res: Vec = script_invocation - .invoke_async(redis_conn) - .instrument(span) - .await?; - - // We need to update the values and ttls returned by redis - let counters_range = 0..res.len(); - let script_res_range = (0..script_res.len()).step_by(2); - - for (i, j) in counters_range.zip(script_res_range) { - let (_, val, ttl) = &mut res[i]; - *val = script_res[j]; - *ttl = script_res[j + 1]; - } - - Ok(res) - } } #[cfg(test)] mod tests { - use crate::counter::Counter; - use crate::limit::Limit; - use crate::storage::atomic_expiring_value::AtomicExpiringValue; - use crate::storage::keys::{key_for_counter, key_for_counters_of_limit}; use crate::storage::redis::AsyncRedisStorage; - use redis::{ErrorKind, Value}; - use redis_test::{MockCmd, MockRedisConnection}; - use std::collections::HashMap; - use std::time::{Duration, SystemTime}; + use redis::ErrorKind; #[tokio::test] async fn errs_on_bad_url() { @@ -306,50 +253,4 @@ mod tests { assert_eq!(error.kind(), ErrorKind::IoError); assert!(error.is_connection_refusal()) } - - #[tokio::test] - async fn batch_update_counters() { - let mut counters_and_deltas = HashMap::new(); - let counter = Counter::new( - Limit::new( - "test_namespace", - 10, - 60, - vec!["req.method == 'GET'"], - vec!["app_id"], - ), - Default::default(), - ); - - let expiring_value = - AtomicExpiringValue::new(1, SystemTime::now() + Duration::from_secs(60)); - - counters_and_deltas.insert(counter.clone(), expiring_value); - - let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]); - - let mut mock_client = MockRedisConnection::new(vec![MockCmd::new( - redis::cmd("EVALSHA") - .arg("8fbdbae84f16e71bcaef347c46f8887564b01213") - .arg("2") - .arg(key_for_counter(&counter)) - .arg(key_for_counters_of_limit(counter.limit())) - .arg(60) - .arg(1), - Ok(mock_response.clone()), - )]); - - let result = - AsyncRedisStorage::update_counters(&mut mock_client, counters_and_deltas).await; - - assert!(result.is_ok()); - - let (c, v, t) = result.unwrap()[0].clone(); - assert_eq!( - "req.method == \"GET\"", - c.limit().conditions().iter().collect::>()[0] - ); - assert_eq!(10, v); - assert_eq!(60, t); - } } diff --git a/limitador/src/storage/redis/redis_cached.rs 
b/limitador/src/storage/redis/redis_cached.rs index 805f99e5..4c1b8ee0 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -4,21 +4,21 @@ use crate::storage::atomic_expiring_value::AtomicExpiringValue; use crate::storage::keys::*; use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder}; use crate::storage::redis::redis_async::AsyncRedisStorage; -use crate::storage::redis::scripts::VALUES_AND_TTLS; +use crate::storage::redis::scripts::{BATCH_UPDATE_COUNTERS, VALUES_AND_TTLS}; use crate::storage::redis::{ DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS, DEFAULT_TTL_RATIO_CACHED_COUNTERS, }; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; -use redis::aio::ConnectionManager; +use redis::aio::{ConnectionLike, ConnectionManager}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; -use tracing::{error, warn}; +use tracing::{error, trace_span, warn, Instrument}; // This is just a first version. // @@ -387,7 +387,49 @@ impl CachedRedisStorageBuilder { } } +async fn update_counters( + redis_conn: &mut C, + counters_and_deltas: HashMap, +) -> Result, StorageErr> { + let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS); + let mut script_invocation = redis_script.prepare_invoke(); + + let mut res: Vec<(Counter, i64, i64)> = Vec::new(); + let now = SystemTime::now(); + for (counter, delta) in counters_and_deltas { + let delta = delta.value_at(now); + if delta > 0 { + script_invocation.key(key_for_counter(&counter)); + script_invocation.key(key_for_counters_of_limit(counter.limit())); + script_invocation.arg(counter.seconds()); + script_invocation.arg(delta); + // We need to store the counter in the actual order we are sending it to the script + res.push((counter, 0, 0)); + } + } + + let span = trace_span!("datastore"); + // The redis crate is not working with tables, thus the response will be a Vec of counter values + let script_res: Vec = script_invocation + .invoke_async(redis_conn) + .instrument(span) + .await?; + + // We need to update the values and ttls returned by redis + let counters_range = 0..res.len(); + let script_res_range = (0..script_res.len()).step_by(2); + + for (i, j) in counters_range.zip(script_res_range) { + let (_, val, ttl) = &mut res[i]; + *val = script_res[j]; + *ttl = script_res[j + 1]; + } + + Ok(res) +} + async fn flush_batcher_and_update_counters( + mut conn: ConnectionManager, batcher: Arc>>, storage: AsyncRedisStorage, cached_counters: Arc, @@ -404,11 +446,9 @@ async fn flush_batcher_and_update_counters( std::mem::take(&mut *batch) }; - let mut conn = storage.conn_manager.clone(); - let time_start_update_counters = Instant::now(); - let updated_counters = AsyncRedisStorage::update_counters(&mut conn, counters) + let updated_counters = update_counters(&mut conn, counters) .await .or_else(|err| { if err.is_transient() { @@ -436,8 +476,16 @@ async fn flush_batcher_and_update_counters( #[cfg(test)] mod tests { + use crate::counter::Counter; + use crate::limit::Limit; + use crate::storage::atomic_expiring_value::AtomicExpiringValue; + use crate::storage::keys::{key_for_counter, key_for_counters_of_limit}; use crate::storage::redis::CachedRedisStorage; - use redis::ErrorKind; + use 
@@ -436,8 +476,16 @@
 
 #[cfg(test)]
 mod tests {
+    use crate::counter::Counter;
+    use crate::limit::Limit;
+    use crate::storage::atomic_expiring_value::AtomicExpiringValue;
+    use crate::storage::keys::{key_for_counter, key_for_counters_of_limit};
     use crate::storage::redis::CachedRedisStorage;
-    use redis::ErrorKind;
+    use crate::storage::redis::redis_cached::update_counters;
+    use redis::{ErrorKind, Value};
+    use redis_test::{MockCmd, MockRedisConnection};
+    use std::collections::HashMap;
+    use std::time::{Duration, SystemTime};
 
     #[tokio::test]
     async fn errs_on_bad_url() {
@@ -454,4 +502,50 @@ mod tests {
         assert_eq!(error.kind(), ErrorKind::IoError);
         assert!(error.is_connection_refusal())
     }
+
+    #[tokio::test]
+    async fn batch_update_counters() {
+        let mut counters_and_deltas = HashMap::new();
+        let counter = Counter::new(
+            Limit::new(
+                "test_namespace",
+                10,
+                60,
+                vec!["req.method == 'GET'"],
+                vec!["app_id"],
+            ),
+            Default::default(),
+        );
+
+        let expiring_value =
+            AtomicExpiringValue::new(1, SystemTime::now() + Duration::from_secs(60));
+
+        counters_and_deltas.insert(counter.clone(), expiring_value);
+
+        let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]);
+
+        let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
+            redis::cmd("EVALSHA")
+                .arg("1e87383cf7dba2bd0f9972ed73671274e6cbd5da")
+                .arg("2")
+                .arg(key_for_counter(&counter))
+                .arg(key_for_counters_of_limit(counter.limit()))
+                .arg(60)
+                .arg(1),
+            Ok(mock_response.clone()),
+        )]);
+
+        let result =
+            update_counters(&mut mock_client, counters_and_deltas).await;
+
+        assert!(result.is_ok());
+
+        let (c, v, t) = result.unwrap()[0].clone();
+        assert_eq!(
+            "req.method == \"GET\"",
+            c.limit().conditions().iter().collect::<Vec<&str>>()[0]
+        );
+        assert_eq!(10, v);
+        assert_eq!(60, t);
+    }
 }
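Note the hard-coded EVALSHA digest in the mock: redis-rs invokes scripts by the SHA-1 of their source, so the expected hash changes whenever BATCH_UPDATE_COUNTERS changes, which is why it differs from the hash mocked in the earlier revision of this test. If redis-rs's `Script::get_hash()` accessor is available, the test could derive the digest instead of pinning a literal; a sketch, assuming that accessor exists in the version in use:

    // Derive the EVALSHA digest from the script body instead of a literal,
    // so edits to the Lua source don't silently break the mocked command.
    let script = redis::Script::new(BATCH_UPDATE_COUNTERS);
    let expected_sha = script.get_hash().to_string();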
From c57418136114993e31de4ba3543acac8b1ce4af8 Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Fri, 19 Apr 2024 17:32:22 +0200
Subject: [PATCH 17/19] [refactor] Testing flush_batcher_and_update_counters

---
 limitador/src/storage/redis/redis_cached.rs | 88 ++++++++++++++++++---
 1 file changed, 79 insertions(+), 9 deletions(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 4c1b8ee0..41ee0f2c 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -14,6 +14,7 @@ use async_trait::async_trait;
 use redis::aio::{ConnectionLike, ConnectionManager};
 use redis::{ConnectionInfo, RedisError};
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant, SystemTime};
@@ -252,7 +253,7 @@
             flush_batcher_and_update_counters(
                 conn.clone(),
                 batcher_flusher.clone(),
-                storage.clone(),
+                storage.is_alive(),
                 cacher_clone.clone(),
                 p.clone(),
             )
@@ -428,15 +429,15 @@
     Ok(res)
 }
 
-async fn flush_batcher_and_update_counters(
-    mut conn: ConnectionManager,
+async fn flush_batcher_and_update_counters<C: ConnectionLike>(
+    mut redis_conn: C,
     batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
-    storage: AsyncRedisStorage,
+    storage_is_alive: impl Future<Output = bool>,
     cached_counters: Arc<CountersCache>,
     partitioned: Arc<AtomicBool>,
 ) {
     if partitioned.load(Ordering::Acquire) {
-        if storage.is_alive().await {
+        if storage_is_alive.await {
             warn!("Partition to Redis resolved!");
             partitioned.store(false, Ordering::Release);
         }
@@ -448,7 +449,7 @@
 
     let time_start_update_counters = Instant::now();
 
-    let updated_counters = update_counters(&mut conn, counters)
+    let updated_counters = update_counters(&mut redis_conn, counters)
         .await
         .or_else(|err| {
             if err.is_transient() {
@@ -480,11 +481,14 @@ mod tests {
     use crate::limit::Limit;
     use crate::storage::atomic_expiring_value::AtomicExpiringValue;
     use crate::storage::keys::{key_for_counter, key_for_counters_of_limit};
+    use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
+    use crate::storage::redis::redis_cached::{flush_batcher_and_update_counters, update_counters};
     use crate::storage::redis::CachedRedisStorage;
-    use crate::storage::redis::redis_cached::update_counters;
     use redis::{ErrorKind, Value};
     use redis_test::{MockCmd, MockRedisConnection};
     use std::collections::HashMap;
+    use std::sync::atomic::AtomicBool;
+    use std::sync::{Arc, Mutex};
     use std::time::{Duration, SystemTime};
 
     #[tokio::test]
@@ -535,8 +539,7 @@ mod tests {
             Ok(mock_response.clone()),
         )]);
 
-        let result =
-            update_counters(&mut mock_client, counters_and_deltas).await;
+        let result = update_counters(&mut mock_client, counters_and_deltas).await;
 
         assert!(result.is_ok());
@@ -548,4 +551,71 @@ mod tests {
         assert_eq!(10, v);
         assert_eq!(60, t);
     }
+
+    #[tokio::test]
+    async fn flush_batcher_and_update_counters_test() {
+        let counter = Counter::new(
+            Limit::new(
+                "test_namespace",
+                10,
+                60,
+                vec!["req.method == 'POST'"],
+                vec!["app_id"],
+            ),
+            Default::default(),
+        );
+
+        let mock_response = Value::Bulk(vec![Value::Int(8), Value::Int(60)]);
+
+        let mock_client = MockRedisConnection::new(vec![MockCmd::new(
+            redis::cmd("EVALSHA")
+                .arg("1e87383cf7dba2bd0f9972ed73671274e6cbd5da")
+                .arg("2")
+                .arg(key_for_counter(&counter))
+                .arg(key_for_counters_of_limit(counter.limit()))
+                .arg(60)
+                .arg(2),
+            Ok(mock_response.clone()),
+        )]);
+
+        let mut batched_counters = HashMap::new();
+        batched_counters.insert(
+            counter.clone(),
+            AtomicExpiringValue::new(2, SystemTime::now() + Duration::from_secs(60)),
+        );
+
+        let batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>> =
+            Arc::new(Mutex::new(batched_counters));
+        let cache = CountersCacheBuilder::new().build();
+        cache.insert(
+            counter.clone(),
+            Some(1),
+            10,
+            Duration::from_secs(0),
+            SystemTime::now(),
+        );
+        let cached_counters: Arc<CountersCache> = Arc::new(cache);
+        let partitioned = Arc::new(AtomicBool::new(false));
+
+        async fn future_true() -> bool {
+            true
+        }
+
+        if let Some(c) = cached_counters.get(&counter) {
+            assert_eq!(c.hits(&counter), 1);
+        }
+
+        flush_batcher_and_update_counters(
+            mock_client,
+            batcher,
+            future_true(),
+            cached_counters.clone(),
+            partitioned,
+        )
+        .await;
+
+        if let Some(c) = cached_counters.get(&counter) {
+            assert_eq!(c.hits(&counter), 8);
+        }
+    }
 }
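Replacing the AsyncRedisStorage parameter with `storage_is_alive: impl Future<Output = bool>` is what lets the test above run without any real storage: the caller decides what liveness means. Production passes `storage.is_alive()`, while a test can pass a local `async fn` as done above, or an already-resolved future. A sketch of the same seam in isolation; the `resolve_partition` helper is hypothetical:

    use std::future::{ready, Future};

    // The liveness check is injected by the caller, not hard-wired to Redis.
    async fn resolve_partition(is_alive: impl Future<Output = bool>) -> bool {
        is_alive.await
    }

    // In a test: resolve_partition(ready(true)).await evaluates to true.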
From 62908ca4ada08ad72c35da8a070175ae7dc71b8b Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Fri, 19 Apr 2024 12:41:26 -0400
Subject: [PATCH 18/19] Redis-test is merely a dev dependency

---
 limitador/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 87a2c772..2d6646c6 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -41,7 +41,6 @@ redis = { version = "0.25", optional = true, features = [
   "tls-native-tls",
   "tokio-native-tls-comp",
 ] }
-redis-test = { version = "0.4.0", features = ["aio"] }
 r2d2 = { version = "0.8", optional = true }
 tokio = { version = "1", optional = true, features = [
   "rt-multi-thread",
@@ -55,6 +54,7 @@ base64 = { version = "0.22", optional = true }
 [dev-dependencies]
 serial_test = "3.0"
 criterion = { version = "0.5.1", features = ["html_reports"] }
+redis-test = { version = "0.4.0", features = ["aio"] }
 paste = "1"
 rand = "0.8"
 tempfile = "3.5.0"

From 1edd4c4203f07651ebb8e40151bfd39a2028d7ad Mon Sep 17 00:00:00 2001
From: dd di cesare
Date: Mon, 22 Apr 2024 10:59:36 +0200
Subject: [PATCH 19/19] [refactor] More accurate naming to the cache of
 counters

---
 limitador/src/storage/redis/redis_cached.rs | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 41ee0f2c..de98ab06 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -19,7 +19,7 @@ use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant, SystemTime};
-use tracing::{error, trace_span, warn, Instrument};
+use tracing::{debug_span, error, warn, Instrument};
 
 // This is just a first version.
 //
@@ -233,9 +233,7 @@
             .ttl_ratio_cached_counter(ttl_ratio_cached_counters)
             .build();
 
-        let cacher = Arc::new(cached_counters);
-        let cacher_clone = cacher.clone();
-
+        let counters_cache = Arc::new(cached_counters);
         let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
@@ -244,6 +242,7 @@
 
         {
             let storage = async_redis_storage.clone();
+            let counters_cache_clone = counters_cache.clone();
             let conn = redis_conn_manager.clone();
             let p = Arc::clone(&partitioned);
             let batcher_flusher = batcher.clone();
@@ -254,7 +253,7 @@
                     conn.clone(),
                     batcher_flusher.clone(),
                     storage.is_alive(),
-                    cacher_clone.clone(),
+                    counters_cache_clone.clone(),
                     p.clone(),
                 )
                 .await;
@@ -264,7 +263,7 @@
         }
 
         Ok(Self {
-            cached_counters: cacher,
+            cached_counters: counters_cache,
             batcher_counter_updates: batcher,
             redis_conn_manager,
             async_redis_storage,
@@ -409,7 +408,7 @@
         }
     }
 
-    let span = trace_span!("datastore");
+    let span = debug_span!("datastore");
     // The redis crate is not working with tables, thus the response will be a Vec of counter values
    let script_res: Vec<i64> = script_invocation
        .invoke_async(redis_conn)