Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): use common file handler #1281

Merged
merged 2 commits into from
Oct 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 84 additions & 63 deletions crates/papyrus_storage/src/mmap_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ pub enum MMapFileError {
pub trait Writer<V: StorageSerde> {
/// Inserts an object to the file, returns the number of bytes written.
fn insert(&mut self, offset: usize, val: &V) -> usize;

/// Flushes the mmap to the file.
fn flush(&self);
}

/// A trait for reading from a memory mapped file.
Expand All @@ -98,88 +101,40 @@ impl LocationInFile {
}
}

/// A wrapper around `MMapFile` that provides a write interface.
/// A wrapper around `FileHandler` that provides a write interface.
#[derive(Debug)]
pub struct FileWriter<V: StorageSerde> {
memory_ptr: *const u8,
mmap_file: Arc<Mutex<MMapFile<V>>>,
}
impl<V: StorageSerde> FileWriter<V> {
/// Flushes the mmap to the file.
#[allow(dead_code)]
pub(crate) fn flush(&self) {
let mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
mmap_file.flush();
}

fn grow_file_if_needed(&mut self, offset: usize) {
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
if mmap_file.size < offset + mmap_file.config.max_object_size {
debug!(
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}",
mmap_file.size, offset, mmap_file.config.max_object_size
);
mmap_file.grow();
}
}
file_handler: FileHandler<V>,
}

impl<V: StorageSerde + Debug> Writer<V> for FileWriter<V> {
/// Inserts an object to the file, returns the number of bytes written. Grow file if needed.
fn insert(&mut self, offset: usize, val: &V) -> usize {
debug!("Inserting object at offset: {}", offset);
trace!("Inserting object: {:?}", val);
// TODO(dan): change serialize_into to return serialization size.
let len = val.serialize().expect("Should be able to serialize").len();
{
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
let mut mmap_slice = &mut mmap_file.mmap[offset..];
let _ = val.serialize_into(&mut mmap_slice);
mmap_file
.mmap
.flush_async_range(offset, len)
.expect("Failed to asynchronously flush the mmap after inserting");
}
self.grow_file_if_needed(offset + len);
len
self.file_handler.insert(offset, val)
}

fn flush(&self) {
self.file_handler.flush();
}
}

impl<V: StorageSerde> Reader<V> for FileWriter<V> {
/// Returns an object from the file.
fn get(&self, location: LocationInFile) -> MmapFileResult<Option<V>> {
debug!("Reading object at location: {:?}", location);
let mut bytes = unsafe {
std::slice::from_raw_parts(
self.memory_ptr.offset(location.offset.try_into()?),
location.len,
)
};
trace!("Deserializing object: {:?}", bytes);
Ok(V::deserialize(&mut bytes))
self.file_handler.get(location)
}
}

/// A wrapper around `MMapFile` that provides a read interface.
/// A wrapper around `FileHandler` that provides a read interface.
#[derive(Clone, Debug)]
pub struct FileReader<V: StorageSerde> {
memory_ptr: *const u8,
_mmap_file: Arc<Mutex<MMapFile<V>>>,
file_handler: FileHandler<V>,
}
unsafe impl<V: StorageSerde> Send for FileReader<V> {}
unsafe impl<V: StorageSerde> Sync for FileReader<V> {}

impl<V: StorageSerde> Reader<V> for FileReader<V> {
/// Returns an object from the file.
fn get(&self, location: LocationInFile) -> MmapFileResult<Option<V>> {
debug!("Reading object at location: {:?}", location);
let mut bytes = unsafe {
std::slice::from_raw_parts(
self.memory_ptr.offset(location.offset.try_into()?),
location.len,
)
};
trace!("Deserializing object: {:?}", bytes);
Ok(V::deserialize(&mut bytes))
self.file_handler.get(location)
}
}

Expand Down Expand Up @@ -231,8 +186,74 @@ pub(crate) fn open_file<V: StorageSerde>(
_value_type: PhantomData {},
};
let shared_mmap_file = Arc::new(Mutex::new(mmap_file));
let reader = FileReader { memory_ptr: mmap_ptr, _mmap_file: shared_mmap_file.clone() };
let mut writer = FileWriter { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file };
writer.grow_file_if_needed(0);

let mut file_handler =
FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file.clone() };
file_handler.grow_file_if_needed(0);
let writer = FileWriter { file_handler };

let file_handler = FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file };
let reader = FileReader { file_handler };

Ok((writer, reader))
}

/// A wrapper around `MMapFile` that provides both write and read interfaces.
#[derive(Clone, Debug)]
struct FileHandler<V: StorageSerde> {
memory_ptr: *const u8,
mmap_file: Arc<Mutex<MMapFile<V>>>,
}

impl<V: StorageSerde> FileHandler<V> {
fn grow_file_if_needed(&mut self, offset: usize) {
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
if mmap_file.size < offset + mmap_file.config.max_object_size {
debug!(
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}",
mmap_file.size, offset, mmap_file.config.max_object_size
);
mmap_file.grow();
}
}
}

impl<V: StorageSerde + Debug> Writer<V> for FileHandler<V> {
fn insert(&mut self, offset: usize, val: &V) -> usize {
debug!("Inserting object at offset: {}", offset);
trace!("Inserting object: {:?}", val);
// TODO(dan): change serialize_into to return serialization size.
let len = val.serialize().expect("Should be able to serialize").len();
{
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
let mut mmap_slice = &mut mmap_file.mmap[offset..];
let _ = val.serialize_into(&mut mmap_slice);
mmap_file
.mmap
.flush_async_range(offset, len)
.expect("Failed to asynchronously flush the mmap after inserting");
}
self.grow_file_if_needed(offset + len);
len
}

fn flush(&self) {
let mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
mmap_file.flush();
}
}

impl<V: StorageSerde> Reader<V> for FileHandler<V> {
/// Returns an object from the file.
fn get(&self, location: LocationInFile) -> MmapFileResult<Option<V>> {
debug!("Reading object at location: {:?}", location);
let mut bytes = unsafe {
std::slice::from_raw_parts(
self.memory_ptr.offset(location.offset.try_into()?),
location.len,
)
};
trace!("Deserializing object: {:?}", bytes);
Ok(V::deserialize(&mut bytes))
}
}
Loading