Skip to content

Commit

Permalink
Merge pull request #276 from Kuadrant/write-behind-lock
Browse files Browse the repository at this point in the history
Write behind lock
  • Loading branch information
alexsnaps authored Mar 26, 2024
2 parents ada0199 + eab1166 commit b86149b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 62 deletions.
44 changes: 0 additions & 44 deletions limitador/src/storage/redis/batcher.rs

This file was deleted.

1 change: 0 additions & 1 deletion limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use ::redis::RedisError;
use std::time::Duration;

mod batcher;
mod counters_cache;
mod redis_async;
mod redis_cached;
Expand Down
43 changes: 26 additions & 17 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::keys::*;
use crate::storage::redis::batcher::Batcher;
use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
use crate::storage::redis::redis_async::AsyncRedisStorage;
use crate::storage::redis::scripts::VALUES_AND_TTLS;
Expand All @@ -14,11 +13,10 @@ use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{ConnectionInfo, RedisError};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

// This is just a first version.
//
Expand All @@ -40,7 +38,7 @@ use tokio::sync::Mutex;

pub struct CachedRedisStorage {
cached_counters: Mutex<CountersCache>,
batcher_counter_updates: Arc<Mutex<Batcher>>,
batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
batching_is_enabled: bool,
Expand Down Expand Up @@ -81,7 +79,7 @@ impl AsyncCounterStorage for CachedRedisStorage {

// Check cached counters
{
let cached_counters = self.cached_counters.lock().await;
let cached_counters = self.cached_counters.lock().unwrap();
for counter in counters.iter_mut() {
match cached_counters.get(counter) {
Some(val) => {
Expand Down Expand Up @@ -122,7 +120,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
Duration::from_millis((Instant::now() - time_start_get_ttl).as_millis() as u64);

{
let mut cached_counters = self.cached_counters.lock().await;
let mut cached_counters = self.cached_counters.lock().unwrap();
for (i, counter) in not_cached.iter_mut().enumerate() {
cached_counters.insert(
counter.clone(),
Expand Down Expand Up @@ -150,17 +148,24 @@ impl AsyncCounterStorage for CachedRedisStorage {

// Update cached values
{
let mut cached_counters = self.cached_counters.lock().await;
let mut cached_counters = self.cached_counters.lock().unwrap();
for counter in counters.iter() {
cached_counters.decrease_by(counter, delta);
}
}

// Batch or update depending on configuration
if self.batching_is_enabled {
let batcher = self.batcher_counter_updates.lock().await;
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
batcher.add_counter(counter, delta).await
match batcher.get_mut(counter) {
Some(val) => {
*val += delta;
}
None => {
batcher.insert(counter.clone(), delta);
}
}
}
} else {
for counter in counters.iter() {
Expand Down Expand Up @@ -216,17 +221,21 @@ impl CachedRedisStorage {
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

let batcher = Arc::new(Mutex::new(Batcher::new(async_redis_storage.clone())));
let storage = async_redis_storage.clone();
let batcher = Arc::new(Mutex::new(Default::default()));
if let Some(flushing_period) = flushing_period {
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
let time_start = Instant::now();
batcher_flusher.lock().await.flush().await;
let sleep_time = flushing_period
.checked_sub(time_start.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
tokio::time::sleep(sleep_time).await;
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage.update_counter(&counter, delta).await.unwrap();
}
interval.tick().await;
}
});
}
Expand Down

0 comments on commit b86149b

Please sign in to comment.