Skip to content

Commit

Permalink
Split Error into more granular error types
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed May 28, 2023
1 parent 78bd804 commit da964a7
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 166 deletions.
4 changes: 2 additions & 2 deletions fuzz/fuzz_targets/fuzz_redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ fn handle_multimap_table_op(op: &FuzzOperation, reference: &mut BTreeMap<u64, BT
} else {
Box::new(reference.range(start..end))
};
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, MultimapValue<&[u8]>), redb::Error>>> = if *reversed {
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, MultimapValue<&[u8]>), redb::StorageError>>> = if *reversed {
Box::new(table.range(start..end)?.rev())
} else {
Box::new(table.range(start..end)?)
Expand Down Expand Up @@ -276,7 +276,7 @@ fn handle_table_op(op: &FuzzOperation, reference: &mut BTreeMap<u64, usize>, tab
} else {
Box::new(reference.range(start..end))
};
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, AccessGuard<&[u8]>), redb::Error>>> = if *reversed {
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, AccessGuard<&[u8]>), redb::StorageError>>> = if *reversed {
Box::new(table.range(start..end)?.rev())
} else {
Box::new(table.range(start..end)?)
Expand Down
78 changes: 48 additions & 30 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::tree_store::{
TransactionalMemory, PAGE_SIZE,
};
use crate::types::{RedbKey, RedbValue};
use crate::{Durability, Error, ReadOnlyTable, ReadableTable, Savepoint};
use crate::{
CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, Savepoint,
SavepointError, StorageError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Display, Formatter};
use std::fs::{File, OpenOptions};
Expand All @@ -17,10 +20,10 @@ use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use crate::error::TransactionError;
use crate::multimap_table::parse_subtree_roots;
use crate::sealed::Sealed;
use crate::transactions::SAVEPOINT_TABLE;
use crate::Error::Corrupted;
#[cfg(feature = "logging")]
use log::{info, warn};

Expand Down Expand Up @@ -242,12 +245,12 @@ impl Database {
/// * if the file does not exist, or is an empty file, a new database will be initialized in it
/// * if the file is a valid redb database, it will be opened
/// * otherwise this function will return an error
pub fn create(path: impl AsRef<Path>) -> Result<Database> {
pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().create(path)
}

/// Opens an existing redb database.
pub fn open(path: impl AsRef<Path>) -> Result<Database> {
pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().open(path)
}

Expand Down Expand Up @@ -361,19 +364,19 @@ impl Database {
/// Compacts the database file
///
/// Returns `true` if compaction was performed, and `false` if no futher compaction was possible
pub fn compact(&mut self) -> Result<bool> {
pub fn compact(&mut self) -> Result<bool, CompactionError> {
// Commit to free up any pending free pages
// Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter
let mut txn = self.begin_write()?;
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.list_persistent_savepoints()?.next().is_some() {
return Err(Error::PersistentSavepointExists);
return Err(CompactionError::PersistentSavepointExists);
}
txn.set_durability(Durability::Paranoid);
txn.commit()?;
txn.commit().map_err(|e| e.into_storage_error())?;
// Repeat, just in case executing list_persistent_savepoints() created a new table
let mut txn = self.begin_write()?;
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
txn.set_durability(Durability::Paranoid);
txn.commit()?;
txn.commit().map_err(|e| e.into_storage_error())?;
// There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
// should have been cleared out by the above commit()
assert!(self.mem.get_freed_root().is_none());
Expand All @@ -383,18 +386,18 @@ impl Database {
loop {
let mut progress = false;

let mut txn = self.begin_write()?;
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.compact_pages()? {
progress = true;
txn.commit()?;
txn.commit().map_err(|e| e.into_storage_error())?;
} else {
txn.abort()?;
}

// Double commit to free up the relocated pages for reuse
let mut txn = self.begin_write()?;
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
txn.set_durability(Durability::Paranoid);
txn.commit()?;
txn.commit().map_err(|e| e.into_storage_error())?;
assert!(self.mem.get_freed_root().is_none());

if !progress {
Expand All @@ -415,8 +418,11 @@ impl Database {
let freed_list = Arc::new(Mutex::new(vec![]));
let table_tree = TableTree::new(system_root, mem, freed_list);
let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
if let Some(savepoint_table_def) =
table_tree.get_table::<u64, &[u8]>(SAVEPOINT_TABLE.name(), TableType::Normal)?
if let Some(savepoint_table_def) = table_tree
.get_table::<u64, &[u8]>(SAVEPOINT_TABLE.name(), TableType::Normal)
.map_err(|e| {
e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
})?
{
let savepoint_table: ReadOnlyTable<u64, &[u8]> =
ReadOnlyTable::new(savepoint_table_def.get_root(), PageHint::None, mem)?;
Expand Down Expand Up @@ -540,7 +546,7 @@ impl Database {
// that rolls back a partially committed transaction.
mem.clear_read_cache();
if !Self::verify_primary_checksums(mem)? {
return Err(Corrupted(
return Err(StorageError::Corrupted(
"Failed to repair database. All roots are corrupted".to_string(),
));
}
Expand Down Expand Up @@ -599,7 +605,7 @@ impl Database {
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
) -> Result<Self> {
) -> Result<Self, DatabaseError> {
#[cfg(feature = "logging")]
let file_path = format!("{:?}", &file);
#[cfg(feature = "logging")]
Expand Down Expand Up @@ -628,15 +634,23 @@ impl Database {
};

// Restore the tracker state for any persistent savepoints
let txn = db.begin_write()?;
let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
if let Some(next_id) = txn.next_persistent_savepoint_id()? {
db.transaction_tracker
.lock()
.unwrap()
.restore_savepoint_counter_state(next_id);
}
for id in txn.list_persistent_savepoints()? {
let savepoint = txn.get_persistent_savepoint(id)?;
let savepoint = match txn.get_persistent_savepoint(id) {
Ok(savepoint) => savepoint,
Err(err) => match err {
SavepointError::InvalidSavepoint => unreachable!(),
SavepointError::Storage(storage) => {
return Err(storage.into());
}
},
};
db.transaction_tracker
.lock()
.unwrap()
Expand Down Expand Up @@ -678,8 +692,8 @@ impl Database {
/// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
/// write may be in progress at a time. If a write is in progress, this function will block
/// until it completes.
pub fn begin_write(&self) -> Result<WriteTransaction> {
WriteTransaction::new(self, self.transaction_tracker.clone())
pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
}

/// Begins a read transaction
Expand All @@ -689,7 +703,7 @@ impl Database {
///
/// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
/// may exist concurrently with writes
pub fn begin_read(&self) -> Result<ReadTransaction> {
pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
let id = self.allocate_read_transaction()?;
#[cfg(feature = "logging")]
info!("Beginning read transaction id={:?}", id);
Expand Down Expand Up @@ -766,7 +780,7 @@ impl Builder {
/// * if the file does not exist, or is an empty file, a new database will be initialized in it
/// * if the file is a valid redb database, it will be opened
/// * otherwise this function will return an error
pub fn create(&self, path: impl AsRef<Path>) -> Result<Database> {
pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -783,9 +797,9 @@ impl Builder {
}

/// Opens an existing redb database.
pub fn open(&self, path: impl AsRef<Path>) -> Result<Database> {
pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
if !path.as_ref().exists() {
Err(Error::Io(ErrorKind::NotFound.into()))
Err(StorageError::Io(ErrorKind::NotFound.into()).into())
} else if File::open(path.as_ref())?.metadata()?.len() > 0 {
let file = OpenOptions::new().read(true).write(true).open(path)?;
Database::new(
Expand All @@ -796,7 +810,7 @@ impl Builder {
self.write_cache_size_bytes,
)
} else {
Err(Error::Io(io::Error::from(ErrorKind::InvalidData)))
Err(StorageError::Io(io::Error::from(ErrorKind::InvalidData)).into())
}
}
}
Expand All @@ -810,7 +824,8 @@ impl std::fmt::Debug for Database {

#[cfg(test)]
mod test {
use crate::{Database, Durability, Error, ReadableTable, TableDefinition};
use crate::error::CommitError;
use crate::{Database, Durability, ReadableTable, StorageError, TableDefinition, TableError};

#[test]
fn small_pages() {
Expand Down Expand Up @@ -1010,7 +1025,7 @@ mod test {
{
assert!(matches!(
tx.open_table(table_def),
Err(Error::SimulatedIOFailure)
Err(TableError::Storage(StorageError::SimulatedIOFailure))
));
}
tx.abort().unwrap();
Expand Down Expand Up @@ -1087,7 +1102,10 @@ mod test {
let savepoint6 = tx.ephemeral_savepoint().unwrap();
let _savepoint7 = tx.persistent_savepoint().unwrap();
tx.restore_savepoint(&savepoint6).unwrap();
assert!(matches!(tx.commit(), Err(Error::SimulatedIOFailure)));
assert!(matches!(
tx.commit(),
Err(CommitError::Storage(StorageError::SimulatedIOFailure))
));

drop(db);
Database::builder()
Expand Down
Loading

0 comments on commit da964a7

Please sign in to comment.