Prioritize caching of branch pages over leaf pages
This improves cache-constrained write performance by ~30% in the benchmarks, and even more in larger database workloads.
cberner committed Jun 9, 2023
1 parent 4dec271 commit 2020827
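
The intuition behind the change: every lookup in the B-tree passes through the branch (interior) pages near the root, while any given leaf page is touched only when one of its own keys is accessed, so under cache pressure it pays to evict leaf pages first and keep branch pages resident. Below is a minimal standalone sketch of the two-tier eviction order this commit introduces; TwoTierCache, evict_one, and the offsets are illustrative names and values, not redb's API (the real PrioritizedCache is in the diff that follows):

use std::collections::BTreeMap;
use std::sync::Arc;

// Two-tier page cache: leaf pages go in the low-priority tier and are evicted first.
#[derive(Default)]
struct TwoTierCache {
    branch_pages: BTreeMap<u64, Arc<Vec<u8>>>, // high priority
    leaf_pages: BTreeMap<u64, Arc<Vec<u8>>>,   // low priority, evicted first
}

impl TwoTierCache {
    fn insert(&mut self, offset: u64, page: Arc<Vec<u8>>, low_pri: bool) {
        if low_pri {
            self.leaf_pages.insert(offset, page);
        } else {
            self.branch_pages.insert(offset, page);
        }
    }

    // Drain the low-priority tier completely before touching any branch page.
    fn evict_one(&mut self) -> Option<(u64, Arc<Vec<u8>>)> {
        self.leaf_pages.pop_first().or_else(|| self.branch_pages.pop_first())
    }
}

fn main() {
    let mut cache = TwoTierCache::default();
    cache.insert(0, Arc::new(vec![0u8; 16]), false);   // branch page
    cache.insert(4096, Arc::new(vec![0u8; 16]), true); // leaf page
    cache.insert(8192, Arc::new(vec![0u8; 16]), true); // leaf page
    // Prints 4096, then 8192, then 0: both leaves go before the branch page,
    // even though the branch page has the smallest offset.
    while let Some((offset, _)) = cache.evict_one() {
        println!("evicted page at offset {offset}");
    }
}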
Showing 1 changed file with 75 additions and 22 deletions: src/tree_store/page_store/cached_file.rs
@@ -1,5 +1,6 @@
 use crate::tree_store::page_store::base::PageHint;
 use crate::tree_store::page_store::file_lock::LockedFile;
+use crate::tree_store::LEAF;
 use crate::{DatabaseError, Result, StorageError};
 use std::collections::BTreeMap;
 use std::fs::File;
@@ -15,7 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, RwLock};
 
 pub(super) struct WritablePage<'a> {
-    buffer: &'a Mutex<BTreeMap<u64, Arc<Vec<u8>>>>,
+    buffer: &'a Mutex<PrioritizedCache>,
     offset: u64,
     data: Vec<u8>,
 }
@@ -33,11 +34,13 @@ impl<'a> WritablePage<'a> {
 impl<'a> Drop for WritablePage<'a> {
     fn drop(&mut self) {
         let data = mem::take(&mut self.data);
+        // TODO: it's quite a hack to check the leaf/branch byte here
+        let low_pri = data[0] == LEAF;
         assert!(self
             .buffer
             .lock()
             .unwrap()
-            .insert(self.offset, Arc::new(data))
+            .insert(self.offset, Arc::new(data), low_pri)
             .is_none());
     }
 }
@@ -56,6 +59,58 @@ impl<'a, I: SliceIndex<[u8]>> IndexMut<I> for WritablePage<'a> {
     }
 }
 
+#[derive(Default)]
+struct PrioritizedCache {
+    cache: BTreeMap<u64, Arc<Vec<u8>>>,
+    low_pri_cache: BTreeMap<u64, Arc<Vec<u8>>>,
+}
+
+impl PrioritizedCache {
+    fn new() -> Self {
+        Self {
+            cache: Default::default(),
+            low_pri_cache: Default::default(),
+        }
+    }
+
+    fn insert(&mut self, key: u64, value: Arc<Vec<u8>>, low_pri: bool) -> Option<Arc<Vec<u8>>> {
+        if low_pri {
+            self.low_pri_cache.insert(key, value)
+        } else {
+            self.cache.insert(key, value)
+        }
+    }
+
+    fn remove(&mut self, key: &u64) -> Option<Arc<Vec<u8>>> {
+        let result = self.cache.remove(key);
+        if result.is_some() {
+            return result;
+        }
+        self.low_pri_cache.remove(key)
+    }
+
+    fn get(&self, key: &u64) -> Option<&Arc<Vec<u8>>> {
+        let result = self.cache.get(key);
+        if result.is_some() {
+            return result;
+        }
+        self.low_pri_cache.get(key)
+    }
+
+    fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<Vec<u8>>)> {
+        let result = self.low_pri_cache.pop_first();
+        if result.is_some() {
+            return result;
+        }
+        self.cache.pop_first()
+    }
+
+    fn clear(&mut self) {
+        self.cache.clear();
+        self.low_pri_cache.clear();
+    }
+}
+
 pub(super) struct PagedCachedFile {
     file: LockedFile,
     page_size: u64,
@@ -68,9 +123,9 @@ pub(super) struct PagedCachedFile {
     #[cfg(feature = "cache_metrics")]
     reads_hits: AtomicU64,
     fsync_failed: AtomicBool,
-    read_cache: Vec<RwLock<BTreeMap<u64, Arc<Vec<u8>>>>>,
+    read_cache: Vec<RwLock<PrioritizedCache>>,
     // TODO: maybe move this cache to WriteTransaction?
-    write_buffer: Mutex<BTreeMap<u64, Arc<Vec<u8>>>>,
+    write_buffer: Mutex<PrioritizedCache>,
     #[cfg(any(fuzzing, test))]
     crash_countdown: AtomicU64,
 }
@@ -84,7 +139,7 @@ impl PagedCachedFile {
     ) -> Result<Self, DatabaseError> {
         let mut read_cache = Vec::with_capacity(Self::lock_stripes());
         for _ in 0..Self::lock_stripes() {
-            read_cache.push(RwLock::new(BTreeMap::new()));
+            read_cache.push(RwLock::new(PrioritizedCache::new()));
         }
 
         let lock = LockedFile::new(file)?;
@@ -109,7 +164,7 @@ impl PagedCachedFile {
             reads_hits: Default::default(),
             fsync_failed: Default::default(),
             read_cache,
-            write_buffer: Mutex::new(BTreeMap::new()),
+            write_buffer: Mutex::new(PrioritizedCache::new()),
             #[cfg(any(fuzzing, test))]
             crash_countdown: AtomicU64::new(u64::MAX),
         })
@@ -146,13 +201,14 @@ impl PagedCachedFile {
     fn flush_write_buffer(&self) -> Result {
         self.check_fsync_failure()?;
         let mut write_buffer = std::mem::take(self.write_buffer.lock().unwrap().deref_mut());
-        let total_bytes: usize = write_buffer.values().map(|buffer| buffer.len()).sum();
-        self.write_buffer_bytes
-            .fetch_sub(total_bytes, Ordering::Release);
-
-        for (offset, buffer) in write_buffer.iter() {
+        for (offset, buffer) in write_buffer.cache.iter() {
             self.file.write(*offset, buffer)?;
         }
+        for (offset, buffer) in write_buffer.low_pri_cache.iter() {
+            self.file.write(*offset, buffer)?;
+        }
+        self.write_buffer_bytes.store(0, Ordering::Release);
+        write_buffer.clear();
 
         Ok(())
@@ -161,13 +217,7 @@ impl PagedCachedFile {
     // Caller should invalidate all cached pages that are no longer valid
     pub(super) fn resize(&self, len: u64) -> Result {
         // TODO: be more fine-grained about this invalidation
-        for slot in 0..self.read_cache.len() {
-            let cache = mem::take(&mut *self.read_cache[slot].write().unwrap());
-            for (_, buffer) in cache {
-                self.read_cache_bytes
-                    .fetch_sub(buffer.len(), Ordering::Release);
-            }
-        }
+        self.invalidate_cache_all();
 
         self.file.file().set_len(len).map_err(StorageError::from)
     }
@@ -268,11 +318,12 @@ impl PagedCachedFile {
             let buffer = Arc::new(self.read_direct(offset, len)?);
             let cache_size = self.read_cache_bytes.fetch_add(len, Ordering::AcqRel);
             let mut write_lock = self.read_cache[cache_slot].write().unwrap();
-            write_lock.insert(offset, buffer.clone());
+            // TODO: this is quite a hack to check the leaf/branch byte here
+            write_lock.insert(offset, buffer.clone(), buffer[0] == LEAF);
             let mut removed = 0;
             if cache_size + len > self.max_read_cache_bytes {
                 while removed < len {
-                    if let Some((_, v)) = write_lock.pop_first() {
+                    if let Some((_, v)) = write_lock.pop_lowest_priority() {
                         removed += v.len();
                     } else {
                         break;
@@ -308,7 +359,7 @@ impl PagedCachedFile {
     pub(super) fn invalidate_cache_all(&self) {
         for cache_slot in 0..self.read_cache.len() {
            let mut lock = self.read_cache[cache_slot].write().unwrap();
-            while let Some((_, removed)) = lock.pop_first() {
+            while let Some((_, removed)) = lock.pop_lowest_priority() {
                 self.read_cache_bytes
                     .fetch_sub(removed.len(), Ordering::AcqRel);
             }
@@ -353,13 +404,15 @@ impl PagedCachedFile {
             }
             let mut removed_bytes = 0;
             while removed_bytes < len {
-                if let Some((offset, buffer)) = lock.pop_first() {
+                if let Some((offset, buffer)) = lock.pop_lowest_priority() {
                     self.write_buffer_bytes
                         .fetch_sub(buffer.len(), Ordering::Release);
                     removed_bytes += buffer.len();
                     let result = self.file.write(offset, &buffer);
                     if result.is_err() {
-                        lock.insert(offset, buffer);
+                        let low_pri = buffer[0] == LEAF;
+                        // TODO: it's quite a hack to check the leaf/branch byte here
+                        lock.insert(offset, buffer, low_pri);
                     }
                     result?;
                 } else {

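For readers tracing the PrioritizedCache contract above: a given offset lives in exactly one of the two maps, so get and remove probe the high-priority map first and fall back to the low-priority one, while pop_lowest_priority drains in the opposite order. A test-style sketch of that eviction contract follows; it assumes the PrioritizedCache from this diff is in scope (in redb it is private to the page_store module), and the LEAF/BRANCH byte values here are assumptions for illustration, not guaranteed constants:

use std::sync::Arc;

// Assumed page-type bytes; redb stores the page type in byte 0 of each page.
const LEAF: u8 = 1;
const BRANCH: u8 = 2;

#[test]
fn leaf_pages_are_evicted_before_branch_pages() {
    let mut cache = PrioritizedCache::new();
    let branch_page = Arc::new(vec![BRANCH; 8]);
    let leaf_page = Arc::new(vec![LEAF; 8]);
    // Priority is derived from the page's first byte, as in WritablePage::drop.
    assert!(cache.insert(0, branch_page.clone(), branch_page[0] == LEAF).is_none());
    assert!(cache.insert(4096, leaf_page.clone(), leaf_page[0] == LEAF).is_none());
    // Lookups see both tiers.
    assert!(cache.get(&0).is_some());
    assert!(cache.get(&4096).is_some());
    // Eviction drains the leaf (low-priority) tier first, even though the
    // branch page has the smaller offset and was inserted earlier.
    assert_eq!(cache.pop_lowest_priority().unwrap().0, 4096);
    assert_eq!(cache.pop_lowest_priority().unwrap().0, 0);
    assert!(cache.pop_lowest_priority().is_none());
}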