Skip to content

Commit

Permalink
All the fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 21, 2024
1 parent 5368e72 commit 3426478
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
22 changes: 16 additions & 6 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::{Notify, Semaphore};
use tracing::info;

#[derive(Debug)]
pub struct CachedCounterValue {
Expand Down Expand Up @@ -203,17 +204,26 @@ impl Batcher {
return result;
} else {
ready = select! {
_ = self.notifier.notified() => self.batch_ready(max),
_ = tokio::time::sleep(self.interval) => true,
_ = async {
loop {
self.notifier.notified().await;
if self.batch_ready(max) {
break;
}
}
} => {
info!("Priority flush!");
true
},
_ = tokio::time::sleep(self.interval) => {
// info!("Time limit hit!");
true
},
}
}
}
}

/// Returns `true` when the batcher's `updates` collection holds no
/// entries — i.e. there is currently nothing queued.
// NOTE(review): delegates directly to the inner collection's `is_empty`;
// presumably used by the flush loop to skip empty flush cycles — confirm at call sites.
pub fn is_empty(&self) -> bool {
    self.updates.is_empty()
}

fn batch_ready(&self, size: usize) -> bool {
self.updates.len() >= size
|| self
Expand Down
10 changes: 9 additions & 1 deletion limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use redis::{AsyncCommands, RedisError};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};
use tracing::{debug_span, Instrument};
use tracing::{debug_span, warn, Instrument};

// Note: this implementation does not guarantee exact limits. Ensuring that we
// never go over the limits would hurt performance. This implementation
Expand Down Expand Up @@ -215,6 +215,14 @@ impl AsyncRedisStorage {
.clone()
.incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
.await
.map_err(|err| {
let err = <RedisError as Into<StorageErr>>::into(err);
if !err.is_transient() {
panic!("Unrecoverable Redis error: {}", err);
}
warn!("Live check failure: {}", err);
err
})
.is_ok()
.then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64()))
.is_some()
Expand Down
52 changes: 27 additions & 25 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use redis::aio::{ConnectionLike, ConnectionManager};
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::atomic::Ordering::Acquire;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug_span, error, warn, Instrument};
use tracing::{debug_span, error, info, warn, Instrument};

// This is just a first version.
//
Expand Down Expand Up @@ -189,15 +190,13 @@ impl CachedRedisStorage {
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

{
let storage = async_redis_storage.clone();
let counters_cache_clone = counters_cache.clone();
let conn = redis_conn_manager.clone();
let p = Arc::clone(&partitioned);
tokio::spawn(async move {
loop {
flush_batcher_and_update_counters(
conn.clone(),
storage.is_alive().await,
counters_cache_clone.clone(),
p.clone(),
batch_size,
Expand Down Expand Up @@ -333,35 +332,38 @@ async fn update_counters<C: ConnectionLike>(
#[tracing::instrument(skip_all)]
async fn flush_batcher_and_update_counters<C: ConnectionLike>(
mut redis_conn: C,
storage_is_alive: bool,
cached_counters: Arc<CountersCache>,
partitioned: Arc<AtomicBool>,
batch_size: usize,
) {
if partitioned.load(Ordering::Acquire) || !storage_is_alive {
if !cached_counters.batcher().is_empty() {
let updated_counters = cached_counters
.batcher()
.consume(batch_size, |counters| {
if !counters.is_empty() && !partitioned.load(Acquire) {
info!("Flushing {} counter updates", counters.len());
}
update_counters(&mut redis_conn, counters)
})
.await
.map(|res| {
// info!("Success {} counters", res.len());
flip_partitioned(&partitioned, false);
}
} else {
let updated_counters = cached_counters
.batcher()
.consume(batch_size, |counters| {
update_counters(&mut redis_conn, counters)
})
.await
.or_else(|err| {
if err.is_transient() {
flip_partitioned(&partitioned, true);
Ok(Vec::new())
} else {
Err(err)
res
})
.or_else(|err| {
if err.is_transient() {
if flip_partitioned(&partitioned, true) {
warn!("Error flushing {}", err);
}
})
.expect("Unrecoverable Redis error!");
Ok(Vec::new())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");

for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl);
}
for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl);
}
}

Expand Down

0 comments on commit 3426478

Please sign in to comment.