From 47513390ba0298656868ca7b6e14e58a2c91505c Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 21 May 2024 11:44:27 -0400 Subject: [PATCH 1/6] All the fixes --- limitador/src/storage/redis/counters_cache.rs | 22 ++++-- limitador/src/storage/redis/redis_async.rs | 10 ++- limitador/src/storage/redis/redis_cached.rs | 77 ++++++++++--------- 3 files changed, 64 insertions(+), 45 deletions(-) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 67591f44..ee1c23f5 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::select; use tokio::sync::{Notify, Semaphore}; +use tracing::info; #[derive(Debug)] pub struct CachedCounterValue { @@ -221,17 +222,26 @@ impl Batcher { return result; } else { ready = select! { - _ = self.notifier.notified() => self.batch_ready(max), - _ = tokio::time::sleep(self.interval) => true, + _ = async { + loop { + self.notifier.notified().await; + if self.batch_ready(max) { + break; + } + } + } => { + info!("Priority flush!"); + true + }, + _ = tokio::time::sleep(self.interval) => { + // info!("Time limit hit!"); + true + }, } } } } - pub fn is_empty(&self) -> bool { - self.updates.is_empty() - } - fn batch_ready(&self, size: usize) -> bool { self.updates.len() >= size || self diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index d77a16bd..c8adf009 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -14,7 +14,7 @@ use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; use std::time::{Duration, Instant}; -use tracing::{debug_span, Instrument}; +use tracing::{debug_span, warn, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. This implementation @@ -215,6 +215,14 @@ impl AsyncRedisStorage { .clone() .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1) .await + .map_err(|err| { + let err = >::into(err); + if !err.is_transient() { + panic!("Unrecoverable Redis error: {}", err); + } + warn!("Live check failure: {}", err); + err + }) .is_ok() .then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64())) .is_some() diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 1a5e036b..d409bcf3 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -17,10 +17,11 @@ use redis::aio::{ConnectionLike, ConnectionManager}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; +use std::sync::atomic::Ordering::Acquire; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug_span, error, warn, Instrument}; +use tracing::{debug_span, error, info, warn, Instrument}; // This is just a first version. // @@ -189,7 +190,6 @@ impl CachedRedisStorage { AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); { - let storage = async_redis_storage.clone(); let counters_cache_clone = counters_cache.clone(); let conn = redis_conn_manager.clone(); let p = Arc::clone(&partitioned); @@ -197,7 +197,6 @@ impl CachedRedisStorage { loop { flush_batcher_and_update_counters( conn.clone(), - storage.is_alive().await, counters_cache_clone.clone(), p.clone(), batch_size, @@ -339,48 +338,51 @@ async fn update_counters( #[tracing::instrument(skip_all)] async fn flush_batcher_and_update_counters( mut redis_conn: C, - storage_is_alive: bool, cached_counters: Arc, partitioned: Arc, batch_size: usize, ) { - if partitioned.load(Ordering::Acquire) || !storage_is_alive { - if !cached_counters.batcher().is_empty() { + let updated_counters = cached_counters + .batcher() + .consume(batch_size, |counters| { + if !counters.is_empty() && !partitioned.load(Acquire) { + info!("Flushing {} counter updates", counters.len()); + } + update_counters(&mut redis_conn, counters) + }) + .await + .map(|res| { + // info!("Success {} counters", res.len()); flip_partitioned(&partitioned, false); - } - } else { - let updated_counters = cached_counters - .batcher() - .consume(batch_size, |counters| { - update_counters(&mut redis_conn, counters) - }) - .await - .or_else(|(data, err)| { - if err.is_transient() { - flip_partitioned(&partitioned, true); - let counters = data.len(); - let mut reverted = 0; - for (counter, old_value, pending_writes, _) in data { - if cached_counters - .return_pending_writes(&counter, old_value, pending_writes) - .is_err() - { - error!("Couldn't revert writes back to {:?}", &counter); - } else { - reverted += 1; - } + res + }) + .or_else(|(data, err)| { + if err.is_transient() { + if flip_partitioned(&partitioned, true) { + warn!("Error flushing {}", err); + } + let counters = data.len(); + let mut reverted = 0; + for (counter, old_value, pending_writes, _) in data { + if cached_counters + .return_pending_writes(&counter, old_value, pending_writes) + .is_err() + { + error!("Couldn't revert writes back to {:?}", &counter); + } else { + reverted += 1; } - warn!("Reverted {} of {} counter increments", reverted, counters); - Ok(Vec::new()) - } else { - Err(err) } - }) - .expect("Unrecoverable Redis error!"); + warn!("Reverted {} of {} counter increments", reverted, counters); + Ok(Vec::new()) + } else { + Err(err) + } + }) + .expect("Unrecoverable Redis error!"); - for (counter, new_value, remote_deltas, ttl) in updated_counters { - cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl); - } + for (counter, new_value, remote_deltas, ttl) in updated_counters { + cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl); } } @@ -531,7 +533,6 @@ mod tests { flush_batcher_and_update_counters( mock_client, - true, cached_counters.clone(), partitioned, 100, From fc2e7ef572a48e6736e9e467dbd78100b1ccdc82 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 21 May 2024 11:51:18 -0400 Subject: [PATCH 2/6] Clean up --- limitador/src/storage/redis/counters_cache.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index ee1c23f5..ef924d92 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -233,10 +233,7 @@ impl Batcher { info!("Priority flush!"); true }, - _ = tokio::time::sleep(self.interval) => { - // info!("Time limit hit!"); - true - }, + _ = tokio::time::sleep(self.interval) => true, } } } From bc2abfb166ccd54c7b670714b3f438f03b8312e9 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 21 May 2024 11:55:31 -0400 Subject: [PATCH 3/6] Fix tests --- limitador/src/storage/redis/redis_cached.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index d409bcf3..a2bd009e 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -582,14 +582,8 @@ mod tests { assert_eq!(c.hits(&counter), 5); } - flush_batcher_and_update_counters( - mock_client, - true, - cached_counters.clone(), - partitioned, - 100, - ) - .await; + flush_batcher_and_update_counters(mock_client, cached_counters.clone(), partitioned, 100) + .await; let c = cached_counters.get(&counter).unwrap(); assert_eq!(c.hits(&counter), 5); From 45b191efee6ccbe6798782ab2825331db19fd16b Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 21 May 2024 12:42:13 -0400 Subject: [PATCH 4/6] Moar fixes --- limitador/src/storage/redis/redis_cached.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index a2bd009e..0014ba88 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -531,13 +531,8 @@ mod tests { assert_eq!(c.hits(&counter), 2); } - flush_batcher_and_update_counters( - mock_client, - cached_counters.clone(), - partitioned, - 100, - ) - .await; + flush_batcher_and_update_counters(mock_client, cached_counters.clone(), partitioned, 100) + .await; let c = cached_counters.get(&counter).unwrap(); assert_eq!(c.hits(&counter), 8); From 6f90be6c37a3b62bee4c0fed46bccb098caee8ee Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 21 May 2024 13:04:14 -0400 Subject: [PATCH 5/6] Avoid polluting the logs and cleanups --- limitador/src/storage/redis/redis_cached.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 0014ba88..736927b8 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -17,7 +17,6 @@ use redis::aio::{ConnectionLike, ConnectionManager}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; -use std::sync::atomic::Ordering::Acquire; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -345,20 +344,20 @@ async fn flush_batcher_and_update_counters( let updated_counters = cached_counters .batcher() .consume(batch_size, |counters| { - if !counters.is_empty() && !partitioned.load(Acquire) { + if !counters.is_empty() && !partitioned.load(Ordering::Acquire) { info!("Flushing {} counter updates", counters.len()); } update_counters(&mut redis_conn, counters) }) .await .map(|res| { - // info!("Success {} counters", res.len()); flip_partitioned(&partitioned, false); res }) .or_else(|(data, err)| { if err.is_transient() { - if flip_partitioned(&partitioned, true) { + let new_partition = flip_partitioned(&partitioned, true); + if new_partition { warn!("Error flushing {}", err); } let counters = data.len(); @@ -373,7 +372,9 @@ async fn flush_batcher_and_update_counters( reverted += 1; } } - warn!("Reverted {} of {} counter increments", reverted, counters); + if new_partition { + warn!("Reverted {} of {} counter increments", reverted, counters); + } Ok(Vec::new()) } else { Err(err) From b27770456a374e003aea6ef5ad0178cde0c241ea Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 22 May 2024 08:19:00 -0400 Subject: [PATCH 6/6] Remove is_alive and liveness_latency histogram --- limitador/src/storage/redis/redis_async.rs | 24 ++-------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index c8adf009..c7df1d0c 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -9,12 +9,11 @@ use crate::storage::redis::is_limited; use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS}; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; -use metrics::histogram; use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; -use std::time::{Duration, Instant}; -use tracing::{debug_span, warn, Instrument}; +use std::time::Duration; +use tracing::{debug_span, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. This implementation @@ -209,25 +208,6 @@ impl AsyncRedisStorage { Self { conn_manager } } - pub async fn is_alive(&self) -> bool { - let now = Instant::now(); - self.conn_manager - .clone() - .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1) - .await - .map_err(|err| { - let err = >::into(err); - if !err.is_transient() { - panic!("Unrecoverable Redis error: {}", err); - } - warn!("Live check failure: {}", err); - err - }) - .is_ok() - .then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64())) - .is_some() - } - async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone();