From 2e848523250b6986c4c0d3567fa07215f383a0f5 Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Wed, 18 Oct 2023 20:26:53 +0300 Subject: [PATCH] Check index doesn't exceed max index --- Cargo.lock | 21 ++++++ nyx-storage/Cargo.toml | 1 + nyx-storage/src/directory.rs | 114 +++++++++++++++++++++++++++++++++ nyx-storage/src/lib.rs | 24 ++++++- nyx-storage/src/write_queue.rs | 42 ++++++++++-- 5 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 nyx-storage/src/directory.rs diff --git a/Cargo.lock b/Cargo.lock index cfa049d..e42570e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -322,6 +322,7 @@ name = "nyx-storage" version = "0.1.0" dependencies = [ "async-std", + "serde", ] [[package]] @@ -407,6 +408,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "serde" +version = "1.0.189" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.189" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "slab" version = "0.4.9" diff --git a/nyx-storage/Cargo.toml b/nyx-storage/Cargo.toml index 5ae0195..86a968d 100644 --- a/nyx-storage/Cargo.toml +++ b/nyx-storage/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" [dependencies] async-std.workspace = true +serde.workspace = true diff --git a/nyx-storage/src/directory.rs b/nyx-storage/src/directory.rs new file mode 100644 index 0000000..30bb82e --- /dev/null +++ b/nyx-storage/src/directory.rs @@ -0,0 +1,114 @@ +use std::{ + fmt::Debug, + fs::{self}, + path::PathBuf, +}; + +#[derive(Debug, Default)] +pub struct Directory { + custom_dir: Option, + title: String, +} + +impl Directory { + /// Creates a managed directory in the predefined path of the root directory for Nyx. + pub fn new(title: &str) -> Self { + Self { + custom_dir: None, + title: title.to_owned(), + } + } + + /// Create a managed directory in the predefined path of the root directory + /// for Nyx at specified custom directory `custom_dir`, this manager can then perform + /// different fs actions such as opening a file in the directory, saving the file to the directory, + /// and creating empty files in the directory safely in the context + /// of the directory it was created in. + /// + /// when creating a Directory by passing `custom_dir` the Directory will still + /// work in the context of the Nyx foder. + pub fn with_dir(title: &str, custom_dir: Option<&PathBuf>) -> Self { + Self { + custom_dir: custom_dir.cloned(), + title: title.to_owned(), + } + } + + pub fn create(&self, path: &str) -> Result { + let project_dir = self.get_base_dir(self.custom_dir.as_ref())?; + let project_dir_str = project_dir + .to_str() + .ok_or("Failed while validating UTF-8 string integrity.")?; + let total_path = format!("{}/{}", project_dir_str, path); + match fs::create_dir_all(&total_path) { + Ok(_) => {} + Err(e) => { + println!("Directory -> create func error: {}", e) + } + }; + Ok(total_path.into()) + } + + fn get_filepath(&self, path: &str, custom_path: Option<&PathBuf>) -> Result { + let dir = self.get_base_dir(custom_path)?; + let dir_str = dir + .to_str() + .ok_or("Not valid UTF-8 string has been passed.".to_string())?; + + let filepath = format!("{}/{}", dir_str, path); + Ok(filepath.into()) + } + + pub fn get_base_dir(&self, custom_path: Option<&PathBuf>) -> Result { + let final_path = if let Some(custom_path) = custom_path { + let dist = custom_path + .clone() + .to_str() + .ok_or("Invalid format provided for the directory")? + .to_string(); + format!("{}/{}", self.title, dist) + } else { + self.title.clone() + }; + + let mut final_dir: Option = None; + + // Unix-based machines + if let Ok(home_dir) = std::env::var("HOME") { + let config_dir = format!("{}/.config/{}", home_dir, final_path); + final_dir = Some(config_dir.into()); + } + // Windows based machines + else if let Ok(user_profile) = std::env::var("USERPROFILE") { + let config_dir = format!(r"{}/AppData/Roaming/{}", user_profile, final_path); + final_dir = Some(config_dir.into()); + } + + final_dir.ok_or("Couldn't get the systems home directory. Please setup a HOME env variable and pass your system's home directory there.".to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[cfg_attr(miri, ignore)] + fn get_local_metadata_directory_returns_dir_as_expected() { + let dir = Directory::new("nyx-storage"); + let dir_path = dir.get_base_dir(None).unwrap(); + assert!(dir_path.to_str().unwrap().contains("nyx")); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn get_local_metadata_filepath_returns_filepath_as_expected() { + let dir = Directory::new("nyx-storage"); + let filepath = dir.get_filepath("metadata.json", None).unwrap(); + println!("{:?}", filepath); + assert!(filepath + .to_str() + .unwrap() + .contains("nyx-storage/metadata.json")); + } +} diff --git a/nyx-storage/src/lib.rs b/nyx-storage/src/lib.rs index 1b9f420..ccf8c36 100644 --- a/nyx-storage/src/lib.rs +++ b/nyx-storage/src/lib.rs @@ -18,10 +18,14 @@ mod offsets; mod storage_sender; mod write_queue; +pub mod directory; + use offsets::Offsets; +/// NOTE: Each partition of a topic should have Storage struct pub struct Storage { indices: HashMap, + max_index: usize, file: Arc, // TODO: When Storage will be accessed concurrently each concurrent accessor should have a // `retrievable_buffer` of its own to read into instead. @@ -32,13 +36,16 @@ pub struct Storage { impl Storage { pub async fn new(path: &'static Path, max_queue: usize) -> Result { + let file = Arc::new(File::open(path).await.map_err(|e| e.to_string())?); let (write_sender, write_receiver) = bounded(max_queue); - let write_queue_handle = async_std::task::spawn(WriteQueue::run(write_receiver, &path)); + let write_queue_handle = + async_std::task::spawn(WriteQueue::run(write_receiver, file.clone())); Ok(Self { indices: HashMap::new(), - file: Arc::new(File::open(path).await.map_err(|e| e.to_string())?), + max_index: 0, + file, retrivable_buffer: [0; 8192], write_sender, write_queue_handle, @@ -50,6 +57,14 @@ impl Storage { } pub async fn get(&mut self, index: usize) -> Result<&[u8], String> { + if index >= self.max_index { + return Err(format!( + "Request data at index {} but max index is {}", + index, + self.max_index - 1 + )); + } + let offsets = self .indices .get(&index) @@ -92,4 +107,9 @@ impl Storage { #[cfg(test)] mod tests { use super::*; + + #[test] + fn create_storage_instance() { + // let storage = Storage::new(); + } } diff --git a/nyx-storage/src/write_queue.rs b/nyx-storage/src/write_queue.rs index 64e01f3..df1d04f 100644 --- a/nyx-storage/src/write_queue.rs +++ b/nyx-storage/src/write_queue.rs @@ -1,19 +1,47 @@ -use std::io; +use std::{ + io::{self, SeekFrom}, + sync::Arc, +}; -use async_std::{channel::Receiver, fs::OpenOptions, io::WriteExt, path::Path}; +use async_std::{ + channel::Receiver, + fs::File, + io::{prelude::SeekExt, WriteExt}, +}; -pub struct WriteQueue; +pub struct WriteQueue { + file: Arc, + current_last_byte: usize, +} impl WriteQueue { - pub async fn run(queue: Receiver>, path: &Path) -> io::Result<()> { - let mut data_file = OpenOptions::new().append(true).open(path).await?; + pub async fn new(file: Arc) -> io::Result { + let mut file_ref = &*file; + file_ref.seek(SeekFrom::End(0)).await?; - data_file.write_all(b"I am trying to write").await?; + Ok(Self { + file, + current_last_byte: 0, + }) + } + + async fn write(&mut self, buf: &[u8]) -> io::Result<()> { + let mut file = &*self.file; + let n = file.write(buf).await?; + self.current_last_byte += n; + file.seek(SeekFrom::End(0)).await?; + Ok(()) + } + + pub async fn run(queue: Receiver>, file: Arc) -> io::Result<()> { + let mut write_queue = Self::new(file).await?; while let Ok(data) = queue.recv().await { - println!("Do something with received data: {:?}", data); + write_queue.write(&data[..]).await?; } Ok(()) } + + // pub }