diff --git a/caching/src/local/caching_local.rs b/caching/src/local/caching_local.rs index d2ae1be..507e511 100644 --- a/caching/src/local/caching_local.rs +++ b/caching/src/local/caching_local.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::Mutex; +use tokio::task; use novax::caching::{CachingDurationStrategy, CachingStrategy}; use novax::errors::CachingError; @@ -18,7 +19,9 @@ use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration}; pub struct CachingLocal { duration_strategy: CachingDurationStrategy, value_map: Arc>>>, - expiration_timestamp_map: Arc>> + expiration_timestamp_map: Arc>>, + cleanup_interval: Arc>, + is_cleanup_process_started: Arc>, } impl CachingLocal { @@ -26,10 +29,28 @@ impl CachingLocal { CachingLocal { duration_strategy, value_map: Arc::new(Mutex::new(HashMap::new())), - expiration_timestamp_map: Arc::new(Mutex::new(HashMap::new())) + expiration_timestamp_map: Arc::new(Mutex::new(HashMap::new())), + cleanup_interval: Arc::new(Mutex::new(Duration::from_secs(0))), + is_cleanup_process_started: Arc::new(Mutex::new(false)), } } + pub async fn empty_with_auto_cleanup( + duration_strategy: CachingDurationStrategy, + cleanup_interval: Duration + ) -> Result { + let caching = CachingLocal::empty(duration_strategy); + + { + let mut locked = caching.cleanup_interval.lock().await; + *locked = cleanup_interval; + } + + caching.start_cleanup_process_if_needed().await; + + Ok(caching) + } + async fn remove_key(&self, key: u64) { let _ = self.expiration_timestamp_map.lock().await.remove(&key); let _ = self.value_map.lock().await.remove(&key); @@ -49,6 +70,65 @@ impl CachingLocal { Ok(()) } + + /// Set the cleanup duration for self and all the cloned instances. + pub async fn set_cleanup_interval(&mut self, interval: Duration) { + let mut locked = self.cleanup_interval.lock().await; + + *locked = interval; + } + + async fn start_cleanup_process_if_needed(&self) { + { + let mut locked = self.is_cleanup_process_started.lock().await; + + if *locked { + return; + } + + *locked = true; + } + + let self_value = self.clone(); + + task::spawn(async move { + loop { + let duration = { + let locked = self_value.cleanup_interval.lock().await; + + *locked + }; + + let wait_duration = if duration.is_zero() { + Duration::from_secs(10) + } else { + self_value.perform_cleanup().await?; + + duration + }; + + tokio::time::sleep(wait_duration).await + } + + #[allow(unreachable_code)] + Ok::<_, NovaXError>(()) + }); + } + + async fn perform_cleanup(&self) -> Result<(), NovaXError> { + let current_timestamp = get_current_timestamp()?; + let mut value_map_locked = self.value_map.lock().await; + let mut expiration_map_locked = self.expiration_timestamp_map.lock().await; + + for (key, duration) in expiration_map_locked.clone().into_iter() { + if current_timestamp > duration { + value_map_locked.remove(&key); + expiration_map_locked.remove(&key); + } + } + + Ok(()) + } } #[async_trait] @@ -98,7 +178,9 @@ impl CachingStrategy for CachingLocal { CachingLocal { duration_strategy: strategy, value_map: self.value_map.clone(), - expiration_timestamp_map: self.expiration_timestamp_map.clone() + expiration_timestamp_map: self.expiration_timestamp_map.clone(), + cleanup_interval: self.cleanup_interval.clone(), + is_cleanup_process_started: self.is_cleanup_process_started.clone() } } } @@ -310,4 +392,88 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_perform_cleanup_before_expiration() -> Result<(), NovaXError> { + let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10))); + let key = 1; + let value = "test".to_string(); + + caching.set_cache(key, &value).await?; + + set_mock_time(Duration::from_secs(10)); + + caching.perform_cleanup().await?; + + let value_map_locked = caching.value_map.lock().await; + let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await; + + assert_eq!(value_map_locked.len(), 1); + assert_eq!(expiration_timestamp_locked.len(), 1); + + Ok(()) + } + + #[tokio::test] + async fn test_perform_cleanup_after_expiration() -> Result<(), NovaXError> { + let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10))); + let key = 1; + let value = "test".to_string(); + + caching.set_cache(key, &value).await?; + + set_mock_time(Duration::from_secs(11)); + + caching.perform_cleanup().await?; + + let value_map_locked = caching.value_map.lock().await; + let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await; + + assert!(value_map_locked.is_empty()); + assert!(expiration_timestamp_locked.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn test_perform_cleanup_one_before_and_after_expiration() -> Result<(), NovaXError> { + let caching = CachingLocal::empty(CachingDurationStrategy::Duration(Duration::from_secs(10))); + let key_long_duration = 1; + let value_long_duration = "test1".to_string(); + + caching + .with_duration_strategy(CachingDurationStrategy::Duration(Duration::from_secs(100))) + .set_cache(key_long_duration, &value_long_duration) + .await?; + + let key_short_duration = 2; + let value_short_duration = "test2".to_string(); + + caching + .with_duration_strategy(CachingDurationStrategy::Duration(Duration::from_secs(10))) + .set_cache(key_short_duration, &value_short_duration) + .await?; + + set_mock_time(Duration::from_secs(11)); + + { + let value_map_locked = caching.value_map.lock().await; + let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await; + + assert_eq!(value_map_locked.len(), 2); + assert_eq!(expiration_timestamp_locked.len(), 2); + } + + caching.perform_cleanup().await?; + + { + let value_map_locked = caching.value_map.lock().await; + let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await; + + assert_eq!(value_map_locked.len(), 1); + assert_eq!(expiration_timestamp_locked.len(), 1); + } + + Ok(()) + } } \ No newline at end of file