diff --git a/crates/papyrus_storage/src/mmap_file/mod.rs b/crates/papyrus_storage/src/mmap_file/mod.rs index 31ae2032ef..30ddb29fee 100644 --- a/crates/papyrus_storage/src/mmap_file/mod.rs +++ b/crates/papyrus_storage/src/mmap_file/mod.rs @@ -73,6 +73,9 @@ pub enum MMapFileError { pub trait Writer { /// Inserts an object to the file, returns the number of bytes written. fn insert(&mut self, offset: usize, val: &V) -> usize; + + /// Flushes the mmap to the file. + fn flush(&self); } /// A trait for reading from a memory mapped file. @@ -98,72 +101,32 @@ impl LocationInFile { } } -/// A wrapper around `MMapFile` that provides a write interface. +/// A wrapper around `FileHandler` that provides a write interface. +#[derive(Debug)] pub struct FileWriter { - memory_ptr: *const u8, - mmap_file: Arc>>, -} -impl FileWriter { - /// Flushes the mmap to the file. - #[allow(dead_code)] - pub(crate) fn flush(&self) { - let mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); - mmap_file.flush(); - } - - fn grow_file_if_needed(&mut self, offset: usize) { - let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); - if mmap_file.size < offset + mmap_file.config.max_object_size { - debug!( - "Attempting to grow file. File size: {}, offset: {}, max_object_size: {}", - mmap_file.size, offset, mmap_file.config.max_object_size - ); - mmap_file.grow(); - } - } + file_handler: FileHandler, } impl Writer for FileWriter { - /// Inserts an object to the file, returns the number of bytes written. Grow file if needed. fn insert(&mut self, offset: usize, val: &V) -> usize { - debug!("Inserting object at offset: {}", offset); - trace!("Inserting object: {:?}", val); - // TODO(dan): change serialize_into to return serialization size. - let len = val.serialize().expect("Should be able to serialize").len(); - { - let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); - let mut mmap_slice = &mut mmap_file.mmap[offset..]; - let _ = val.serialize_into(&mut mmap_slice); - mmap_file - .mmap - .flush_async_range(offset, len) - .expect("Failed to asynchronously flush the mmap after inserting"); - } - self.grow_file_if_needed(offset + len); - len + self.file_handler.insert(offset, val) + } + + fn flush(&self) { + self.file_handler.flush(); } } impl Reader for FileWriter { - /// Returns an object from the file. fn get(&self, location: LocationInFile) -> MmapFileResult> { - debug!("Reading object at location: {:?}", location); - let mut bytes = unsafe { - std::slice::from_raw_parts( - self.memory_ptr.offset(location.offset.try_into()?), - location.len, - ) - }; - trace!("Deserializing object: {:?}", bytes); - Ok(V::deserialize(&mut bytes)) + self.file_handler.get(location) } } -/// A wrapper around `MMapFile` that provides a read interface. +/// A wrapper around `FileHandler` that provides a read interface. #[derive(Clone, Debug)] pub struct FileReader { - memory_ptr: *const u8, - _mmap_file: Arc>>, + file_handler: FileHandler, } unsafe impl Send for FileReader {} unsafe impl Sync for FileReader {} @@ -171,15 +134,7 @@ unsafe impl Sync for FileReader {} impl Reader for FileReader { /// Returns an object from the file. fn get(&self, location: LocationInFile) -> MmapFileResult> { - debug!("Reading object at location: {:?}", location); - let mut bytes = unsafe { - std::slice::from_raw_parts( - self.memory_ptr.offset(location.offset.try_into()?), - location.len, - ) - }; - trace!("Deserializing object: {:?}", bytes); - Ok(V::deserialize(&mut bytes)) + self.file_handler.get(location) } } @@ -231,8 +186,74 @@ pub(crate) fn open_file( _value_type: PhantomData {}, }; let shared_mmap_file = Arc::new(Mutex::new(mmap_file)); - let reader = FileReader { memory_ptr: mmap_ptr, _mmap_file: shared_mmap_file.clone() }; - let mut writer = FileWriter { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file }; - writer.grow_file_if_needed(0); + + let mut file_handler = + FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file.clone() }; + file_handler.grow_file_if_needed(0); + let writer = FileWriter { file_handler }; + + let file_handler = FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file }; + let reader = FileReader { file_handler }; + Ok((writer, reader)) } + +/// A wrapper around `MMapFile` that provides both write and read interfaces. +#[derive(Clone, Debug)] +struct FileHandler { + memory_ptr: *const u8, + mmap_file: Arc>>, +} + +impl FileHandler { + fn grow_file_if_needed(&mut self, offset: usize) { + let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + if mmap_file.size < offset + mmap_file.config.max_object_size { + debug!( + "Attempting to grow file. File size: {}, offset: {}, max_object_size: {}", + mmap_file.size, offset, mmap_file.config.max_object_size + ); + mmap_file.grow(); + } + } +} + +impl Writer for FileHandler { + fn insert(&mut self, offset: usize, val: &V) -> usize { + debug!("Inserting object at offset: {}", offset); + trace!("Inserting object: {:?}", val); + // TODO(dan): change serialize_into to return serialization size. + let len = val.serialize().expect("Should be able to serialize").len(); + { + let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + let mut mmap_slice = &mut mmap_file.mmap[offset..]; + let _ = val.serialize_into(&mut mmap_slice); + mmap_file + .mmap + .flush_async_range(offset, len) + .expect("Failed to asynchronously flush the mmap after inserting"); + } + self.grow_file_if_needed(offset + len); + len + } + + fn flush(&self) { + let mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + mmap_file.flush(); + } +} + +impl Reader for FileHandler { + /// Returns an object from the file. + fn get(&self, location: LocationInFile) -> MmapFileResult> { + debug!("Reading object at location: {:?}", location); + let mut bytes = unsafe { + std::slice::from_raw_parts( + self.memory_ptr.offset(location.offset.try_into()?), + location.len, + ) + }; + trace!("Deserializing object: {:?}", bytes); + Ok(V::deserialize(&mut bytes)) + } +}