Skip to content

Commit

Permalink
Merge pull request #339 from Kuadrant/fix_partion_flush
Browse files Browse the repository at this point in the history
All the fixes
  • Loading branch information
alexsnaps authored May 22, 2024
2 parents b89185d + b277704 commit dfc5093
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 70 deletions.
17 changes: 12 additions & 5 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::{Notify, Semaphore};
use tracing::info;

#[derive(Debug)]
pub struct CachedCounterValue {
Expand Down Expand Up @@ -221,17 +222,23 @@ impl Batcher {
return result;
} else {
ready = select! {
_ = self.notifier.notified() => self.batch_ready(max),
_ = async {
loop {
self.notifier.notified().await;
if self.batch_ready(max) {
break;
}
}
} => {
info!("Priority flush!");
true
},
_ = tokio::time::sleep(self.interval) => true,
}
}
}
}

/// Reports whether the batcher currently holds no pending counter updates.
pub fn is_empty(&self) -> bool {
    let pending = &self.updates;
    pending.is_empty()
}

fn batch_ready(&self, size: usize) -> bool {
self.updates.len() >= size
|| self
Expand Down
14 changes: 1 addition & 13 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::histogram;
use redis::{AsyncCommands, RedisError};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::{debug_span, Instrument};

// Note: this implementation does not guarantee exact limits. Ensuring that we
Expand Down Expand Up @@ -209,17 +208,6 @@ impl AsyncRedisStorage {
Self { conn_manager }
}

/// Liveness probe against the backing Redis: issues a cheap `INCR` on a
/// dedicated key and reports whether it succeeded. On success, the observed
/// round-trip latency is recorded in the `liveness_latency` histogram.
pub async fn is_alive(&self) -> bool {
    let started = Instant::now();
    let reachable = self
        .conn_manager
        .clone()
        .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
        .await
        .is_ok();
    if reachable {
        // Only record latency for successful round trips, matching the
        // original `.then(...)` gating.
        histogram!("liveness_latency").record(started.elapsed().as_secs_f64());
    }
    reachable
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();

Expand Down
95 changes: 43 additions & 52 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug_span, error, warn, Instrument};
use tracing::{debug_span, error, info, warn, Instrument};

// This is just a first version.
//
Expand Down Expand Up @@ -189,15 +189,13 @@ impl CachedRedisStorage {
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

{
let storage = async_redis_storage.clone();
let counters_cache_clone = counters_cache.clone();
let conn = redis_conn_manager.clone();
let p = Arc::clone(&partitioned);
tokio::spawn(async move {
loop {
flush_batcher_and_update_counters(
conn.clone(),
storage.is_alive().await,
counters_cache_clone.clone(),
p.clone(),
batch_size,
Expand Down Expand Up @@ -339,48 +337,53 @@ async fn update_counters<C: ConnectionLike>(
#[tracing::instrument(skip_all)]
async fn flush_batcher_and_update_counters<C: ConnectionLike>(
mut redis_conn: C,
storage_is_alive: bool,
cached_counters: Arc<CountersCache>,
partitioned: Arc<AtomicBool>,
batch_size: usize,
) {
if partitioned.load(Ordering::Acquire) || !storage_is_alive {
if !cached_counters.batcher().is_empty() {
let updated_counters = cached_counters
.batcher()
.consume(batch_size, |counters| {
if !counters.is_empty() && !partitioned.load(Ordering::Acquire) {
info!("Flushing {} counter updates", counters.len());
}
update_counters(&mut redis_conn, counters)
})
.await
.map(|res| {
flip_partitioned(&partitioned, false);
}
} else {
let updated_counters = cached_counters
.batcher()
.consume(batch_size, |counters| {
update_counters(&mut redis_conn, counters)
})
.await
.or_else(|(data, err)| {
if err.is_transient() {
flip_partitioned(&partitioned, true);
let counters = data.len();
let mut reverted = 0;
for (counter, old_value, pending_writes, _) in data {
if cached_counters
.return_pending_writes(&counter, old_value, pending_writes)
.is_err()
{
error!("Couldn't revert writes back to {:?}", &counter);
} else {
reverted += 1;
}
res
})
.or_else(|(data, err)| {
if err.is_transient() {
let new_partition = flip_partitioned(&partitioned, true);
if new_partition {
warn!("Error flushing {}", err);
}
let counters = data.len();
let mut reverted = 0;
for (counter, old_value, pending_writes, _) in data {
if cached_counters
.return_pending_writes(&counter, old_value, pending_writes)
.is_err()
{
error!("Couldn't revert writes back to {:?}", &counter);
} else {
reverted += 1;
}
}
if new_partition {
warn!("Reverted {} of {} counter increments", reverted, counters);
Ok(Vec::new())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");
Ok(Vec::new())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");

for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl);
}
for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl);
}
}

Expand Down Expand Up @@ -529,14 +532,8 @@ mod tests {
assert_eq!(c.hits(&counter), 2);
}

flush_batcher_and_update_counters(
mock_client,
true,
cached_counters.clone(),
partitioned,
100,
)
.await;
flush_batcher_and_update_counters(mock_client, cached_counters.clone(), partitioned, 100)
.await;

let c = cached_counters.get(&counter).unwrap();
assert_eq!(c.hits(&counter), 8);
Expand Down Expand Up @@ -581,14 +578,8 @@ mod tests {
assert_eq!(c.hits(&counter), 5);
}

flush_batcher_and_update_counters(
mock_client,
true,
cached_counters.clone(),
partitioned,
100,
)
.await;
flush_batcher_and_update_counters(mock_client, cached_counters.clone(), partitioned, 100)
.await;

let c = cached_counters.get(&counter).unwrap();
assert_eq!(c.hits(&counter), 5);
Expand Down

0 comments on commit dfc5093

Please sign in to comment.