From 50740f366757cfc239d7ae6e7b53cbf8bba9c7b6 Mon Sep 17 00:00:00 2001 From: Dan Brownstein Date: Sat, 14 Oct 2023 21:34:06 +0300 Subject: [PATCH] fix(storage): don't drop mmap when writer is dropped --- .../src/mmap_file/mmap_file_test.rs | 50 +++++++--- crates/papyrus_storage/src/mmap_file/mod.rs | 92 ++++++++++--------- 2 files changed, 87 insertions(+), 55 deletions(-) diff --git a/crates/papyrus_storage/src/mmap_file/mmap_file_test.rs b/crates/papyrus_storage/src/mmap_file/mmap_file_test.rs index dc2a22d998..f398c4815b 100644 --- a/crates/papyrus_storage/src/mmap_file/mmap_file_test.rs +++ b/crates/papyrus_storage/src/mmap_file/mmap_file_test.rs @@ -38,13 +38,9 @@ fn write_read() { let len = writer.insert(offset, &data); let res_writer = writer.get(LocationInFile { offset, len }).unwrap(); - assert_eq!(res_writer, data); + assert_eq!(res_writer.unwrap(), data); - let another_reader = reader; - let res: Vec = reader.get(LocationInFile { offset, len }).unwrap(); - assert_eq!(res, data); - - let res: Vec = another_reader.get(LocationInFile { offset, len }).unwrap(); + let res: Vec = reader.get(LocationInFile { offset, len }).unwrap().unwrap(); assert_eq!(res, data); dir.close().unwrap(); @@ -66,12 +62,13 @@ fn concurrent_reads() { let mut handles = vec![]; for _ in 0..num_threads { + let reader = reader.clone(); let handle = std::thread::spawn(move || reader.get(location_in_file).unwrap()); handles.push(handle); } for handle in handles { - let res: Vec = handle.join().unwrap(); + let res: Vec = handle.join().unwrap().unwrap(); assert_eq!(res, data); } @@ -99,13 +96,11 @@ fn concurrent_reads_single_write() { let mut handles = Vec::with_capacity(n); for _ in 0..n { + let reader = reader.clone(); let reader_barrier = barrier.clone(); let first_data = first_data.clone(); handles.push(std::thread::spawn(move || { - assert_eq!( - >>::get(&reader, first_location).unwrap(), - first_data - ); + assert_eq!(reader.get(first_location).unwrap().unwrap(), first_data); reader_barrier.wait(); // readers wait for the writer to write the value. reader_barrier.wait(); @@ -120,7 +115,7 @@ fn concurrent_reads_single_write() { barrier.wait(); for handle in handles { - let res: Vec = handle.join().unwrap(); + let res: Vec = handle.join().unwrap().unwrap(); assert_eq!(res, second_data); } } @@ -196,7 +191,11 @@ async fn write_read_different_locations() { let barrier = Arc::new(Barrier::new(n_readers_per_phase + 1)); let lock = Arc::new(RwLock::new(0)); - async fn reader_task(reader: FileReader, lock: Arc>, barrier: Arc) { + async fn reader_task( + reader: FileReader>, + lock: Arc>, + barrier: Arc, + ) { barrier.wait().await; let round: usize; { @@ -204,7 +203,7 @@ async fn write_read_different_locations() { } let read_offset = 3 * rand::thread_rng().gen_range(0..round + 1); let read_location = LocationInFile { offset: read_offset, len: LEN }; - let read_value: Vec = reader.get(read_location).unwrap(); + let read_value: Vec = reader.get(read_location).unwrap().unwrap(); let first_expected_value: u8 = (read_offset / 3 * 2).try_into().unwrap(); let expected_value = vec![first_expected_value, first_expected_value + 1]; assert_eq!(read_value, expected_value); @@ -213,6 +212,7 @@ async fn write_read_different_locations() { let mut handles = Vec::new(); for round in 0..ROUNDS { for _ in 0..n_readers_per_phase { + let reader = reader.clone(); handles.push(tokio::spawn(reader_task(reader, lock.clone(), barrier.clone()))); } @@ -230,3 +230,25 @@ async fn write_read_different_locations() { handle.await.unwrap(); } } + +#[test] +fn reader_when_writer_is_out_of_scope() { + let dir = tempdir().unwrap(); + let (mut writer, reader) = open_file( + get_test_config(), + dir.path().to_path_buf().join("test_reader_when_writer_is_out_of_scope"), + ) + .unwrap(); + let data: Vec = vec![1, 2, 3]; + let offset = 0; + + let len = writer.insert(offset, &data); + let res: Vec = reader.get(LocationInFile { offset, len }).unwrap().unwrap(); + assert_eq!(res, data); + + drop(writer); + let res: Vec = reader.get(LocationInFile { offset, len }).unwrap().unwrap(); + assert_eq!(res, data); + + dir.close().unwrap(); +} diff --git a/crates/papyrus_storage/src/mmap_file/mod.rs b/crates/papyrus_storage/src/mmap_file/mod.rs index 0c17ca1481..31ae2032ef 100644 --- a/crates/papyrus_storage/src/mmap_file/mod.rs +++ b/crates/papyrus_storage/src/mmap_file/mod.rs @@ -12,6 +12,7 @@ use std::fs::{File, OpenOptions}; use std::marker::PhantomData; use std::path::PathBuf; use std::result; +use std::sync::{Arc, Mutex}; use memmap2::{MmapMut, MmapOptions}; use serde::{Deserialize, Serialize}; @@ -59,9 +60,13 @@ fn validate_config(config: &MmapFileConfig) -> result::Result<(), ValidationErro /// Errors associated with [`MMapFile`]. #[derive(Debug, Error)] pub enum MMapFileError { - #[error(transparent)] /// IO error. + #[error(transparent)] IO(#[from] std::io::Error), + + /// Number conversion error. + #[error(transparent)] + TryFromInt(#[from] std::num::TryFromIntError), } /// A trait for writing to a memory mapped file. @@ -73,7 +78,7 @@ pub trait Writer { /// A trait for reading from a memory mapped file. pub trait Reader { /// Returns an object from the file. - fn get(&self, location: LocationInFile) -> Option; + fn get(&self, location: LocationInFile) -> MmapFileResult>; } /// Represents a location in the file. @@ -95,22 +100,25 @@ impl LocationInFile { /// A wrapper around `MMapFile` that provides a write interface. pub struct FileWriter { - mmap_file: MMapFile, + memory_ptr: *const u8, + mmap_file: Arc>>, } impl FileWriter { /// Flushes the mmap to the file. #[allow(dead_code)] pub(crate) fn flush(&self) { - self.mmap_file.flush(); + let mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + mmap_file.flush(); } fn grow_file_if_needed(&mut self, offset: usize) { - if self.mmap_file.size < offset + self.mmap_file.config.max_object_size { + let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + if mmap_file.size < offset + mmap_file.config.max_object_size { debug!( "Attempting to grow file. File size: {}, offset: {}, max_object_size: {}", - self.mmap_file.size, offset, self.mmap_file.config.max_object_size + mmap_file.size, offset, mmap_file.config.max_object_size ); - self.mmap_file.grow(); + mmap_file.grow(); } } } @@ -120,14 +128,17 @@ impl Writer for FileWriter { fn insert(&mut self, offset: usize, val: &V) -> usize { debug!("Inserting object at offset: {}", offset); trace!("Inserting object: {:?}", val); - let mut mmap_slice = &mut self.mmap_file.mmap[offset..]; // TODO(dan): change serialize_into to return serialization size. - let _ = val.serialize_into(&mut mmap_slice); let len = val.serialize().expect("Should be able to serialize").len(); - self.mmap_file - .mmap - .flush_async_range(offset, len) - .expect("Failed to asynchronously flush the mmap after inserting"); + { + let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned"); + let mut mmap_slice = &mut mmap_file.mmap[offset..]; + let _ = val.serialize_into(&mut mmap_slice); + mmap_file + .mmap + .flush_async_range(offset, len) + .expect("Failed to asynchronously flush the mmap after inserting"); + } self.grow_file_if_needed(offset + len); len } @@ -135,36 +146,45 @@ impl Writer for FileWriter { impl Reader for FileWriter { /// Returns an object from the file. - fn get(&self, location: LocationInFile) -> Option { - self.mmap_file.get(location) + fn get(&self, location: LocationInFile) -> MmapFileResult> { + debug!("Reading object at location: {:?}", location); + let mut bytes = unsafe { + std::slice::from_raw_parts( + self.memory_ptr.offset(location.offset.try_into()?), + location.len, + ) + }; + trace!("Deserializing object: {:?}", bytes); + Ok(V::deserialize(&mut bytes)) } } /// A wrapper around `MMapFile` that provides a read interface. -#[derive(Clone, Copy, Debug)] -pub struct FileReader { - shared_data: *const u8, +#[derive(Clone, Debug)] +pub struct FileReader { + memory_ptr: *const u8, + _mmap_file: Arc>>, } -unsafe impl Send for FileReader {} -unsafe impl Sync for FileReader {} +unsafe impl Send for FileReader {} +unsafe impl Sync for FileReader {} -impl Reader for FileReader { +impl Reader for FileReader { /// Returns an object from the file. - fn get(&self, location: LocationInFile) -> Option { + fn get(&self, location: LocationInFile) -> MmapFileResult> { debug!("Reading object at location: {:?}", location); let mut bytes = unsafe { std::slice::from_raw_parts( - self.shared_data - .offset(location.offset.try_into().expect("offset should fit in usize")), + self.memory_ptr.offset(location.offset.try_into()?), location.len, ) }; trace!("Deserializing object: {:?}", bytes); - V::deserialize(&mut bytes) + Ok(V::deserialize(&mut bytes)) } } /// Represents a memory mapped append only file. +#[derive(Debug)] pub struct MMapFile { config: MmapFileConfig, file: File, @@ -174,19 +194,7 @@ pub struct MMapFile { } impl MMapFile { - /// Returns an object from the file. - fn get(&self, location: LocationInFile) -> Option { - debug!("Reading object at location: {:?}", location); - let bytes: std::borrow::Cow<'_, [u8]> = self.get_raw(location); - trace!("Deserializing object: {:?}", bytes.as_ref()); - V::deserialize(&mut bytes.as_ref()) - } - - /// Returns a COW pointer to a slice of the file. - fn get_raw(&self, location: LocationInFile) -> std::borrow::Cow<'_, [u8]> { - std::borrow::Cow::from(&self.mmap[location.offset..(location.offset + location.len)]) - } - + /// Grows the file by the growth step. fn grow(&mut self) { self.flush(); let new_size = self.size + self.config.growth_step; @@ -207,13 +215,14 @@ impl MMapFile { pub(crate) fn open_file( config: MmapFileConfig, path: PathBuf, -) -> MmapFileResult<(FileWriter, FileReader)> { +) -> MmapFileResult<(FileWriter, FileReader)> { debug!("Opening file"); // TODO: move validation to caller. config.validate().expect("Invalid config"); let file = OpenOptions::new().read(true).write(true).create(true).open(path)?; let size = file.metadata()?.len(); let mmap = unsafe { MmapOptions::new().len(config.max_size).map_mut(&file)? }; + let mmap_ptr = mmap.as_ptr(); let mmap_file = MMapFile { config, file, @@ -221,8 +230,9 @@ pub(crate) fn open_file( size: size.try_into().expect("size should fit in usize"), _value_type: PhantomData {}, }; - let reader = FileReader { shared_data: mmap_file.mmap.as_ptr() }; - let mut writer = FileWriter { mmap_file }; + let shared_mmap_file = Arc::new(Mutex::new(mmap_file)); + let reader = FileReader { memory_ptr: mmap_ptr, _mmap_file: shared_mmap_file.clone() }; + let mut writer = FileWriter { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file }; writer.grow_file_if_needed(0); Ok((writer, reader)) }