Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Check index doesn't exceed max index
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh committed Oct 18, 2023
1 parent 35cf760 commit 2e84852
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 9 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nyx-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"

[dependencies]
async-std.workspace = true
serde.workspace = true
114 changes: 114 additions & 0 deletions nyx-storage/src/directory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::{
fmt::Debug,
fs::{self},
path::PathBuf,
};

#[derive(Debug, Default)]
pub struct Directory {
custom_dir: Option<PathBuf>,
title: String,
}

impl Directory {
/// Creates a managed directory in the predefined path of the root directory for Nyx.
pub fn new(title: &str) -> Self {
Self {
custom_dir: None,
title: title.to_owned(),
}
}

/// Create a managed directory in the predefined path of the root directory
/// for Nyx at specified custom directory `custom_dir`, this manager can then perform
/// different fs actions such as opening a file in the directory, saving the file to the directory,
/// and creating empty files in the directory safely in the context
/// of the directory it was created in.
///
/// when creating a Directory by passing `custom_dir` the Directory will still
/// work in the context of the Nyx foder.
pub fn with_dir(title: &str, custom_dir: Option<&PathBuf>) -> Self {
Self {
custom_dir: custom_dir.cloned(),
title: title.to_owned(),
}
}

pub fn create(&self, path: &str) -> Result<PathBuf, String> {
let project_dir = self.get_base_dir(self.custom_dir.as_ref())?;
let project_dir_str = project_dir
.to_str()
.ok_or("Failed while validating UTF-8 string integrity.")?;
let total_path = format!("{}/{}", project_dir_str, path);
match fs::create_dir_all(&total_path) {
Ok(_) => {}
Err(e) => {
println!("Directory -> create func error: {}", e)
}
};
Ok(total_path.into())
}

fn get_filepath(&self, path: &str, custom_path: Option<&PathBuf>) -> Result<PathBuf, String> {
let dir = self.get_base_dir(custom_path)?;
let dir_str = dir
.to_str()
.ok_or("Not valid UTF-8 string has been passed.".to_string())?;

let filepath = format!("{}/{}", dir_str, path);
Ok(filepath.into())
}

pub fn get_base_dir(&self, custom_path: Option<&PathBuf>) -> Result<PathBuf, String> {
let final_path = if let Some(custom_path) = custom_path {
let dist = custom_path
.clone()
.to_str()
.ok_or("Invalid format provided for the directory")?
.to_string();
format!("{}/{}", self.title, dist)
} else {
self.title.clone()
};

let mut final_dir: Option<PathBuf> = None;

// Unix-based machines
if let Ok(home_dir) = std::env::var("HOME") {
let config_dir = format!("{}/.config/{}", home_dir, final_path);
final_dir = Some(config_dir.into());
}
// Windows based machines
else if let Ok(user_profile) = std::env::var("USERPROFILE") {
let config_dir = format!(r"{}/AppData/Roaming/{}", user_profile, final_path);
final_dir = Some(config_dir.into());
}

final_dir.ok_or("Couldn't get the systems home directory. Please setup a HOME env variable and pass your system's home directory there.".to_string())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
#[cfg_attr(miri, ignore)]
fn get_local_metadata_directory_returns_dir_as_expected() {
let dir = Directory::new("nyx-storage");
let dir_path = dir.get_base_dir(None).unwrap();
assert!(dir_path.to_str().unwrap().contains("nyx"));
}

#[test]
#[cfg_attr(miri, ignore)]
fn get_local_metadata_filepath_returns_filepath_as_expected() {
let dir = Directory::new("nyx-storage");
let filepath = dir.get_filepath("metadata.json", None).unwrap();
println!("{:?}", filepath);
assert!(filepath
.to_str()
.unwrap()
.contains("nyx-storage/metadata.json"));
}
}
24 changes: 22 additions & 2 deletions nyx-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ mod offsets;
mod storage_sender;
mod write_queue;

pub mod directory;

use offsets::Offsets;

/// NOTE: Each partition of a topic should have Storage struct
pub struct Storage {
indices: HashMap<usize, Offsets>,
max_index: usize,
file: Arc<File>,
// TODO: When Storage will be accessed concurrently each concurrent accessor should have a
// `retrievable_buffer` of its own to read into instead.
Expand All @@ -32,13 +36,16 @@ pub struct Storage {

impl Storage {
pub async fn new(path: &'static Path, max_queue: usize) -> Result<Self, String> {
let file = Arc::new(File::open(path).await.map_err(|e| e.to_string())?);
let (write_sender, write_receiver) = bounded(max_queue);

let write_queue_handle = async_std::task::spawn(WriteQueue::run(write_receiver, &path));
let write_queue_handle =
async_std::task::spawn(WriteQueue::run(write_receiver, file.clone()));

Ok(Self {
indices: HashMap::new(),
file: Arc::new(File::open(path).await.map_err(|e| e.to_string())?),
max_index: 0,
file,
retrivable_buffer: [0; 8192],
write_sender,
write_queue_handle,
Expand All @@ -50,6 +57,14 @@ impl Storage {
}

pub async fn get(&mut self, index: usize) -> Result<&[u8], String> {
if index >= self.max_index {
return Err(format!(
"Request data at index {} but max index is {}",
index,
self.max_index - 1
));
}

let offsets = self
.indices
.get(&index)
Expand Down Expand Up @@ -92,4 +107,9 @@ impl Storage {
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn create_storage_instance() {
// let storage = Storage::new();
}
}
42 changes: 35 additions & 7 deletions nyx-storage/src/write_queue.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
use std::io;
use std::{
io::{self, SeekFrom},
sync::Arc,
};

use async_std::{channel::Receiver, fs::OpenOptions, io::WriteExt, path::Path};
use async_std::{
channel::Receiver,
fs::File,
io::{prelude::SeekExt, WriteExt},
};

pub struct WriteQueue;
pub struct WriteQueue {
file: Arc<File>,
current_last_byte: usize,
}

impl WriteQueue {
pub async fn run(queue: Receiver<Vec<u8>>, path: &Path) -> io::Result<()> {
let mut data_file = OpenOptions::new().append(true).open(path).await?;
pub async fn new(file: Arc<File>) -> io::Result<Self> {
let mut file_ref = &*file;
file_ref.seek(SeekFrom::End(0)).await?;

data_file.write_all(b"I am trying to write").await?;
Ok(Self {
file,
current_last_byte: 0,
})
}

async fn write(&mut self, buf: &[u8]) -> io::Result<()> {
let mut file = &*self.file;
let n = file.write(buf).await?;
self.current_last_byte += n;
file.seek(SeekFrom::End(0)).await?;
Ok(())
}

pub async fn run(queue: Receiver<Vec<u8>>, file: Arc<File>) -> io::Result<()> {
let mut write_queue = Self::new(file).await?;

while let Ok(data) = queue.recv().await {
println!("Do something with received data: {:?}", data);
write_queue.write(&data[..]).await?;
}

Ok(())
}

// pub
}

0 comments on commit 2e84852

Please sign in to comment.