From 35cf76068bed725e4bd8909c3a537c4209e175b7 Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Wed, 18 Oct 2023 09:40:40 +0300 Subject: [PATCH] Added more async capabitilies for storage --- nyx-storage/src/lib.rs | 68 +++++++++++++++++----------------- nyx-storage/src/offsets.rs | 26 +++++++++++++ nyx-storage/src/write_queue.rs | 10 ++++- 3 files changed, 68 insertions(+), 36 deletions(-) create mode 100644 nyx-storage/src/offsets.rs diff --git a/nyx-storage/src/lib.rs b/nyx-storage/src/lib.rs index 9b67c46..1b9f420 100644 --- a/nyx-storage/src/lib.rs +++ b/nyx-storage/src/lib.rs @@ -1,36 +1,24 @@ use std::{ collections::HashMap, - fs::File, - io::{self, Read, Seek}, - path::Path, + io::{self}, sync::Arc, }; -use async_std::channel::{bounded, Sender}; +use async_std::{ + channel::{bounded, Sender}, + fs::File, + io::{prelude::SeekExt, ReadExt, SeekFrom}, + path::Path, + task::JoinHandle, +}; use storage_sender::StorageSender; use write_queue::WriteQueue; +mod offsets; mod storage_sender; mod write_queue; -#[derive(Clone, Copy)] -struct Offsets { - start: usize, - end: usize, -} - -impl Offsets { - pub fn new(start: usize, end: usize) -> Result { - if start >= end { - return Err(format!( - "Start ({}) can't be greater o equal to end ({}) of file byte", - start, end - )); - } - - Ok(Self { start, end }) - } -} +use offsets::Offsets; pub struct Storage { indices: HashMap, @@ -39,33 +27,35 @@ pub struct Storage { // `retrievable_buffer` of its own to read into instead. retrivable_buffer: [u8; 8192], write_sender: Sender>, + write_queue_handle: JoinHandle>, } impl Storage { - pub fn new(path: &Path, max_queue: usize) -> Result { + pub async fn new(path: &'static Path, max_queue: usize) -> Result { let (write_sender, write_receiver) = bounded(max_queue); - async_std::task::spawn(WriteQueue::run(write_receiver)); + let write_queue_handle = async_std::task::spawn(WriteQueue::run(write_receiver, &path)); Ok(Self { indices: HashMap::new(), - file: Arc::new(File::open(path).map_err(|e| e.to_string())?), + file: Arc::new(File::open(path).await.map_err(|e| e.to_string())?), retrivable_buffer: [0; 8192], write_sender, + write_queue_handle, }) } - pub fn new_sender(&mut self) -> Result { - Ok(StorageSender::new(self.write_sender.clone())) + pub fn get_storage_sender(&mut self) -> StorageSender { + StorageSender::new(self.write_sender.clone()) } - pub fn get(&mut self, index: usize) -> Result<&[u8], String> { + pub async fn get(&mut self, index: usize) -> Result<&[u8], String> { let offsets = self .indices .get(&index) .ok_or("record doesn't exist.".to_string())?; - let data_size = offsets.end - offsets.start; + let data_size = offsets.end() - offsets.start(); if data_size > self.retrivable_buffer.len() { return Err(format!( @@ -76,17 +66,27 @@ impl Storage { } return self - .seek_bytes_between(offsets.start, data_size) + .seek_bytes_between(offsets.start(), data_size) + .await .map_err(|e| format!("Error in Storage (get): {}", e)); } - fn seek_bytes_between(&mut self, start: usize, data_size: usize) -> io::Result<&[u8]> { + async fn seek_bytes_between(&mut self, start: usize, data_size: usize) -> io::Result<&[u8]> { let mut file = &*self.file; - file.seek(std::io::SeekFrom::Start(start as u64))?; - file.read_exact(&mut self.retrivable_buffer[..data_size])?; - file.rewind()?; + file.seek(SeekFrom::Start(start as u64)).await?; + let n = file.read(&mut self.retrivable_buffer[..data_size]).await?; + if n == 0 { + // Theoratically should never get here + panic!("Got 0 bytes in file read") + }; + self.rewind().await?; return Ok(&self.retrivable_buffer[..data_size]); } + + async fn rewind(&mut self) -> io::Result { + let mut file = &*self.file; + file.seek(SeekFrom::Start(0)).await + } } #[cfg(test)] diff --git a/nyx-storage/src/offsets.rs b/nyx-storage/src/offsets.rs new file mode 100644 index 0000000..83cce21 --- /dev/null +++ b/nyx-storage/src/offsets.rs @@ -0,0 +1,26 @@ +#[derive(Clone, Copy)] +pub struct Offsets { + start: usize, + end: usize, +} + +impl Offsets { + pub fn new(start: usize, end: usize) -> Result { + if start >= end { + return Err(format!( + "Start ({}) can't be greater or equal to end ({})", + start, end + )); + } + + Ok(Self { start, end }) + } + + pub fn start(&self) -> usize { + return self.start; + } + + pub fn end(&self) -> usize { + return self.end; + } +} diff --git a/nyx-storage/src/write_queue.rs b/nyx-storage/src/write_queue.rs index 07eb891..64e01f3 100644 --- a/nyx-storage/src/write_queue.rs +++ b/nyx-storage/src/write_queue.rs @@ -1,9 +1,15 @@ -use async_std::channel::Receiver; +use std::io; + +use async_std::{channel::Receiver, fs::OpenOptions, io::WriteExt, path::Path}; pub struct WriteQueue; impl WriteQueue { - pub async fn run(queue: Receiver>) -> Result<(), String> { + pub async fn run(queue: Receiver>, path: &Path) -> io::Result<()> { + let mut data_file = OpenOptions::new().append(true).open(path).await?; + + data_file.write_all(b"I am trying to write").await?; + while let Ok(data) = queue.recv().await { println!("Do something with received data: {:?}", data); }