Skip to content

Commit

Permalink
Merge pull request #99
Browse files Browse the repository at this point in the history
Added auto cleanup on CachingLocal
  • Loading branch information
gfusee authored Oct 22, 2024
2 parents ed3f74c + 63b77d2 commit 219bd00
Showing 1 changed file with 169 additions and 3 deletions.
172 changes: 169 additions & 3 deletions caching/src/local/caching_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::Mutex;
use tokio::task;

use novax::caching::{CachingDurationStrategy, CachingStrategy};
use novax::errors::CachingError;
Expand All @@ -18,18 +19,38 @@ use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration};
pub struct CachingLocal {
duration_strategy: CachingDurationStrategy,
value_map: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
expiration_timestamp_map: Arc<Mutex<HashMap<u64, Duration>>>
expiration_timestamp_map: Arc<Mutex<HashMap<u64, Duration>>>,
cleanup_interval: Arc<Mutex<Duration>>,
is_cleanup_process_started: Arc<Mutex<bool>>,
}

impl CachingLocal {
pub fn empty(duration_strategy: CachingDurationStrategy) -> CachingLocal {
CachingLocal {
duration_strategy,
value_map: Arc::new(Mutex::new(HashMap::new())),
expiration_timestamp_map: Arc::new(Mutex::new(HashMap::new()))
expiration_timestamp_map: Arc::new(Mutex::new(HashMap::new())),
cleanup_interval: Arc::new(Mutex::new(Duration::from_secs(0))),
is_cleanup_process_started: Arc::new(Mutex::new(false)),
}
}

pub async fn empty_with_auto_cleanup(
duration_strategy: CachingDurationStrategy,
cleanup_interval: Duration
) -> Result<CachingLocal, NovaXError> {
let caching = CachingLocal::empty(duration_strategy);

{
let mut locked = caching.cleanup_interval.lock().await;
*locked = cleanup_interval;
}

caching.start_cleanup_process_if_needed().await;

Ok(caching)
}

async fn remove_key(&self, key: u64) {
let _ = self.expiration_timestamp_map.lock().await.remove(&key);
let _ = self.value_map.lock().await.remove(&key);
Expand All @@ -49,6 +70,65 @@ impl CachingLocal {

Ok(())
}

/// Set the cleanup duration for self and all the cloned instances.
pub async fn set_cleanup_interval(&mut self, interval: Duration) {
let mut locked = self.cleanup_interval.lock().await;

*locked = interval;
}

async fn start_cleanup_process_if_needed(&self) {
{
let mut locked = self.is_cleanup_process_started.lock().await;

if *locked {
return;
}

*locked = true;
}

let self_value = self.clone();

task::spawn(async move {
loop {
let duration = {
let locked = self_value.cleanup_interval.lock().await;

*locked
};

let wait_duration = if duration.is_zero() {
Duration::from_secs(10)
} else {
self_value.perform_cleanup().await?;

duration
};

tokio::time::sleep(wait_duration).await
}

#[allow(unreachable_code)]
Ok::<_, NovaXError>(())
});
}

async fn perform_cleanup(&self) -> Result<(), NovaXError> {
let current_timestamp = get_current_timestamp()?;
let mut value_map_locked = self.value_map.lock().await;
let mut expiration_map_locked = self.expiration_timestamp_map.lock().await;

for (key, duration) in expiration_map_locked.clone().into_iter() {
if current_timestamp > duration {
value_map_locked.remove(&key);
expiration_map_locked.remove(&key);
}
}

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -98,7 +178,9 @@ impl CachingStrategy for CachingLocal {
CachingLocal {
duration_strategy: strategy,
value_map: self.value_map.clone(),
expiration_timestamp_map: self.expiration_timestamp_map.clone()
expiration_timestamp_map: self.expiration_timestamp_map.clone(),
cleanup_interval: self.cleanup_interval.clone(),
is_cleanup_process_started: self.is_cleanup_process_started.clone()
}
}
}
Expand Down Expand Up @@ -310,4 +392,88 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_perform_cleanup_before_expiration() -> Result<(), NovaXError> {
let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10)));
let key = 1;
let value = "test".to_string();

caching.set_cache(key, &value).await?;

set_mock_time(Duration::from_secs(10));

caching.perform_cleanup().await?;

let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;

assert_eq!(value_map_locked.len(), 1);
assert_eq!(expiration_timestamp_locked.len(), 1);

Ok(())
}

#[tokio::test]
async fn test_perform_cleanup_after_expiration() -> Result<(), NovaXError> {
let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10)));
let key = 1;
let value = "test".to_string();

caching.set_cache(key, &value).await?;

set_mock_time(Duration::from_secs(11));

caching.perform_cleanup().await?;

let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;

assert!(value_map_locked.is_empty());
assert!(expiration_timestamp_locked.is_empty());

Ok(())
}

#[tokio::test]
async fn test_perform_cleanup_one_before_and_after_expiration() -> Result<(), NovaXError> {
let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10)));
let key_long_duration = 1;
let value_long_duration = "test1".to_string();

caching
.with_duration_strategy(CachingDurationStrategy::Duration(Duration::from_secs(100)))
.set_cache(key_long_duration, &value_long_duration)
.await?;

let key_short_duration = 2;
let value_short_duration = "test2".to_string();

caching
.with_duration_strategy(CachingDurationStrategy::Duration(Duration::from_secs(10)))
.set_cache(key_short_duration, &value_short_duration)
.await?;

set_mock_time(Duration::from_secs(11));

{
let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;

assert_eq!(value_map_locked.len(), 2);
assert_eq!(expiration_timestamp_locked.len(), 2);
}

caching.perform_cleanup().await?;

{
let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;

assert_eq!(value_map_locked.len(), 1);
assert_eq!(expiration_timestamp_locked.len(), 1);
}

Ok(())
}
}

0 comments on commit 219bd00

Please sign in to comment.