diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs index 8f96d61b..2c927f3a 100644 --- a/src/tree_store/page_store/cached_file.rs +++ b/src/tree_store/page_store/cached_file.rs @@ -225,7 +225,7 @@ pub(super) struct PagedCachedFile { #[cfg(feature = "cache_metrics")] reads_hits: AtomicU64, fsync_failed: AtomicBool, - read_cache: Vec>, + read_cache: Box<[RwLock]>, // TODO: maybe move this cache to WriteTransaction? write_buffer: Arc>, } @@ -237,10 +237,9 @@ impl PagedCachedFile { max_read_cache_bytes: usize, max_write_buffer_bytes: usize, ) -> Result { - let mut read_cache = Vec::with_capacity(Self::lock_stripes()); - for _ in 0..Self::lock_stripes() { - read_cache.push(RwLock::new(PrioritizedCache::new())); - } + let read_cache = (0..Self::lock_stripes()) + .map(|_| RwLock::new(PrioritizedCache::new())) + .collect(); Ok(Self { file, @@ -263,7 +262,7 @@ impl PagedCachedFile { self.file.len().map_err(StorageError::from) } - const fn lock_stripes() -> usize { + const fn lock_stripes() -> u64 { 131 } @@ -286,10 +285,40 @@ impl PagedCachedFile { let mut write_buffer = self.write_buffer.lock().unwrap(); for (offset, buffer) in write_buffer.cache.iter() { - self.file.write(*offset, buffer.as_ref().unwrap())?; + let buffer = buffer.as_ref().unwrap(); + + self.file.write(*offset, buffer)?; + + let cache_size = self + .read_cache_bytes + .fetch_add(buffer.len(), Ordering::AcqRel); + + if cache_size + buffer.len() <= self.max_read_cache_bytes { + let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap(); + let mut lock = self.read_cache[cache_slot].write().unwrap(); + lock.insert(*offset, buffer.clone(), CachePriority::High); + } else { + self.read_cache_bytes + .fetch_sub(buffer.len(), Ordering::AcqRel); + } } for (offset, buffer) in write_buffer.low_pri_cache.iter() { - self.file.write(*offset, buffer.as_ref().unwrap())?; + let buffer = buffer.as_ref().unwrap(); + + self.file.write(*offset, &buffer)?; + + let cache_size = self + .read_cache_bytes + .fetch_add(buffer.len(), Ordering::AcqRel); + + if cache_size + buffer.len() <= self.max_read_cache_bytes { + let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap(); + let mut lock = self.read_cache[cache_slot].write().unwrap(); + lock.insert(*offset, buffer.clone(), CachePriority::Low); + } else { + self.read_cache_bytes + .fetch_sub(buffer.len(), Ordering::AcqRel); + } } self.write_buffer_bytes.store(0, Ordering::Release); write_buffer.clear(); @@ -353,7 +382,7 @@ impl PagedCachedFile { } } - let cache_slot: usize = (offset % Self::lock_stripes() as u64).try_into().unwrap(); + let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap(); { let read_lock = self.read_cache[cache_slot].read().unwrap(); if let Some(cached) = read_lock.get(&offset) { @@ -398,7 +427,7 @@ impl PagedCachedFile { // // NOTE: Invalidating a cached region in subsections is permitted, as long as all subsections are invalidated pub(super) fn invalidate_cache(&self, offset: u64, len: usize) { - let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap(); + let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap(); let mut lock = self.read_cache[cache_slot].write().unwrap(); if let Some(removed) = lock.remove(&offset) { assert_eq!(len, removed.len()); @@ -431,7 +460,7 @@ impl PagedCachedFile { let mut lock = self.write_buffer.lock().unwrap(); // TODO: allow hint that page is known to be dirty and will not be in the read cache - let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap(); + let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap(); let existing = { let mut lock = self.read_cache[cache_slot].write().unwrap(); if let Some(removed) = lock.remove(&offset) {