From 52b6f6f16a42cd539895cf44fe875df9aa3795c9 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Tue, 3 Dec 2024 17:22:00 +0100 Subject: [PATCH] UNSTABLE : WIP on redis --- Cargo.lock | 2 +- Cargo.toml | 3 +- crate/server/Cargo.toml | 1 + crate/server/src/config/params/db_params.rs | 8 +- crate/server/src/database/database_struct.rs | 7 + crate/server/src/database/database_traits.rs | 52 ++--- crate/server/src/database/mod.rs | 1 + crate/server/src/database/redis/datasets.rs | 10 +- crate/server/src/database/redis/findex.rs | 214 +++++++++++++++++- crate/server/src/database/redis/mod.rs | 6 +- .../server/src/database/redis/permissions.rs | 12 +- 11 files changed, 258 insertions(+), 58 deletions(-) create mode 100644 crate/server/src/database/database_struct.rs diff --git a/Cargo.lock b/Cargo.lock index 0477926..e17cf39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -866,7 +866,7 @@ dependencies = [ [[package]] name = "cosmian_findex" version = "7.0.0" -source = "git+https://github.com/Cosmian/findex?branch=feat/prepare_findex_server_v7#25ea96bbc29ddb39961a0de13d6f467545f4b9bc" +source = "git+https://github.com/Cosmian/findex?branch=feat/update_for_findex_server#e03c97f496ecb1b3195d3c839df6e4717cb3d384" dependencies = [ "aes", "rand", diff --git a/Cargo.toml b/Cargo.toml index 7089705..77165bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ actix-web = { version = "4.9.0", default-features = false } base64 = "0.21" clap = { version = "4.5", default-features = false } # cloudproof_findex = { path = "../cloudproof_rust/crates/findex" } +# todo(Hatem) : DELET THE PACKAGE BELOW, PURGE IT cloudproof_findex = { git = "https://www.github.com/Cosmian/cloudproof_rust", branch = "feat/add_basic_findex_rest_client" } # cosmian_config_utils = { path ="../../core/client_server/crate/config_utils" } # cosmian_http_client = { path ="../../core/client_server/crate/http_client" } @@ -66,4 +67,4 @@ url = "2.5" x509-parser = "0.16" zeroize = { version = "1.8", default-features = false } uuid = { version = "1.10", features = ["v4"] } -cosmian_findex = { git = "https://github.com/Cosmian/findex", branch = "feat/prepare_findex_server_v7" } +cosmian_findex = { git = "https://github.com/Cosmian/findex", branch = "feat/update_for_findex_server" } diff --git a/crate/server/Cargo.toml b/crate/server/Cargo.toml index 077b5d2..8e22bea 100644 --- a/crate/server/Cargo.toml +++ b/crate/server/Cargo.toml @@ -42,6 +42,7 @@ clap = { workspace = true, features = [ "derive", "cargo", ] } +# TODO(hatem) : DELET THE PACKAGE BELOW, PURGE IT cloudproof_findex = { workspace = true, features = ["redis-interface"] } cosmian_findex = { workspace = true, features = ["redis-mem"] } cosmian_findex_structs = { path = "../structs" } diff --git a/crate/server/src/config/params/db_params.rs b/crate/server/src/config/params/db_params.rs index c4fad68..058809b 100644 --- a/crate/server/src/config/params/db_params.rs +++ b/crate/server/src/config/params/db_params.rs @@ -3,7 +3,7 @@ use std::fmt::{self, Display}; use url::Url; pub enum DbParams { - Redis(Url), + Redis(Url, usize), } impl DbParams { @@ -11,7 +11,7 @@ impl DbParams { #[must_use] pub const fn db_name(&self) -> &str { match &self { - Self::Redis(_) => "Redis", + Self::Redis(_, _) => "Redis", } } } @@ -19,8 +19,8 @@ impl DbParams { impl Display for DbParams { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Redis(url) => { - write!(f, "redis: {}", redact_url(url),) + Self::Redis(url, w) => { + write!(f, "redis : {} | Word length = {}", redact_url(url), w) } } } diff --git a/crate/server/src/database/database_struct.rs b/crate/server/src/database/database_struct.rs new file mode 100644 index 0000000..dfd68d8 --- /dev/null +++ b/crate/server/src/database/database_struct.rs @@ -0,0 +1,7 @@ +use cosmian_findex::{Address, MemoryADT, ADDRESS_LENGTH}; + +use super::database_traits::FindexMemoryTrait; + +pub(crate) struct FindexDatabase { + pub(crate) memory: usize, +} diff --git a/crate/server/src/database/database_traits.rs b/crate/server/src/database/database_traits.rs index 246e6bd..224c9eb 100644 --- a/crate/server/src/database/database_traits.rs +++ b/crate/server/src/database/database_traits.rs @@ -1,49 +1,27 @@ use async_trait::async_trait; -use cloudproof_findex::{ - db_interfaces::{redis::FindexTable, rest::UpsertData}, - reexport::cosmian_findex::{ - TokenToEncryptedValueMap, TokenWithEncryptedValueList, Tokens, ENTRY_LENGTH, LINK_LENGTH, - }, -}; + +use cosmian_findex::{Address, MemoryADT, ADDRESS_LENGTH}; use cosmian_findex_structs::{EncryptedEntries, Permission, Permissions, Uuids}; use uuid::Uuid; use crate::error::result::FResult; -#[async_trait] -pub(crate) trait FindexTrait: Sync + Send { +use super::redis::WORD_LENGTH; + +pub(crate) trait FindexMemoryTrait: + Send + Sync + Clone + MemoryADT
, Word = [u8; WORD_LENGTH]> +{ // - // Findex v6 + // Findex // - async fn findex_fetch_entries( - &self, - index_id: &Uuid, - tokens: Tokens, - ) -> FResult>; - async fn findex_fetch_chains( - &self, - index_id: &Uuid, - tokens: Tokens, - ) -> FResult>; - async fn findex_upsert_entries( - &self, - index_id: &Uuid, - upsert_data: UpsertData, - ) -> FResult>; - async fn findex_insert_chains( - &self, - index_id: &Uuid, - items: TokenToEncryptedValueMap, - ) -> FResult<()>; - async fn findex_delete( - &self, - index_id: &Uuid, - findex_table: FindexTable, - tokens: Tokens, - ) -> FResult<()>; - async fn findex_dump_tokens(&self, index_id: &Uuid) -> FResult; } +pub(crate) type Asba = dyn FindexMemoryTrait< + Word = [u8; WORD_LENGTH], + Address = Address, + Error = dyn Send + Sync + std::error::Error, +>; + #[async_trait] pub(crate) trait PermissionsTrait: Sync + Send { // @@ -77,4 +55,4 @@ pub(crate) trait DatasetsTrait: Sync + Send { } #[async_trait] -pub(crate) trait DatabaseTraits: FindexTrait + PermissionsTrait + DatasetsTrait {} +pub(crate) trait DatabaseTraits: PermissionsTrait + DatasetsTrait {} diff --git a/crate/server/src/database/mod.rs b/crate/server/src/database/mod.rs index fff8326..85cceb3 100644 --- a/crate/server/src/database/mod.rs +++ b/crate/server/src/database/mod.rs @@ -1,3 +1,4 @@ +mod database_struct; mod database_traits; mod redis; diff --git a/crate/server/src/database/redis/datasets.rs b/crate/server/src/database/redis/datasets.rs index 34d6431..02b56c5 100644 --- a/crate/server/src/database/redis/datasets.rs +++ b/crate/server/src/database/redis/datasets.rs @@ -6,7 +6,7 @@ use redis::pipe; use tracing::{instrument, trace}; use uuid::Uuid; -use super::Redis; +use super::{ServerRedis, WORD_LENGTH}; use crate::{ database::database_traits::DatasetsTrait, error::{result::FResult, server::FindexServerError}, @@ -18,7 +18,7 @@ fn build_dataset_key(index_id: &Uuid, uid: &Uuid) -> Vec { } #[async_trait] -impl DatasetsTrait for Redis { +impl DatasetsTrait for ServerRedis { // // Dataset management // @@ -34,7 +34,7 @@ impl DatasetsTrait for Redis { pipe.set(key, data); } pipe.atomic() - .query_async(&mut self.mgr.clone()) + .query_async(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from) } @@ -47,7 +47,7 @@ impl DatasetsTrait for Redis { pipe.del(key); } pipe.atomic() - .query_async(&mut self.mgr.clone()) + .query_async(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from) } @@ -70,7 +70,7 @@ impl DatasetsTrait for Redis { } let values: Vec> = pipe .atomic() - .query_async(&mut self.mgr.clone()) + .query_async(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from)?; diff --git a/crate/server/src/database/redis/findex.rs b/crate/server/src/database/redis/findex.rs index 9c9e7f3..a6ecb15 100644 --- a/crate/server/src/database/redis/findex.rs +++ b/crate/server/src/database/redis/findex.rs @@ -1,5 +1,215 @@ use cosmian_findex::{Address, RedisMemory, ADDRESS_LENGTH}; -pub(crate) struct ServerRedis { - inner: RedisMemory, 8>, +use crate::database::database_traits::FindexTrait; + +use super::WORD_LENGTH; + +// Word length is function of the serialization function provided when findex is instantiated +// In the (naïve) case of dummy_encode / dummy_decode as provided in findex benches, +// WORD_LENGTH = 1 + CHUNK_LENGTH = 1 + (8 * BLOCK_LENGTH) = 129 for a BLOCK_LENGTH set to 16. +pub(crate) struct ServerRedis { + pub(crate) memory: RedisMemory, [u8; WORD_LENGTH]>, +} + +impl FindexTrait for ServerRedis { + //todo(hatem): implement the following functions + + // #[instrument(ret(Display), err, skip_all)] + // async fn findex_fetch_entries( + // &self, + // index_id: &Uuid, + // tokens: Tokens, + // ) -> FResult> { + // trace!("fetch_entries: number of tokens: {}", tokens.len()); + // let uids = tokens.into_iter().collect::>(); + // trace!("fetch_entries: uids len: {}", uids.len()); + + // let redis_keys = uids + // .iter() + // .map(|uid| build_key(index_id, FindexTable::Entry, uid)) + // .collect::>(); + // trace!("fetch_entries: redis_keys len: {}", redis_keys.len()); + + // // Fetch all the values in an atomic operation. + // let mut pipe = pipe(); + // for key in redis_keys { + // pipe.get(key); + // } + // let values: Vec> = pipe + // .atomic() + // .query_async(&mut self.mgr.clone()) + // .await + // .map_err(FindexServerError::from)?; + + // trace!("findex_fetch_entries: values len: {}", values.len()); + + // // Zip and filter empty values out. + // let res = uids + // .into_iter() + // .zip(values) + // .filter_map(|(k, v)| { + // if v.is_empty() { + // None + // } else { + // Some(EncryptedValue::try_from(v.as_slice()).map(|v| (k, v))) + // } + // }) + // .collect::, CoreError>>()?; + // trace!("fetch_entries: non empty tuples len: {}", res.len()); + + // Ok(res.into()) + // } + + // #[instrument(ret(Display), err, skip_all)] + // async fn findex_fetch_chains( + // &self, + // index_id: &Uuid, + // tokens: Tokens, + // ) -> FResult> { + // trace!("fetch_chains: number of tokens: {}", tokens.len()); + // let uids = tokens.into_iter().collect::>(); + // trace!("fetch_chains: uids len: {}", uids.len()); + + // let redis_keys = uids + // .iter() + // .map(|uid| build_key(index_id, FindexTable::Chain, uid)) + // .collect::>(); + // trace!("fetch_chains: redis_keys len: {}", redis_keys.len()); + + // // Fetch all the values in an atomic operation. + // let mut pipe = pipe(); + // for key in redis_keys { + // pipe.get(key); + // } + // let values: Vec> = pipe + // .atomic() + // .query_async(&mut self.mgr.clone()) + // .await + // .map_err(FindexServerError::from)?; + + // // Zip and filter empty values out. + // let res = uids + // .into_iter() + // .zip(values) + // .filter_map(|(k, v)| { + // if v.is_empty() { + // None + // } else { + // Some(EncryptedValue::try_from(v.as_slice()).map(|v| (k, v))) + // } + // }) + // .collect::, CoreError>>()?; + // trace!("fetch_entries: non empty tuples len: {}", res.len()); + + // let result: TokenWithEncryptedValueList = res.into(); + + // Ok(result) + // } + + // #[instrument(ret(Display), err, skip_all)] + // async fn findex_upsert_entries( + // &self, + // index_id: &Uuid, + // upsert_data: UpsertData, + // ) -> FResult> { + // trace!( + // "upsert_entries: number of upsert data: {}", + // upsert_data.len() + // ); + + // let mut old_values = HashMap::with_capacity(upsert_data.len()); + // let mut new_values = HashMap::with_capacity(upsert_data.len()); + // for (token, (old_value, new_value)) in upsert_data { + // if let Some(old_value) = old_value { + // old_values.insert(token, old_value); + // } + // new_values.insert(token, new_value); + // } + + // trace!( + // "upsert_entries: number of old_values {}, number of new_values {}", + // old_values.len(), + // new_values.len() + // ); + + // let mut rejected = HashMap::with_capacity(new_values.len()); + // for (uid, new_value) in new_values { + // let new_value = Vec::from(&new_value); + // let old_value = old_values.get(&uid).map(Vec::from).unwrap_or_default(); + // let key = build_key(index_id, FindexTable::Entry, &uid); + + // let indexed_value: Vec<_> = self + // .upsert_script + // .arg(key) + // .arg(old_value) + // .arg(new_value) + // .invoke_async(&mut self.mgr.clone()) + // .await?; + + // if !indexed_value.is_empty() { + // let encrypted_value = EncryptedValue::try_from(indexed_value.as_slice())?; + // rejected.insert(uid, encrypted_value); + // } + // } + + // trace!("upsert_entries: rejected: {}", rejected.len()); + + // Ok(rejected.into()) + // } + + // #[instrument(ret, err, skip_all)] + // async fn findex_insert_chains( + // &self, + // index_id: &Uuid, + // items: TokenToEncryptedValueMap, + // ) -> FResult<()> { + // let mut pipe = pipe(); + // for (k, v) in &*items { + // pipe.set(build_key(index_id, FindexTable::Chain, k), Vec::from(v)); + // } + // pipe.atomic() + // .query_async(&mut self.mgr.clone()) + // .await + // .map_err(FindexServerError::from) + // } + + // #[instrument(ret, err, skip_all)] + // async fn findex_delete( + // &self, + // index_id: &Uuid, + // findex_table: FindexTable, + // entry_uids: Tokens, + // ) -> FResult<()> { + // let mut pipeline = pipe(); + // for uid in entry_uids { + // pipeline.del(build_key(index_id, findex_table, &uid)); + // } + // pipeline + // .atomic() + // .query_async(&mut self.mgr.clone()) + // .await + // .map_err(FindexServerError::from) + // } + + // #[instrument(ret(Display), err, skip_all)] + // #[allow(clippy::indexing_slicing)] + // async fn findex_dump_tokens(&self, index_id: &Uuid) -> FResult { + // let keys: Vec> = self + // .mgr + // .clone() + // .keys(build_key(index_id, FindexTable::Entry, b"*")) + // .await?; + + // trace!("dumping {} keywords (ET+CT)", keys.len()); + + // let mut tokens_set = HashSet::new(); + // for key in keys { + // if key[..TABLE_PREFIX_LENGTH] == [0x00, u8::from(FindexTable::Entry)] { + // if let Ok(token) = Token::try_from(&key[TABLE_PREFIX_LENGTH..]) { + // tokens_set.insert(token); + // } + // } + // } + // Ok(Tokens::from(tokens_set)) + // } } diff --git a/crate/server/src/database/redis/mod.rs b/crate/server/src/database/redis/mod.rs index 678779b..7be5e9d 100644 --- a/crate/server/src/database/redis/mod.rs +++ b/crate/server/src/database/redis/mod.rs @@ -1,6 +1,8 @@ #![allow(clippy::blocks_in_conditions)] //todo(manu): fix it -pub(crate) use findex::Redis; +pub(crate) use findex::ServerRedis; + +pub(crate) const WORD_LENGTH: usize = 129; use super::DatabaseTraits; @@ -8,4 +10,4 @@ mod datasets; mod findex; mod permissions; -impl DatabaseTraits for Redis {} +impl DatabaseTraits for ServerRedis {} diff --git a/crate/server/src/database/redis/permissions.rs b/crate/server/src/database/redis/permissions.rs index eb61be5..0405fc3 100644 --- a/crate/server/src/database/redis/permissions.rs +++ b/crate/server/src/database/redis/permissions.rs @@ -4,14 +4,14 @@ use redis::pipe; use tracing::{instrument, trace}; use uuid::Uuid; -use super::Redis; +use super::{ServerRedis, WORD_LENGTH}; use crate::{ database::database_traits::PermissionsTrait, error::{result::FResult, server::FindexServerError}, }; #[async_trait] -impl PermissionsTrait for Redis { +impl PermissionsTrait for ServerRedis { #[instrument(ret(Display), err, skip(self))] async fn create_index_id(&self, user_id: &str) -> FResult { let uuid = Uuid::new_v4(); @@ -26,7 +26,7 @@ impl PermissionsTrait for Redis { let mut pipe = pipe(); pipe.set::<_, _>(key, permissions.serialize()); pipe.atomic() - .query_async::<()>(&mut self.mgr.clone()) + .query_async::<()>(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from)?; @@ -42,7 +42,7 @@ impl PermissionsTrait for Redis { let mut values: Vec> = pipe .atomic() - .query_async(&mut self.mgr.clone()) + .query_async(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from)?; @@ -83,7 +83,7 @@ impl PermissionsTrait for Redis { let mut pipe = pipe(); pipe.set::<_, _>(key, permissions.serialize()); pipe.atomic() - .query_async(&mut self.mgr.clone()) + .query_async(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from) } @@ -98,7 +98,7 @@ impl PermissionsTrait for Redis { pipe.set::<_, _>(key, permissions.serialize()); pipe.atomic() - .query_async::<()>(&mut self.mgr.clone()) + .query_async::<()>(&mut self.memory.manager.clone()) .await .map_err(FindexServerError::from)?; }