Skip to content

Commit

Permalink
Merge pull request #103
Browse files Browse the repository at this point in the history
Turned Mutex as generics in CachingLocal
  • Loading branch information
gfusee authored Oct 24, 2024
2 parents b05b4cc + 6becc5c commit b8cb159
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 64 deletions.
1 change: 1 addition & 0 deletions caching/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub(crate) mod date;
pub mod multi;
pub mod locked;
pub mod redis;
pub mod utils;
120 changes: 88 additions & 32 deletions caching/src/local/caching_local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -14,17 +15,67 @@ use novax::errors::CachingError;
use novax::errors::NovaXError;

use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration};
use crate::utils::lock::MutexLike;

#[derive(Clone, Debug)]
pub struct CachingLocal {
pub type CachingLocal = BaseCachingLocal<Mutex<HashMap<u64, Vec<u8>>>, Mutex<HashMap<u64, Duration>>, Mutex<Duration>, Mutex<bool>>;

pub struct BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
{
duration_strategy: CachingDurationStrategy,
value_map: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
expiration_timestamp_map: Arc<Mutex<HashMap<u64, Duration>>>,
cleanup_interval: Arc<Mutex<Duration>>,
is_cleanup_process_started: Arc<Mutex<bool>>,
value_map: Arc<MutexValue>,
expiration_timestamp_map: Arc<MutexExpiration>,
cleanup_interval: Arc<MutexCleanupInterval>,
is_cleanup_process_started: Arc<MutexIsCleanupProcessStarted>,
}

impl CachingLocal {
impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> Clone for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
{
fn clone(&self) -> Self {
Self {
duration_strategy: self.duration_strategy.clone(),
value_map: self.value_map.clone(),
expiration_timestamp_map: self.expiration_timestamp_map.clone(),
cleanup_interval: self.cleanup_interval.clone(),
is_cleanup_process_started: self.is_cleanup_process_started.clone(),
}
}
}

impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> Debug for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BaseCachingLocal")
.field("duration_strategy", &self.duration_strategy)
.field("value_map", &self.value_map)
.field("expiration_timestamp_map", &self.expiration_timestamp_map)
.field("cleanup_interval", &self.cleanup_interval)
.field("is_cleanup_process_started", &self.is_cleanup_process_started)
.finish()
}
}

impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
{
pub fn empty(duration_strategy: CachingDurationStrategy) -> CachingLocal {
CachingLocal {
duration_strategy,
Expand All @@ -35,32 +86,11 @@ impl CachingLocal {
}
}

pub async fn empty_with_auto_cleanup(
duration_strategy: CachingDurationStrategy,
cleanup_interval: Duration
) -> Result<CachingLocal, NovaXError> {
let caching = CachingLocal::empty(duration_strategy);

{
let mut locked = caching.cleanup_interval.lock().await;
*locked = cleanup_interval;
}

caching.start_cleanup_process_if_needed().await;

Ok(caching)
}

async fn remove_key(&self, key: u64) {
let _ = self.expiration_timestamp_map.lock().await.remove(&key);
let _ = self.value_map.lock().await.remove(&key);
}

async fn clear(&self) {
self.expiration_timestamp_map.lock().await.clear();
self.value_map.lock().await.clear();
}

async fn set_value<T: Serialize + DeserializeOwned>(&self, key: u64, value: &T) -> Result<(), NovaXError> {
let expiration_timestamp = self.duration_strategy.get_duration_timestamp(&get_current_timestamp()?)?;
self.expiration_timestamp_map.lock().await.insert(key, expiration_timestamp);
Expand All @@ -77,6 +107,25 @@ impl CachingLocal {

*locked = interval;
}
}

impl CachingLocal
{
pub async fn empty_with_auto_cleanup(
duration_strategy: CachingDurationStrategy,
cleanup_interval: Duration
) -> Result<CachingLocal, NovaXError> {
let caching = CachingLocal::empty(duration_strategy);

{
let mut locked = caching.cleanup_interval.lock().await;
*locked = cleanup_interval;
}

caching.start_cleanup_process_if_needed().await;

Ok(caching)
}

async fn start_cleanup_process_if_needed(&self) {
{
Expand Down Expand Up @@ -132,7 +181,13 @@ impl CachingLocal {
}

#[async_trait]
impl CachingStrategy for CachingLocal {
impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> CachingStrategy for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
{
async fn get_cache<T: Serialize + DeserializeOwned + Send>(&self, key: u64) -> Result<Option<T>, NovaXError> {
let Some(expiration_timestamp) = self.expiration_timestamp_map.lock().await.get(&key).cloned() else { return Ok(None) };

Expand Down Expand Up @@ -169,13 +224,14 @@ impl CachingStrategy for CachingLocal {
}

async fn clear(&self) -> Result<(), NovaXError> {
self.clear().await;
self.expiration_timestamp_map.lock().await.clear();
self.value_map.lock().await.clear();

Ok(())
}

fn with_duration_strategy(&self, strategy: CachingDurationStrategy) -> Self {
CachingLocal {
Self {
duration_strategy: strategy,
value_map: self.value_map.clone(),
expiration_timestamp_map: self.expiration_timestamp_map.clone(),
Expand Down Expand Up @@ -385,7 +441,7 @@ mod test {

caching.set_cache(1, &"test".to_string()).await?;
caching.set_cache(2, &"test2".to_string()).await?;
caching.clear().await;
caching.clear().await?;

assert!(caching.value_map.lock().await.is_empty());
assert!(caching.expiration_timestamp_map.lock().await.is_empty());
Expand Down
90 changes: 58 additions & 32 deletions caching/src/locked/caching.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,84 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::{Mutex, RwLock};

use crate::utils::lock::{Locker, MutexLike};
use novax::caching::{CachingDurationStrategy, CachingStrategy};
use novax::errors::NovaXError;

#[allow(type_alias_bounds)]
pub type CachingLocked<C: CachingStrategy> = BaseCachingLocked<C, Arc<RwLock<()>>>;

#[async_trait]
pub trait Locker: Send + Sync + Clone + Debug {
fn new() -> Self;
async fn read(&self) -> RwLockReadGuard<'_, ()>;
async fn write(&self) -> RwLockWriteGuard<'_, ()>;
pub type CachingLocked<C: CachingStrategy> = BaseCachingLocked<C, RwLock<()>, Mutex<HashMap<u64, Arc<RwLock<()>>>>>;

pub struct BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
pub caching: C,
_lockers_map: Arc<M>
}

#[async_trait]
impl Locker for Arc<RwLock<()>> {
fn new() -> Self {
Self::new(RwLock::new(()))
}

async fn read(&self) -> RwLockReadGuard<'_, ()> {
self.as_ref().read().await
}

async fn write(&self) -> RwLockWriteGuard<'_, ()> {
self.as_ref().write().await
impl<C, L, M> Clone for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
fn clone(&self) -> Self {
Self {
caching: self.caching.clone(),
_lockers_map: self._lockers_map.clone(),
}
}
}

#[derive(Clone, Debug)]
pub struct BaseCachingLocked<C: CachingStrategy, L: Locker> {
pub caching: C,
_lockers_map: Arc<Mutex<HashMap<u64, L>>>
impl<C, L, M> Debug for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BaseCachingLocked")
.field("caching", &self.caching)
.field("_lockers_map", &self._lockers_map)
.finish()
}
}

impl<C: CachingStrategy, L: Locker> BaseCachingLocked<C, L> {
pub fn new(caching: C) -> BaseCachingLocked<C, L> {
impl<C, L, M> BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
pub fn new(caching: C) -> BaseCachingLocked<C, L, M> {
BaseCachingLocked {
caching,
_lockers_map: Arc::new(Mutex::new(HashMap::new()))
_lockers_map: Arc::new(M::new(HashMap::new()))
}
}
}

impl<C: CachingStrategy, L: Locker> BaseCachingLocked<C, L> {
async fn get_locker(&self, key: u64) -> Result<L, NovaXError> {
impl<C, L, M> BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
async fn get_locker(&self, key: u64) -> Result<Arc<L>, NovaXError> {
let mut lockers_map = self._lockers_map.lock().await;
let locker = if let Some(locker) = lockers_map.get(&key) {
locker.clone()
} else {
let locker = L::new();
let locker = Arc::new(L::new());
lockers_map.insert(key, locker.clone());
locker
};
Expand All @@ -67,7 +88,12 @@ impl<C: CachingStrategy, L: Locker> BaseCachingLocked<C, L> {
}

#[async_trait]
impl<C: CachingStrategy, L: Locker> CachingStrategy for BaseCachingLocked<C, L> {
impl<C, L, M> CachingStrategy for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
async fn get_cache<T: Serialize + DeserializeOwned + Send + Sync>(&self, key: u64) -> Result<Option<T>, NovaXError> {
let locker = self.get_locker(key).await?;
let lock_value = locker.read().await;
Expand Down
46 changes: 46 additions & 0 deletions caching/src/utils/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use async_trait::async_trait;
use std::fmt::Debug;
use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};

#[async_trait]
pub trait MutexLike: Send + Sync + Debug {
type T: Send;
fn new(value: Self::T) -> Self;

async fn lock(&self) -> MutexGuard<'_, Self::T>;
}

#[async_trait]
impl<T: Send + Debug> MutexLike for Mutex<T> {
type T = T;

fn new(value: Self::T) -> Self {
Self::new(value)
}

async fn lock(&self) -> MutexGuard<'_, Self::T> {
Mutex::lock(self).await
}
}

#[async_trait]
pub trait Locker: Send + Sync {
fn new() -> Self;
async fn read(&self) -> RwLockReadGuard<'_, ()>;
async fn write(&self) -> RwLockWriteGuard<'_, ()>;
}

#[async_trait]
impl Locker for RwLock<()> {
fn new() -> Self {
Self::new(())
}

async fn read(&self) -> RwLockReadGuard<'_, ()> {
RwLock::read(self).await
}

async fn write(&self) -> RwLockWriteGuard<'_, ()> {
RwLock::write(self).await
}
}
1 change: 1 addition & 0 deletions caching/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod lock;

0 comments on commit b8cb159

Please sign in to comment.