diff --git a/caching/Cargo.toml b/caching/Cargo.toml index fe3b449..137d3cb 100644 --- a/caching/Cargo.toml +++ b/caching/Cargo.toml @@ -10,10 +10,11 @@ repository = "https://github.com/gfusee/novax" [dependencies] serde = "1.0.180" -rmp-serde = "1.1.2" tokio = "1.29.1" async-trait = "0.1.72" +redis = { version = "0.27.4", features = ["aio", "tokio-comp"] } novax = { path = "../core", version = "0.1.13" } +rmp-serde = "=1.1.2" [dev-dependencies] thread_local = "1.1.7" diff --git a/caching/src/date/get_current_timestamp.rs b/caching/src/date/get_current_timestamp.rs index 250188f..76208a3 100644 --- a/caching/src/date/get_current_timestamp.rs +++ b/caching/src/date/get_current_timestamp.rs @@ -1,4 +1,4 @@ -pub(crate) fn get_timestamp_of_next_block(current_timestamp: Duration) -> Result { +pub(crate) fn get_timestamp_of_next_block(current_timestamp: &Duration) -> Result { let mut timestamp = current_timestamp.as_secs() + 1; while timestamp % 6 != 0 { timestamp += 1 @@ -7,6 +7,26 @@ pub(crate) fn get_timestamp_of_next_block(current_timestamp: Duration) -> Result Ok(Duration::from_secs(timestamp)) } +pub(crate) trait GetDuration { + fn get_duration_timestamp(&self, current_timestamp: &Duration) -> Result; + fn get_duration_from_now(&self, current_timestamp: &Duration) -> Result { + Ok(self.get_duration_timestamp(current_timestamp)? - *current_timestamp) + } +} + +impl GetDuration for CachingDurationStrategy { + fn get_duration_timestamp(&self, current_timestamp: &Duration) -> Result { + match self { + CachingDurationStrategy::EachBlock => { + get_timestamp_of_next_block(current_timestamp) + } + CachingDurationStrategy::Duration(duration) => { + Ok(*current_timestamp + *duration) + } + } + } +} + #[cfg(not(test))] mod implementation { use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -26,6 +46,7 @@ pub(crate) use implementation::get_current_timestamp; #[cfg(test)] pub(crate) use implementation::set_mock_time; +use novax::caching::CachingDurationStrategy; use novax::errors::NovaXError; #[cfg(test)] @@ -62,7 +83,7 @@ mod tests { #[test] fn test_get_timestamp_of_next_block_start_of_block() { - let result = get_timestamp_of_next_block(Duration::from_secs(6)).unwrap(); + let result = get_timestamp_of_next_block(&Duration::from_secs(6)).unwrap(); let expected = Duration::from_secs(12); assert_eq!(result, expected); @@ -70,7 +91,7 @@ mod tests { #[test] fn test_get_timestamp_of_next_block_mid_of_block() { - let result = get_timestamp_of_next_block(Duration::from_secs(8)).unwrap(); + let result = get_timestamp_of_next_block(&Duration::from_secs(8)).unwrap(); let expected = Duration::from_secs(12); assert_eq!(result, expected); @@ -78,7 +99,7 @@ mod tests { #[test] fn test_get_timestamp_of_next_block_end_of_block() { - let result = get_timestamp_of_next_block(Duration::from_secs(11)).unwrap(); + let result = get_timestamp_of_next_block(&Duration::from_secs(11)).unwrap(); let expected = Duration::from_secs(12); assert_eq!(result, expected); diff --git a/caching/src/lib.rs b/caching/src/lib.rs index feeb44f..73f5842 100644 --- a/caching/src/lib.rs +++ b/caching/src/lib.rs @@ -2,3 +2,4 @@ pub mod local; pub(crate) mod date; pub mod multi; pub mod locked; +pub mod redis; diff --git a/caching/src/local/caching_local.rs b/caching/src/local/caching_local.rs index 17f82a1..d2ae1be 100644 --- a/caching/src/local/caching_local.rs +++ b/caching/src/local/caching_local.rs @@ -1,15 +1,18 @@ -use async_trait::async_trait; -use std::sync::Arc; use std::collections::HashMap; use std::future::Future; +use std::sync::Arc; use std::time::Duration; -use serde::Serialize; + +use async_trait::async_trait; use serde::de::DeserializeOwned; +use serde::Serialize; use tokio::sync::Mutex; + use novax::caching::{CachingDurationStrategy, CachingStrategy}; -use novax::errors::NovaXError; use novax::errors::CachingError; -use crate::date::get_current_timestamp::{get_current_timestamp, get_timestamp_of_next_block}; +use novax::errors::NovaXError; + +use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration}; #[derive(Clone, Debug)] pub struct CachingLocal { @@ -38,15 +41,7 @@ impl CachingLocal { } async fn set_value(&self, key: u64, value: &T) -> Result<(), NovaXError> { - let current_timestamp = get_current_timestamp()?; - let expiration_timestamp = match self.duration_strategy { - CachingDurationStrategy::EachBlock => { - get_timestamp_of_next_block(current_timestamp)? - } - CachingDurationStrategy::Duration(duration) => { - current_timestamp + duration - } - }; + let expiration_timestamp = self.duration_strategy.get_duration_timestamp(&get_current_timestamp()?)?; self.expiration_timestamp_map.lock().await.insert(key, expiration_timestamp); let Ok(serialized) = rmp_serde::to_vec(value) else { return Err(CachingError::UnableToSerialize.into())}; @@ -66,7 +61,9 @@ impl CachingStrategy for CachingLocal { Ok(None) } else { let Some(encoded_value) = self.value_map.lock().await.get(&key).cloned() else { return Ok(None) }; - let Ok(value) = rmp_serde::from_slice(&encoded_value) else { return Err(CachingError::UnableToDeserialize.into()) }; + let Ok(value) = rmp_serde::from_slice(&encoded_value) else { + return Err(CachingError::UnableToDeserialize.into()) + }; Ok(Some(value)) } @@ -109,8 +106,10 @@ impl CachingStrategy for CachingLocal { #[cfg(test)] mod test { use std::time::Duration; + use novax::caching::{CachingDurationStrategy, CachingStrategy}; use novax::errors::NovaXError; + use crate::date::get_current_timestamp::set_mock_time; use crate::local::caching_local::CachingLocal; @@ -155,7 +154,6 @@ mod test { let result = caching.get_cache::(key).await?; let expected = Some("test".to_string()); - assert_eq!(result, expected); Ok(()) diff --git a/caching/src/redis/caching_redis.rs b/caching/src/redis/caching_redis.rs new file mode 100644 index 0000000..e79ddeb --- /dev/null +++ b/caching/src/redis/caching_redis.rs @@ -0,0 +1,269 @@ +use std::future::Future; + +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use novax::caching::{CachingDurationStrategy, CachingStrategy}; +use novax::errors::{CachingError, NovaXError}; + +use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration}; +use crate::redis::client::RedisClient; +use crate::redis::error::CachingRedisError; +use crate::redis::IntoConnectionInfo; + +pub type CachingRedis = BaseCachingRedis; + +#[derive(Clone, Debug)] +pub struct BaseCachingRedis { + pub(crate) client: Client, + pub duration_strategy: CachingDurationStrategy +} + +impl BaseCachingRedis { + pub fn new( + info: Info, + duration_strategy: CachingDurationStrategy + ) -> Result { + let client = Client::open(info)?; + + Ok( + BaseCachingRedis { + client, + duration_strategy + } + ) + } +} + +#[async_trait] +impl CachingStrategy for BaseCachingRedis { + async fn get_cache(&self, key: u64) -> Result, NovaXError> { + let opt_value_encoded: Option> = self.client + .get(key) + .await + .map_err(|e| { + CachingError::from(e) + })?; + + let Some(value_encoded) = opt_value_encoded else { + return Ok(None); + }; + + let Ok(decoded) = rmp_serde::from_slice(&value_encoded) else { + return Err(CachingError::UnableToDeserialize.into()) + }; + + Ok(Some(decoded)) + } + + async fn set_cache(&self, key: u64, value: &T) -> Result<(), NovaXError> { + let Ok(encoded) = rmp_serde::to_vec(value) else { + return Err(CachingError::UnableToSerialize.into()) + }; + + self.client + .set(key, encoded, self.duration_strategy.get_duration_from_now(&get_current_timestamp()?)?.as_secs()) + .await + .map_err(|e| { + CachingError::from(e).into() + }) + } + + async fn get_or_set_cache(&self, key: u64, getter: FutureGetter) -> Result + where + T: Serialize + DeserializeOwned + Send + Sync, + FutureGetter: Future> + Send, + Error: From + { + let opt_value = self.get_cache(key).await?; + + match opt_value { + None => { + let value_to_set = getter.await?; + self.set_cache(key, &value_to_set).await?; + Ok(value_to_set) + }, + Some(value) => { + Ok(value) + } + } + } + + async fn clear(&self) -> Result<(), NovaXError> { + self.client + .clear() + .await + .map_err(|e| { + CachingError::from(e).into() + }) + } + + fn with_duration_strategy(&self, strategy: CachingDurationStrategy) -> Self { + BaseCachingRedis { + client: self.client.clone(), + duration_strategy: strategy, + } + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + use async_trait::async_trait; + use redis::{FromRedisValue, IntoConnectionInfo, ToRedisArgs}; + + use novax::caching::{CachingDurationStrategy, CachingStrategy}; + use novax::errors::NovaXError; + + use crate::date::get_current_timestamp::set_mock_time; + use crate::redis::client::RedisClient; + use crate::redis::error::CachingRedisError; + use crate::redis::caching_redis::BaseCachingRedis; + + #[derive(Clone, Debug)] + struct MockRedisClient; + + #[async_trait] + impl RedisClient for MockRedisClient { + fn open(_info: Info) -> Result { + Ok(MockRedisClient) + } + + async fn get(&self, key: K) -> Result, CachingRedisError> { + if key.to_redis_args() == 1.to_redis_args() { // Not found + Ok(None) + } else if key.to_redis_args() == 2.to_redis_args() { // Found + Ok(Some(RV::from_byte_vec(&[146, 0, 1]).unwrap().into_iter().next().unwrap())) + } else { + Ok(None) + } + } + + async fn set(&self, key: K, value: V, duration: u64) -> Result<(), CachingRedisError> { + if value.to_redis_args() != rmp_serde::to_vec("test").unwrap().to_redis_args() { + panic!(); + } + + if key.to_redis_args() == 2.to_redis_args() { // set_cache_start_of_block + if duration != 6 { + panic!(); + } + + Ok(()) + } else if key.to_redis_args() == 3.to_redis_args() { // set_cache_next_block + if duration != 3 { + panic!(); + } + + Ok(()) + } else { + Ok(()) + } + } + + async fn clear(&self) -> Result<(), CachingRedisError> { + Ok(()) + } + } + + #[tokio::test] + async fn test_get_cache_key_not_found() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 1; + + let result = caching.get_cache::<()>(key).await?; + + assert_eq!(result, None); + + Ok(()) + } + + #[tokio::test] + async fn test_get_cache_key_found() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 2; + + let result = caching.get_cache::>(key).await?; + + assert_eq!(result, Some([0, 1].to_vec())); + + Ok(()) + } + + #[tokio::test] + async fn test_set_cache() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 1; + let value = "test".to_string(); + + caching.set_cache(key, &value).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_set_cache_start_of_block() -> Result<(), NovaXError> { + set_mock_time(Duration::from_secs(0)); + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 2; + let value = "test".to_string(); + + caching.set_cache(key, &value).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_set_cache_next_block() -> Result<(), NovaXError> { + set_mock_time(Duration::from_secs(3)); + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 3; + let value = "test".to_string(); + + caching.set_cache(key, &value).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_get_and_set_cache_key_not_found() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 1; + + let result = caching.get_or_set_cache::(key, async { + // error if serialized + + Ok("test".to_string()) + }).await?; + + assert_eq!(result, "test"); + + Ok(()) + } + + #[tokio::test] + async fn test_get_and_set_cache_key_found() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + let key = 2; + + let result = caching.get_or_set_cache::, _, NovaXError>(key, async { + // error if serialized + + panic!() + }).await?; + + assert_eq!(result, vec![0u8, 1u8]); + + Ok(()) + } + + #[tokio::test] + async fn test_clear() -> Result<(), NovaXError> { + let caching = BaseCachingRedis::::new("", CachingDurationStrategy::EachBlock).unwrap(); + + caching.clear().await?; + + Ok(()) + } +} \ No newline at end of file diff --git a/caching/src/redis/client.rs b/caching/src/redis/client.rs new file mode 100644 index 0000000..224d4e4 --- /dev/null +++ b/caching/src/redis/client.rs @@ -0,0 +1,64 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use redis::{AsyncConnectionConfig, FromRedisValue, IntoConnectionInfo, SetExpiry, ToRedisArgs}; +use redis::AsyncCommands; + +use crate::redis::error::CachingRedisError; + +#[async_trait] +pub trait RedisClient: Clone + Debug + Send + Sync { + fn open(info: Info) -> Result; + async fn set(&self, key: K, value: V, duration: u64) -> Result<(), CachingRedisError>; + async fn get(&self, key: K) -> Result, CachingRedisError>; + async fn clear(&self) -> Result<(), CachingRedisError>; +} + +#[async_trait] +impl RedisClient for redis::Client { + fn open(info: Info) -> Result { + match Self::open(info) { + Ok(client) => Ok(client), + Err(_) => Err(CachingRedisError::CannotOpenConnection) + } + } + + async fn set(&self, key: K, value: V, duration: u64) -> Result<(), CachingRedisError> { + let Ok(mut connection) = self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new()).await else { + return Err(CachingRedisError::CannotGetConnection) + }; + + let options = redis::SetOptions::default() + .with_expiration(SetExpiry::EX(duration)); + + if connection.set_options::<_, _, ()>(key, value, options).await.is_err() { + return Err(CachingRedisError::CannotSetValue) + } + + return Ok(()) + } + + async fn get(&self, key: K) -> Result, CachingRedisError> { + let Ok(mut connection) = self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new()).await else { + return Err(CachingRedisError::CannotGetConnection) + }; + + let Ok(value) = connection.get(key).await else { + return Err(CachingRedisError::CannotGetValue) + }; + + Ok(value) + } + + async fn clear(&self) -> Result<(), CachingRedisError> { + let Ok(mut connection) = self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new()).await else { + return Err(CachingRedisError::CannotGetConnection) + }; + + if redis::cmd("FLUSHALL").exec_async(&mut connection).await.is_err() { + return Err(CachingRedisError::CannotSetValue) + }; + + return Ok(()); + } +} \ No newline at end of file diff --git a/caching/src/redis/error.rs b/caching/src/redis/error.rs new file mode 100644 index 0000000..a66ff43 --- /dev/null +++ b/caching/src/redis/error.rs @@ -0,0 +1,56 @@ +use serde::{Deserialize, Serialize}; +use novax::errors::CachingError; + +#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] +pub enum CachingRedisError { + CannotOpenConnection, + CannotGetConnection, + CannotGetValue, + CannotSetValue, + CannotClearAllValues +} + +impl CachingRedisError { + pub fn get_description(&self) -> String { + match self { + CachingRedisError::CannotOpenConnection => { + "Cannot open a connection to the redis server.".to_string() + } + CachingRedisError::CannotGetConnection => { + "Cannot get the connection to the redis server.".to_string() + } + CachingRedisError::CannotGetValue => { + "Cannot get the value from the redis server.".to_string() + } + CachingRedisError::CannotSetValue => { + "Cannot set the value to the redis server.".to_string() + } + CachingRedisError::CannotClearAllValues => { + "Cannot clear all the values in the redis server.".to_string() + } + } + } + pub fn get_type(&self) -> String { + "CachingRedisError".to_string() + } + + pub fn get_code(&self) -> usize { + match self { + CachingRedisError::CannotOpenConnection => 0, + CachingRedisError::CannotGetConnection => 1, + CachingRedisError::CannotGetValue => 2, + CachingRedisError::CannotSetValue => 3, + CachingRedisError::CannotClearAllValues => 4 + } + } +} + +impl From for CachingError { + fn from(value: CachingRedisError) -> Self { + CachingError::OtherError { + description: value.get_description(), + code: value.get_code(), + type_name: value.get_type(), + } + } +} \ No newline at end of file diff --git a/caching/src/redis/mod.rs b/caching/src/redis/mod.rs new file mode 100644 index 0000000..05cc005 --- /dev/null +++ b/caching/src/redis/mod.rs @@ -0,0 +1,8 @@ +pub mod caching_redis; +pub mod client; +pub mod error; + +pub use redis::ConnectionInfo; +pub use redis::IntoConnectionInfo; +pub use redis::RedisConnectionInfo; +pub use redis::RedisError; \ No newline at end of file diff --git a/core/src/errors/caching_error.rs b/core/src/errors/caching_error.rs index cb4d55a..285119c 100644 --- a/core/src/errors/caching_error.rs +++ b/core/src/errors/caching_error.rs @@ -1,4 +1,7 @@ +use std::fmt::Debug; + use serde::{Deserialize, Serialize}; + use crate::errors::novax_error::NovaXError; /// An enumeration of errors that may occur during caching operations within the contract framework. @@ -35,6 +38,7 @@ pub enum CachingError { /// Encountered when an error occurs in the getter function which is used to fetch data in case /// it is not found in the cache. ErrorInGetter, + OtherError { description: String, code: usize, type_name: String }, /// A catch-all for other unforeseen errors that may occur during caching operations. UnknownError,