From 2020827378b49feb7ecf5216bb40bd0cf2dda67f Mon Sep 17 00:00:00 2001
From: Christopher Berner
Date: Thu, 8 Jun 2023 21:31:08 -0700
Subject: [PATCH] Prioritize caching of branch pages over leaf pages

This improves cache-constrained write performance by ~30% in the
benchmarks, and even more in larger database workloads
---
 src/tree_store/page_store/cached_file.rs | 97 ++++++++++++++++++------
 1 file changed, 75 insertions(+), 22 deletions(-)

diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs
index 8a700c4b..963acef9 100644
--- a/src/tree_store/page_store/cached_file.rs
+++ b/src/tree_store/page_store/cached_file.rs
@@ -1,5 +1,6 @@
 use crate::tree_store::page_store::base::PageHint;
 use crate::tree_store::page_store::file_lock::LockedFile;
+use crate::tree_store::LEAF;
 use crate::{DatabaseError, Result, StorageError};
 use std::collections::BTreeMap;
 use std::fs::File;
@@ -15,7 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, RwLock};
 
 pub(super) struct WritablePage<'a> {
-    buffer: &'a Mutex<BTreeMap<u64, Arc<Vec<u8>>>>,
+    buffer: &'a Mutex<PrioritizedCache>,
     offset: u64,
     data: Vec<u8>,
 }
@@ -33,11 +34,13 @@ impl<'a> WritablePage<'a> {
 impl<'a> Drop for WritablePage<'a> {
     fn drop(&mut self) {
         let data = mem::take(&mut self.data);
+        // TODO: it's quite a hack to check the leaf/branch byte here
+        let low_pri = data[0] == LEAF;
         assert!(self
             .buffer
             .lock()
             .unwrap()
-            .insert(self.offset, Arc::new(data))
+            .insert(self.offset, Arc::new(data), low_pri)
             .is_none());
     }
 }
@@ -56,6 +59,58 @@ impl<'a, I: SliceIndex<[u8]>> IndexMut<I> for WritablePage<'a> {
     }
 }
 
+#[derive(Default)]
+struct PrioritizedCache {
+    cache: BTreeMap<u64, Arc<Vec<u8>>>,
+    low_pri_cache: BTreeMap<u64, Arc<Vec<u8>>>,
+}
+
+impl PrioritizedCache {
+    fn new() -> Self {
+        Self {
+            cache: Default::default(),
+            low_pri_cache: Default::default(),
+        }
+    }
+
+    fn insert(&mut self, key: u64, value: Arc<Vec<u8>>, low_pri: bool) -> Option<Arc<Vec<u8>>> {
+        if low_pri {
+            self.low_pri_cache.insert(key, value)
+        } else {
+            self.cache.insert(key, value)
+        }
+    }
+
+    fn remove(&mut self, key: &u64) -> Option<Arc<Vec<u8>>> {
+        let result = self.cache.remove(key);
+        if result.is_some() {
+            return result;
+        }
+        self.low_pri_cache.remove(key)
+    }
+
+    fn get(&self, key: &u64) -> Option<&Arc<Vec<u8>>> {
+        let result = self.cache.get(key);
+        if result.is_some() {
+            return result;
+        }
+        self.low_pri_cache.get(key)
+    }
+
+    fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<Vec<u8>>)> {
+        let result = self.low_pri_cache.pop_first();
+        if result.is_some() {
+            return result;
+        }
+        self.cache.pop_first()
+    }
+
+    fn clear(&mut self) {
+        self.cache.clear();
+        self.low_pri_cache.clear();
+    }
+}
+
 pub(super) struct PagedCachedFile {
     file: LockedFile,
     page_size: u64,
@@ -68,9 +123,9 @@ pub(super) struct PagedCachedFile {
     #[cfg(feature = "cache_metrics")]
     reads_hits: AtomicU64,
     fsync_failed: AtomicBool,
-    read_cache: Vec<RwLock<BTreeMap<u64, Arc<Vec<u8>>>>>,
+    read_cache: Vec<RwLock<PrioritizedCache>>,
     // TODO: maybe move this cache to WriteTransaction?
-    write_buffer: Mutex<BTreeMap<u64, Arc<Vec<u8>>>>,
+    write_buffer: Mutex<PrioritizedCache>,
     #[cfg(any(fuzzing, test))]
     crash_countdown: AtomicU64,
 }
@@ -84,7 +139,7 @@ impl PagedCachedFile {
     ) -> Result<Self, DatabaseError> {
         let mut read_cache = Vec::with_capacity(Self::lock_stripes());
         for _ in 0..Self::lock_stripes() {
-            read_cache.push(RwLock::new(BTreeMap::new()));
+            read_cache.push(RwLock::new(PrioritizedCache::new()));
         }
 
         let lock = LockedFile::new(file)?;
@@ -109,7 +164,7 @@ impl PagedCachedFile {
             reads_hits: Default::default(),
             fsync_failed: Default::default(),
             read_cache,
-            write_buffer: Mutex::new(BTreeMap::new()),
+            write_buffer: Mutex::new(PrioritizedCache::new()),
             #[cfg(any(fuzzing, test))]
             crash_countdown: AtomicU64::new(u64::MAX),
         })
@@ -146,13 +201,14 @@ impl PagedCachedFile {
     fn flush_write_buffer(&self) -> Result {
         self.check_fsync_failure()?;
         let mut write_buffer = std::mem::take(self.write_buffer.lock().unwrap().deref_mut());
-        let total_bytes: usize = write_buffer.values().map(|buffer| buffer.len()).sum();
-        self.write_buffer_bytes
-            .fetch_sub(total_bytes, Ordering::Release);
-        for (offset, buffer) in write_buffer.iter() {
+        for (offset, buffer) in write_buffer.cache.iter() {
+            self.file.write(*offset, buffer)?;
+        }
+        for (offset, buffer) in write_buffer.low_pri_cache.iter() {
             self.file.write(*offset, buffer)?;
         }
+        self.write_buffer_bytes.store(0, Ordering::Release);
         write_buffer.clear();
 
         Ok(())
     }
@@ -161,13 +217,7 @@ impl PagedCachedFile {
     // Caller should invalidate all cached pages that are no longer valid
     pub(super) fn resize(&self, len: u64) -> Result {
         // TODO: be more fine-grained about this invalidation
-        for slot in 0..self.read_cache.len() {
-            let cache = mem::take(&mut *self.read_cache[slot].write().unwrap());
-            for (_, buffer) in cache {
-                self.read_cache_bytes
-                    .fetch_sub(buffer.len(), Ordering::Release);
-            }
-        }
+        self.invalidate_cache_all();
 
         self.file.file().set_len(len).map_err(StorageError::from)
     }
@@ -268,11 +318,12 @@ impl PagedCachedFile {
         let buffer = Arc::new(self.read_direct(offset, len)?);
         let cache_size = self.read_cache_bytes.fetch_add(len, Ordering::AcqRel);
         let mut write_lock = self.read_cache[cache_slot].write().unwrap();
-        write_lock.insert(offset, buffer.clone());
+        // TODO: this is quite a hack to check the leaf/branch byte here
+        write_lock.insert(offset, buffer.clone(), buffer[0] == LEAF);
         let mut removed = 0;
         if cache_size + len > self.max_read_cache_bytes {
             while removed < len {
-                if let Some((_, v)) = write_lock.pop_first() {
+                if let Some((_, v)) = write_lock.pop_lowest_priority() {
                     removed += v.len();
                 } else {
                     break;
                 }
@@ -308,7 +359,7 @@ impl PagedCachedFile {
     pub(super) fn invalidate_cache_all(&self) {
        for cache_slot in 0..self.read_cache.len() {
            let mut lock = self.read_cache[cache_slot].write().unwrap();
-           while let Some((_, removed)) = lock.pop_first() {
+           while let Some((_, removed)) = lock.pop_lowest_priority() {
                self.read_cache_bytes
                    .fetch_sub(removed.len(), Ordering::AcqRel);
            }
@@ -353,13 +404,15 @@ impl PagedCachedFile {
            }
            let mut removed_bytes = 0;
            while removed_bytes < len {
-               if let Some((offset, buffer)) = lock.pop_first() {
+               if let Some((offset, buffer)) = lock.pop_lowest_priority() {
                    self.write_buffer_bytes
                        .fetch_sub(buffer.len(), Ordering::Release);
                    removed_bytes += buffer.len();
                    let result = self.file.write(offset, &buffer);
                    if result.is_err() {
-                       lock.insert(offset, buffer);
+                       let low_pri = buffer[0] == LEAF;
+                       // TODO: it's quite a hack to check the leaf/branch byte here
+                       lock.insert(offset, buffer, low_pri);
                    }
                    result?;
                } else {
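
Illustrative note (not part of the patch): the sketch below is a minimal, self-contained model of the eviction policy the new PrioritizedCache encodes, with simplified types (plain Vec<u8> values instead of Arc<Vec<u8>>, no byte accounting, hypothetical offsets). Leaf pages go into a low-priority map and are always popped first, so branch pages, which are touched on every tree traversal, stay resident longer under cache pressure.

use std::collections::BTreeMap;

// Simplified stand-in for the patch's PrioritizedCache: two BTreeMaps,
// where low-priority entries (leaf pages) are evicted before
// high-priority entries (branch pages).
#[derive(Default)]
struct PrioritizedCache {
    cache: BTreeMap<u64, Vec<u8>>,
    low_pri_cache: BTreeMap<u64, Vec<u8>>,
}

impl PrioritizedCache {
    fn insert(&mut self, key: u64, value: Vec<u8>, low_pri: bool) -> Option<Vec<u8>> {
        if low_pri {
            self.low_pri_cache.insert(key, value)
        } else {
            self.cache.insert(key, value)
        }
    }

    // Evict from the low-priority (leaf) map first; fall back to branch
    // pages only once no leaf pages remain.
    fn pop_lowest_priority(&mut self) -> Option<(u64, Vec<u8>)> {
        self.low_pri_cache
            .pop_first()
            .or_else(|| self.cache.pop_first())
    }
}

fn main() {
    let mut cache = PrioritizedCache::default();
    cache.insert(0, vec![1u8; 8], false); // branch page at offset 0
    cache.insert(4096, vec![2u8; 8], true); // leaf page at offset 4096
    cache.insert(8192, vec![3u8; 8], true); // leaf page at offset 8192

    // Under cache pressure, both leaf pages are evicted before the branch page.
    assert_eq!(cache.pop_lowest_priority().unwrap().0, 4096);
    assert_eq!(cache.pop_lowest_priority().unwrap().0, 8192);
    assert_eq!(cache.pop_lowest_priority().unwrap().0, 0);
    println!("branch page outlived both leaf pages in the cache");
}

Built with any recent stable Rust (BTreeMap::pop_first has been stable since 1.66), the assertions show the two leaf pages leaving the cache before the single branch page, which is the behavior the patch relies on for its write-performance gain.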