From 2b6d32820b2b76c0db641acf92286ed0b2cf4b25 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 14 Nov 2023 21:53:46 +0100 Subject: [PATCH] Remove old, unused log storage engines --- src/storage/log/hybrid.rs | 308 -------------------------------------- src/storage/log/memory.rs | 120 --------------- src/storage/log/mod.rs | 200 ------------------------- src/storage/log/test.rs | 81 ---------- src/storage/mod.rs | 1 - 5 files changed, 710 deletions(-) delete mode 100644 src/storage/log/hybrid.rs delete mode 100644 src/storage/log/memory.rs delete mode 100644 src/storage/log/mod.rs delete mode 100644 src/storage/log/test.rs diff --git a/src/storage/log/hybrid.rs b/src/storage/log/hybrid.rs deleted file mode 100644 index f79bcb399..000000000 --- a/src/storage/log/hybrid.rs +++ /dev/null @@ -1,308 +0,0 @@ -use super::{Range, Scan, Store}; -use crate::error::{Error, Result}; - -use std::cmp::{max, min}; -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::fmt::Display; -use std::fs::{create_dir_all, File, OpenOptions}; -use std::io::{BufReader, BufWriter, Read, Seek as _, SeekFrom, Write}; -use std::ops::Bound; -use std::path::Path; -use std::sync::{Mutex, MutexGuard}; - -/// A hybrid log store, storing committed entries in an append-only file, uncommitted entries -/// in memory, and metadata in a separate file (should be an on-disk key-value store). -/// -/// The log file contains sequential binary log entries, length-prefixed with a big-endian u32. -/// Entries are only flushed to disk when they are committed and permanent, thus the file is -/// written append-only. -/// -/// An index of entry positions and sizes is maintained in memory. This is rebuilt on startup by -/// scanning the file, since maintaining the index in a separate file requires additional fsyncing -/// which is expensive. Since datasets are expected to be small, scanning the file on startup is -/// reasonably cheap. -/// -/// TODO: Should use crate::storage::bincode instead of ::bincode. -pub struct Hybrid { - /// The append-only log file. Protected by a mutex for interior mutability (i.e. read seeks). - file: Mutex, - /// Index of entry locations and sizes in the log file. - index: BTreeMap, - /// Uncommitted log entries. - uncommitted: VecDeque>, - /// Metadata cache. Flushed to disk on changes. - metadata: HashMap, Vec>, - /// The file used to store metadata. - /// FIXME Should be an on-disk B-tree key-value store. - metadata_file: File, - /// If true, fsync writes. - sync: bool, -} - -impl Display for Hybrid { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "hybrid") - } -} - -impl Hybrid { - /// Creates or opens a new hybrid log, with files in the given directory. - pub fn new(dir: &Path, sync: bool) -> Result { - create_dir_all(dir)?; - - let file = - OpenOptions::new().read(true).write(true).create(true).open(dir.join("raft-log"))?; - - let metadata_file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(dir.join("raft-metadata"))?; - - Ok(Self { - index: Self::build_index(&file)?, - file: Mutex::new(file), - uncommitted: VecDeque::new(), - metadata: Self::load_metadata(&metadata_file)?, - metadata_file, - sync, - }) - } - - /// Builds the index by scanning the log file. - fn build_index(file: &File) -> Result> { - let filesize = file.metadata()?.len(); - let mut bufreader = BufReader::new(file); - let mut index = BTreeMap::new(); - let mut sizebuf = [0; 4]; - let mut pos = 0; - let mut i = 1; - while pos < filesize { - bufreader.read_exact(&mut sizebuf)?; - pos += 4; - let size = u32::from_be_bytes(sizebuf); - index.insert(i, (pos, size)); - let mut buf = vec![0; size as usize]; - bufreader.read_exact(&mut buf)?; - pos += size as u64; - i += 1; - } - Ok(index) - } - - /// Loads metadata from a file. - fn load_metadata(file: &File) -> Result, Vec>> { - match bincode::deserialize_from(file) { - Ok(metadata) => Ok(metadata), - Err(err) => { - if let bincode::ErrorKind::Io(err) = &*err { - if err.kind() == std::io::ErrorKind::UnexpectedEof { - return Ok(HashMap::new()); - } - } - Err(err.into()) - } - } - } -} - -impl Store for Hybrid { - fn append(&mut self, entry: Vec) -> Result { - self.uncommitted.push_back(entry); - Ok(self.len()) - } - - fn commit(&mut self, index: u64) -> Result<()> { - if index > self.len() { - return Err(Error::Internal(format!("Cannot commit non-existant index {}", index))); - } - if index < self.index.len() as u64 { - return Err(Error::Internal(format!( - "Cannot commit below current committed index {}", - self.index.len() as u64 - ))); - } - if index == self.index.len() as u64 { - return Ok(()); - } - - let mut file = self.file.lock()?; - let mut pos = file.seek(SeekFrom::End(0))?; - let mut bufwriter = BufWriter::new(&mut *file); - for i in (self.index.len() as u64 + 1)..=index { - let entry = self - .uncommitted - .pop_front() - .ok_or_else(|| Error::Internal("Unexpected end of uncommitted entries".into()))?; - bufwriter.write_all(&(entry.len() as u32).to_be_bytes())?; - pos += 4; - self.index.insert(i, (pos, entry.len() as u32)); - bufwriter.write_all(&entry)?; - pos += entry.len() as u64; - } - bufwriter.flush()?; - drop(bufwriter); - if self.sync { - file.sync_data()?; - } - Ok(()) - } - - fn committed(&self) -> u64 { - self.index.len() as u64 - } - - fn get(&self, index: u64) -> Result>> { - match index { - 0 => Ok(None), - i if i <= self.index.len() as u64 => { - let (pos, size) = self.index.get(&i).copied().ok_or_else(|| { - Error::Internal(format!("Indexed position not found for entry {}", i)) - })?; - let mut entry = vec![0; size as usize]; - let mut file = self.file.lock()?; - file.seek(SeekFrom::Start(pos))?; - file.read_exact(&mut entry)?; - Ok(Some(entry)) - } - i => Ok(self.uncommitted.get(i as usize - self.index.len() - 1).cloned()), - } - } - - fn len(&self) -> u64 { - self.index.len() as u64 + self.uncommitted.len() as u64 - } - - fn scan(&self, range: Range) -> Scan { - let start = match range.start { - Bound::Included(0) => 1, - Bound::Included(n) => n, - Bound::Excluded(n) => n + 1, - Bound::Unbounded => 1, - }; - let end = match range.end { - Bound::Included(n) => n, - Bound::Excluded(0) => 0, - Bound::Excluded(n) => n - 1, - Bound::Unbounded => self.len(), - }; - - let mut scan: Scan = Box::new(std::iter::empty()); - if start > end { - return scan; - } - - // Scan committed entries in file - if let Some((offset, _)) = self.index.get(&start) { - let mut file = self.file.lock().unwrap(); - file.seek(SeekFrom::Start(*offset - 4)).unwrap(); // seek to length prefix - let mut bufreader = BufReader::new(MutexReader(file)); // FIXME Avoid MutexReader - scan = - Box::new(scan.chain(self.index.range(start..=end).map(move |(_, (_, size))| { - let mut sizebuf = vec![0; 4]; - bufreader.read_exact(&mut sizebuf)?; - let mut entry = vec![0; *size as usize]; - bufreader.read_exact(&mut entry)?; - Ok(entry) - }))); - } - - // Scan uncommitted entries in memory - if end > self.index.len() as u64 { - scan = Box::new( - scan.chain( - self.uncommitted - .iter() - .skip(start as usize - min(start as usize, self.index.len() + 1)) - .take(end as usize - max(start as usize, self.index.len()) + 1) - .cloned() - .map(Ok), - ), - ) - } - - scan - } - - fn size(&self) -> u64 { - self.index.iter().next_back().map(|(_, (pos, size))| *pos + *size as u64).unwrap_or(0) - } - - fn truncate(&mut self, index: u64) -> Result { - if index < self.index.len() as u64 { - return Err(Error::Internal(format!( - "Cannot truncate below committed index {}", - self.index.len() as u64 - ))); - } - self.uncommitted.truncate(index as usize - self.index.len()); - Ok(self.len()) - } - - fn get_metadata(&self, key: &[u8]) -> Result>> { - Ok(self.metadata.get(key).cloned()) - } - - fn set_metadata(&mut self, key: &[u8], value: Vec) -> Result<()> { - self.metadata.insert(key.to_vec(), value); - self.metadata_file.set_len(0)?; - self.metadata_file.seek(SeekFrom::Start(0))?; - bincode::serialize_into(&mut self.metadata_file, &self.metadata)?; - if self.sync { - self.metadata_file.sync_data()?; - } - Ok(()) - } -} - -impl Drop for Hybrid { - /// Attempt to fsync data on drop, in case we're running without sync. - fn drop(&mut self) { - self.metadata_file.sync_all().ok(); - self.file.lock().map(|f| f.sync_all()).ok(); - } -} - -struct MutexReader<'a>(MutexGuard<'a, File>); - -impl<'a> Read for MutexReader<'a> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.0.read(buf) - } -} - -#[cfg(test)] -impl super::TestSuite for Hybrid { - fn setup() -> Result { - let dir = tempdir::TempDir::new("toydb")?; - Hybrid::new(dir.as_ref(), false) - } -} - -#[test] -fn tests() -> Result<()> { - use super::TestSuite; - Hybrid::test() -} - -#[test] -fn test_persistent() -> Result<()> { - let dir = tempdir::TempDir::new("toydb")?; - let mut l = Hybrid::new(dir.as_ref(), true)?; - - l.append(vec![0x01])?; - l.append(vec![0x02])?; - l.append(vec![0x03])?; - l.append(vec![0x04])?; - l.append(vec![0x05])?; - l.commit(3)?; - - let l = Hybrid::new(dir.as_ref(), true)?; - - assert_eq!( - vec![vec![1], vec![2], vec![3]], - l.scan(Range::from(..)).collect::>>()? - ); - - Ok(()) -} diff --git a/src/storage/log/memory.rs b/src/storage/log/memory.rs deleted file mode 100644 index bce7bcf1b..000000000 --- a/src/storage/log/memory.rs +++ /dev/null @@ -1,120 +0,0 @@ -use super::{Range, Store}; -use crate::error::{Error, Result}; - -use std::collections::HashMap; -use std::fmt::Display; -use std::ops::Bound; - -// An in-memory log store. -pub struct Memory { - log: Vec>, - committed: u64, - metadata: HashMap, Vec>, -} - -impl Memory { - /// Creates a new in-memory log. - pub fn new() -> Self { - Self { log: Vec::new(), committed: 0, metadata: HashMap::new() } - } -} - -impl Display for Memory { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "memory") - } -} - -impl Store for Memory { - fn append(&mut self, entry: Vec) -> Result { - self.log.push(entry); - Ok(self.log.len() as u64) - } - - fn commit(&mut self, index: u64) -> Result<()> { - if index > self.len() { - return Err(Error::Internal(format!("Cannot commit non-existant index {}", index))); - } - if index < self.committed { - return Err(Error::Internal(format!( - "Cannot commit below current index {}", - self.committed - ))); - } - self.committed = index; - Ok(()) - } - - fn committed(&self) -> u64 { - self.committed - } - - fn get(&self, index: u64) -> Result>> { - match index { - 0 => Ok(None), - i => Ok(self.log.get(i as usize - 1).cloned()), - } - } - - fn len(&self) -> u64 { - self.log.len() as u64 - } - - fn scan(&self, range: Range) -> super::Scan { - Box::new( - self.log - .iter() - .take(match range.end { - Bound::Included(n) => n as usize, - Bound::Excluded(0) => 0, - Bound::Excluded(n) => n as usize - 1, - Bound::Unbounded => std::usize::MAX, - }) - .skip(match range.start { - Bound::Included(0) => 0, - Bound::Included(n) => n as usize - 1, - Bound::Excluded(n) => n as usize, - Bound::Unbounded => 0, - }) - .cloned() - .map(Ok), - ) - } - - fn size(&self) -> u64 { - self.log.iter().map(|v| v.len() as u64).sum() - } - - fn truncate(&mut self, index: u64) -> Result { - if index < self.committed { - return Err(Error::Internal(format!( - "Cannot truncate below committed index {}", - self.committed - ))); - } - self.log.truncate(index as usize); - Ok(self.log.len() as u64) - } - - fn get_metadata(&self, key: &[u8]) -> Result>> { - Ok(self.metadata.get(key).cloned()) - } - - fn set_metadata(&mut self, key: &[u8], value: Vec) -> Result<()> { - self.metadata.insert(key.to_vec(), value); - Ok(()) - } -} - -#[cfg(test)] -impl super::TestSuite for Memory { - fn setup() -> Result { - Ok(Memory::new()) - } -} - -#[test] -fn tests() -> Result<()> { - use super::TestSuite; - Memory::test() -} diff --git a/src/storage/log/mod.rs b/src/storage/log/mod.rs deleted file mode 100644 index c6f3cf257..000000000 --- a/src/storage/log/mod.rs +++ /dev/null @@ -1,200 +0,0 @@ -mod hybrid; -mod memory; -#[cfg(test)] -mod test; - -pub use hybrid::Hybrid; -pub use memory::Memory; -#[cfg(test)] -pub use test::Test; - -use crate::error::Result; - -use std::fmt::Display; -use std::ops::{Bound, RangeBounds}; - -/// A log store. Entry indexes are 1-based, to match Raft semantics. -pub trait Store: Display + Sync + Send { - /// Appends a log entry, returning its index. - fn append(&mut self, entry: Vec) -> Result; - - /// Commits log entries up to and including the given index, making them immutable. - fn commit(&mut self, index: u64) -> Result<()>; - - /// Returns the committed index, if any. - fn committed(&self) -> u64; - - /// Fetches a log entry, if it exists. - fn get(&self, index: u64) -> Result>>; - - /// Returns the number of entries in the log. - fn len(&self) -> u64; - - /// Scans the log between the given indexes. - fn scan(&self, range: Range) -> Scan; - - /// Returns the size of the log, in bytes. - fn size(&self) -> u64; - - /// Truncates the log be removing any entries above the given index, and returns the - /// highest index. Errors if asked to truncate any committed entries. - fn truncate(&mut self, index: u64) -> Result; - - /// Gets a metadata value. - fn get_metadata(&self, key: &[u8]) -> Result>>; - - /// Sets a metadata value. - fn set_metadata(&mut self, key: &[u8], value: Vec) -> Result<()>; - - /// Returns true if the log has no entries. - fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -/// A scan range. -pub struct Range { - start: Bound, - end: Bound, -} - -impl Range { - /// Creates a new range from the given Rust range. We can't use the RangeBounds directly in - /// scan() since that prevents us from Store into a trait object. - pub fn from(range: impl RangeBounds) -> Self { - Self { - start: match range.start_bound() { - Bound::Included(v) => Bound::Included(*v), - Bound::Excluded(v) => Bound::Excluded(*v), - Bound::Unbounded => Bound::Unbounded, - }, - end: match range.end_bound() { - Bound::Included(v) => Bound::Included(*v), - Bound::Excluded(v) => Bound::Excluded(*v), - Bound::Unbounded => Bound::Unbounded, - }, - } - } -} - -/// Iterator over a log range. -pub type Scan<'a> = Box>> + 'a>; - -#[cfg(test)] -use crate::error::Error; - -#[cfg(test)] -trait TestSuite { - fn setup() -> Result; - - fn test() -> Result<()> { - Self::test_append()?; - Self::test_commit_truncate()?; - Self::test_get()?; - Self::test_metadata()?; - Self::test_scan()?; - Ok(()) - } - - fn test_append() -> Result<()> { - let mut s = Self::setup()?; - assert_eq!(0, s.len()); - assert_eq!(1, s.append(vec![0x01])?); - assert_eq!(2, s.append(vec![0x02])?); - assert_eq!(3, s.append(vec![0x03])?); - assert_eq!(3, s.len()); - assert_eq!( - vec![vec![1], vec![2], vec![3]], - s.scan(Range::from(..)).collect::>>()? - ); - Ok(()) - } - - fn test_commit_truncate() -> Result<()> { - let mut s = Self::setup()?; - - assert_eq!(0, s.committed()); - - // Truncating an empty store should be fine. - assert_eq!(0, s.truncate(0)?); - - s.append(vec![0x01])?; - s.append(vec![0x02])?; - s.append(vec![0x03])?; - s.commit(1)?; - assert_eq!(1, s.committed()); - - // Truncating beyond the end should be fine. - assert_eq!(3, s.truncate(4)?); - assert_eq!( - vec![vec![1], vec![2], vec![3]], - s.scan(Range::from(..)).collect::>>()? - ); - - // Truncating a committed entry should error. - assert_eq!( - Err(Error::Internal("Cannot truncate below committed index 1".into())), - s.truncate(0) - ); - - // Truncating above should work. - assert_eq!(1, s.truncate(1)?); - assert_eq!(vec![vec![1]], s.scan(Range::from(..)).collect::>>()?); - - Ok(()) - } - - fn test_get() -> Result<()> { - let mut s = Self::setup()?; - s.append(vec![0x01])?; - s.append(vec![0x02])?; - s.append(vec![0x03])?; - assert_eq!(None, s.get(0)?); - assert_eq!(Some(vec![0x01]), s.get(1)?); - assert_eq!(None, s.get(4)?); - Ok(()) - } - - fn test_metadata() -> Result<()> { - let mut s = Self::setup()?; - s.set_metadata(b"a", vec![0x01])?; - assert_eq!(Some(vec![0x01]), s.get_metadata(b"a")?); - assert_eq!(None, s.get_metadata(b"b")?); - Ok(()) - } - - #[allow(clippy::reversed_empty_ranges)] - fn test_scan() -> Result<()> { - let mut s = Self::setup()?; - s.append(vec![0x01])?; - s.append(vec![0x02])?; - s.append(vec![0x03])?; - s.commit(2)?; - - assert_eq!( - vec![vec![1], vec![2], vec![3]], - s.scan(Range::from(..)).collect::>>()? - ); - - assert_eq!(vec![vec![1]], s.scan(Range::from(0..2)).collect::>>()?); - assert_eq!(vec![vec![1], vec![2]], s.scan(Range::from(1..3)).collect::>>()?); - assert_eq!( - vec![vec![1], vec![2], vec![3]], - s.scan(Range::from(1..=3)).collect::>>()? - ); - assert!(s.scan(Range::from(3..1)).collect::>>()?.is_empty()); - assert!(s.scan(Range::from(1..1)).collect::>>()?.is_empty()); - assert_eq!(vec![vec![2]], s.scan(Range::from(2..=2)).collect::>>()?); - assert_eq!(vec![vec![2], vec![3]], s.scan(Range::from(2..5)).collect::>>()?); - - assert!(s.scan(Range::from(..0)).collect::>>()?.is_empty()); - assert_eq!(vec![vec![1]], s.scan(Range::from(..=1)).collect::>>()?); - assert_eq!(vec![vec![1], vec![2]], s.scan(Range::from(..3)).collect::>>()?); - - assert!(s.scan(Range::from(4..)).collect::>>()?.is_empty()); - assert_eq!(vec![vec![3]], s.scan(Range::from(3..)).collect::>>()?); - assert_eq!(vec![vec![2], vec![3]], s.scan(Range::from(2..)).collect::>>()?); - - Ok(()) - } -} diff --git a/src/storage/log/test.rs b/src/storage/log/test.rs deleted file mode 100644 index d2431d575..000000000 --- a/src/storage/log/test.rs +++ /dev/null @@ -1,81 +0,0 @@ -use super::{Memory, Range, Scan, Store}; -use crate::error::Result; - -use std::fmt::Display; -use std::sync::{Arc, RwLock}; - -/// Log storage backend for testing. Protects an inner Memory backend using a mutex, so it can -/// be cloned and inspected. -#[derive(Clone)] -pub struct Test { - store: Arc>, -} - -impl Test { - /// Creates a new Test key-value storage engine. - pub fn new() -> Self { - Self { store: Arc::new(RwLock::new(Memory::new())) } - } -} - -impl Display for Test { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "test") - } -} - -impl Store for Test { - fn append(&mut self, entry: Vec) -> Result { - self.store.write()?.append(entry) - } - - fn commit(&mut self, index: u64) -> Result<()> { - self.store.write()?.commit(index) - } - - fn committed(&self) -> u64 { - self.store.read().unwrap().committed() - } - - fn get(&self, index: u64) -> Result>> { - self.store.read()?.get(index) - } - - fn len(&self) -> u64 { - self.store.read().unwrap().len() - } - - fn scan(&self, range: Range) -> Scan { - // Since the mutex guard is scoped to this method, we simply buffer the result. - Box::new(self.store.read().unwrap().scan(range).collect::>>().into_iter()) - } - - fn size(&self) -> u64 { - self.store.read().unwrap().size() - } - - fn truncate(&mut self, index: u64) -> Result { - self.store.write()?.truncate(index) - } - - fn get_metadata(&self, key: &[u8]) -> Result>> { - self.store.read()?.get_metadata(key) - } - - fn set_metadata(&mut self, key: &[u8], value: Vec) -> Result<()> { - self.store.write()?.set_metadata(key, value) - } -} - -#[cfg(test)] -impl super::TestSuite for Test { - fn setup() -> Result { - Ok(Test::new()) - } -} - -#[test] -fn tests() -> Result<()> { - use super::TestSuite; - Test::test() -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 28e83d969..8d878f831 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -2,5 +2,4 @@ pub mod bincode; pub mod debug; pub mod engine; pub mod keycode; -pub mod log; pub mod mvcc;