diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs
index 8f96d61b..79bdada5 100644
--- a/src/tree_store/page_store/cached_file.rs
+++ b/src/tree_store/page_store/cached_file.rs
@@ -206,11 +206,6 @@ impl PrioritizedWriteCache {
         }
         None
     }
-
-    fn clear(&mut self) {
-        self.cache.clear();
-        self.low_pri_cache.clear();
-    }
 }
 
 pub(super) struct PagedCachedFile {
@@ -237,10 +232,9 @@ impl PagedCachedFile {
         max_read_cache_bytes: usize,
         max_write_buffer_bytes: usize,
     ) -> Result<Self> {
-        let mut read_cache = Vec::with_capacity(Self::lock_stripes());
-        for _ in 0..Self::lock_stripes() {
-            read_cache.push(RwLock::new(PrioritizedCache::new()));
-        }
+        let read_cache = (0..Self::lock_stripes())
+            .map(|_| RwLock::new(PrioritizedCache::new()))
+            .collect();
 
         Ok(Self {
             file,
@@ -263,7 +257,7 @@ impl PagedCachedFile {
         self.file.len().map_err(StorageError::from)
     }
 
-    const fn lock_stripes() -> usize {
+    const fn lock_stripes() -> u64 {
         131
     }
 
@@ -285,14 +279,27 @@ impl PagedCachedFile {
         self.check_fsync_failure()?;
 
         let mut write_buffer = self.write_buffer.lock().unwrap();
-        for (offset, buffer) in write_buffer.cache.iter() {
-            self.file.write(*offset, buffer.as_ref().unwrap())?;
+        while let Some((offset, buffer)) = write_buffer.cache.pop_first() {
+            let buffer = buffer.unwrap();
+
+            self.file.write(offset, &buffer)?;
+
+            let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
+            let mut lock = self.read_cache[cache_slot].write().unwrap();
+            lock.insert(offset, buffer, CachePriority::High);
         }
-        for (offset, buffer) in write_buffer.low_pri_cache.iter() {
-            self.file.write(*offset, buffer.as_ref().unwrap())?;
+        while let Some((offset, buffer)) = write_buffer.low_pri_cache.pop_first() {
+            let buffer = buffer.unwrap();
+
+            self.file.write(offset, &buffer)?;
+
+            let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
+            let mut lock = self.read_cache[cache_slot].write().unwrap();
+            lock.insert(offset, buffer, CachePriority::Low);
         }
-        self.write_buffer_bytes.store(0, Ordering::Release);
-        write_buffer.clear();
+
+        let bytes = self.write_buffer_bytes.swap(0, Ordering::Release);
+        self.read_cache_bytes.fetch_add(bytes, Ordering::Release);
 
         Ok(())
     }
@@ -353,7 +360,7 @@ impl PagedCachedFile {
             }
         }
 
-        let cache_slot: usize = (offset % Self::lock_stripes() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         {
             let read_lock = self.read_cache[cache_slot].read().unwrap();
             if let Some(cached) = read_lock.get(&offset) {
@@ -398,7 +405,7 @@ impl PagedCachedFile {
     //
     // NOTE: Invalidating a cached region in subsections is permitted, as long as all subsections are invalidated
     pub(super) fn invalidate_cache(&self, offset: u64, len: usize) {
-        let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         let mut lock = self.read_cache[cache_slot].write().unwrap();
         if let Some(removed) = lock.remove(&offset) {
             assert_eq!(len, removed.len());
@@ -431,7 +438,7 @@ impl PagedCachedFile {
         let mut lock = self.write_buffer.lock().unwrap();
 
         // TODO: allow hint that page is known to be dirty and will not be in the read cache
-        let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         let existing = {
            let mut lock = self.read_cache[cache_slot].write().unwrap();
            if let Some(removed) = lock.remove(&offset) {
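
Review note: the sketch below is a minimal, self-contained illustration of the behavior this patch gives `flush()`, using hypothetical stand-in types (`PagedFileSketch`, plain `BTreeMap`/`HashMap` shards) rather than redb's actual `PagedCachedFile`, `PrioritizedWriteCache`, or `PrioritizedCache` APIs. It shows the two ideas the diff combines: flushed pages are drained from the write buffer with `pop_first()` (which is why `PrioritizedWriteCache::clear()` is no longer needed) and promoted into the lock-striped read cache, while the byte accounting moves from the write-buffer counter to the read-cache counter. It also illustrates why `lock_stripes()` now returns `u64`: the stripe index is computed directly as `offset % lock_stripes()` with no cast.

```rust
// Hypothetical sketch of the flush-promotes-to-read-cache pattern; not redb's real types.
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

const LOCK_STRIPES: u64 = 131;

struct PagedFileSketch {
    // Dirty pages awaiting a flush, keyed by file offset.
    write_buffer: BTreeMap<u64, Vec<u8>>,
    // One read-cache shard per lock stripe, selected by `offset % LOCK_STRIPES`.
    read_cache: Vec<RwLock<HashMap<u64, Vec<u8>>>>,
    write_buffer_bytes: AtomicUsize,
    read_cache_bytes: AtomicUsize,
}

impl PagedFileSketch {
    fn new() -> Self {
        Self {
            write_buffer: BTreeMap::new(),
            read_cache: (0..LOCK_STRIPES).map(|_| RwLock::new(HashMap::new())).collect(),
            write_buffer_bytes: AtomicUsize::new(0),
            read_cache_bytes: AtomicUsize::new(0),
        }
    }

    fn write(&mut self, offset: u64, data: Vec<u8>) {
        self.write_buffer_bytes.fetch_add(data.len(), Ordering::Release);
        self.write_buffer.insert(offset, data);
    }

    // Stand-in for the real file write; the sketch only tracks cache movement.
    fn write_to_file(&self, _offset: u64, _data: &[u8]) {}

    fn flush(&mut self) {
        // Drain the write buffer in offset order, persisting each page and then
        // promoting it into the appropriate read-cache stripe instead of dropping it.
        while let Some((offset, buffer)) = self.write_buffer.pop_first() {
            self.write_to_file(offset, &buffer);
            let stripe: usize = (offset % LOCK_STRIPES).try_into().unwrap();
            self.read_cache[stripe].write().unwrap().insert(offset, buffer);
        }
        // The flushed bytes are no longer buffered writes; account for them as
        // read-cache contents rather than simply resetting the counter.
        let bytes = self.write_buffer_bytes.swap(0, Ordering::Release);
        self.read_cache_bytes.fetch_add(bytes, Ordering::Release);
    }
}

fn main() {
    let mut f = PagedFileSketch::new();
    f.write(0, vec![0u8; 4096]);
    f.write(4096, vec![1u8; 4096]);
    f.flush();
    assert_eq!(f.write_buffer_bytes.load(Ordering::Acquire), 0);
    assert_eq!(f.read_cache_bytes.load(Ordering::Acquire), 8192);
    println!("flushed pages promoted into the striped read cache");
}
```

The sketch deliberately skips the real file I/O, the per-stripe `RwLock` contention concerns, and the High/Low cache priorities that the patch itself preserves via `CachePriority::High`/`CachePriority::Low`.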