Skip to content

Commit

Permalink
Add option to set a repair callback
Browse files Browse the repository at this point in the history
This allows the user to abort the repair process and receive progress
notifications
  • Loading branch information
cberner committed Oct 24, 2023
1 parent 7f03945 commit 01f5bd9
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 7 deletions.
91 changes: 86 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ impl Database {
return Ok(true);
}

Self::do_repair(&mut self.mem)?;
Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
DatabaseError::Storage(storage_err) => storage_err,
_ => unreachable!(),
})?;
self.mem.begin_writable()?;

Ok(false)
Expand Down Expand Up @@ -567,19 +570,35 @@ impl Database {
Ok(())
}

fn do_repair(mem: &mut TransactionalMemory) -> Result {
fn do_repair(
mem: &mut TransactionalMemory,
repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
) -> Result<(), DatabaseError> {
if !Self::verify_primary_checksums(mem)? {
// 0.3 because the repair takes 3 full scans and the first is done now
let mut handle = RepairSession::new(0.3);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

mem.repair_primary_corrupted();
// We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
// have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
// that rolls back a partially committed transaction.
mem.clear_read_cache();
if !Self::verify_primary_checksums(mem)? {
return Err(StorageError::Corrupted(
return Err(DatabaseError::Storage(StorageError::Corrupted(
"Failed to repair database. All roots are corrupted".to_string(),
));
)));
}
}
// 0.6 because the repair takes 3 full scans and the second is done now
let mut handle = RepairSession::new(0.6);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

mem.begin_repair()?;

Expand Down Expand Up @@ -607,6 +626,13 @@ impl Database {
};
drop(freed_table);

// 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
let mut handle = RepairSession::new(0.9);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

let system_root = mem.get_system_root();
if let Some((root, _)) = system_root {
Self::mark_tables_recursive(root, mem, false)?;
Expand Down Expand Up @@ -638,6 +664,7 @@ impl Database {
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
) -> Result<Self, DatabaseError> {
#[cfg(feature = "logging")]
let file_path = format!("{:?}", &file);
Expand All @@ -653,7 +680,12 @@ impl Database {
if mem.needs_repair()? {
#[cfg(feature = "logging")]
warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
Self::do_repair(&mut mem)?;
let mut handle = RepairSession::new(0.0);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}
Self::do_repair(&mut mem, repair_callback)?;
}

mem.begin_writable()?;
Expand Down Expand Up @@ -745,12 +777,41 @@ impl Database {
}
}

pub struct RepairSession {
progress: f64,
aborted: bool,
}

impl RepairSession {
pub(crate) fn new(progress: f64) -> Self {
Self {
progress,
aborted: false,
}
}

pub(crate) fn aborted(&self) -> bool {
self.aborted
}

/// Abort the repair process. The coorresponding call to [Builder::open] or [Builder::create] will return an error
pub fn abort(&mut self) {
self.aborted = true;
}

/// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
pub fn progress(&self) -> f64 {
self.progress
}
}

/// Configuration builder of a redb [Database].
pub struct Builder {
page_size: usize,
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
repair_callback: Box<dyn Fn(&mut RepairSession)>,
}

impl Builder {
Expand All @@ -771,12 +832,28 @@ impl Builder {
read_cache_size_bytes: 0,
// TODO: Default should probably take into account the total system memory
write_cache_size_bytes: 0,
repair_callback: Box::new(|_| {}),
};

result.set_cache_size(1024 * 1024 * 1024);
result
}

/// Set a callback which will be invoked periodically in the event that the database file needs
/// to be repaired.
///
/// The [RepairSession] argument can be used to control the repair process.
///
/// If the database file needs repair, the callback will be invoked at least once.
/// There is no upper limit on the number of times it may be called.
pub fn set_repair_callback(
&mut self,
callback: impl Fn(&mut RepairSession) + 'static,
) -> &mut Self {
self.repair_callback = Box::new(callback);
self
}

/// Set the internal page size of the database
///
/// Valid values are powers of two, greater than or equal to 512
Expand Down Expand Up @@ -823,6 +900,7 @@ impl Builder {
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}

Expand All @@ -840,6 +918,7 @@ impl Builder {
None,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}

Expand All @@ -853,6 +932,7 @@ impl Builder {
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}

Expand All @@ -867,6 +947,7 @@ impl Builder {
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ impl std::error::Error for TableError {}
pub enum DatabaseError {
/// The Database is already open. Cannot acquire lock.
DatabaseAlreadyOpen,
/// [crate::RepairSession::abort] was called.
RepairAborted,
/// The database file is in an old file format and must be manually upgraded
UpgradeRequired(u8),
/// Error from underlying storage
Expand All @@ -197,6 +199,7 @@ impl From<DatabaseError> for Error {
fn from(err: DatabaseError) -> Error {
match err {
DatabaseError::DatabaseAlreadyOpen => Error::DatabaseAlreadyOpen,
DatabaseError::RepairAborted => Error::RepairAborted,
DatabaseError::UpgradeRequired(x) => Error::UpgradeRequired(x),
DatabaseError::Storage(storage) => storage.into(),
}
Expand All @@ -221,6 +224,9 @@ impl Display for DatabaseError {
DatabaseError::UpgradeRequired(actual) => {
write!(f, "Manual upgrade required. Expected file format version {FILE_FORMAT_VERSION}, but file is version {actual}")
}
DatabaseError::RepairAborted => {
write!(f, "Database repair aborted.")
}
DatabaseError::DatabaseAlreadyOpen => {
write!(f, "Database already open. Cannot acquire lock.")
}
Expand Down Expand Up @@ -413,6 +419,8 @@ pub enum Error {
/// Savepoints become invalid when an older savepoint is restored after it was created,
/// and savepoints cannot be created if the transaction is "dirty" (any tables have been opened)
InvalidSavepoint,
/// [crate::RepairSession::abort] was called.
RepairAborted,
/// A persistent savepoint exists
PersistentSavepointExists,
/// An Ephemeral savepoint exists
Expand Down Expand Up @@ -517,6 +525,9 @@ impl Display for Error {
Error::DatabaseAlreadyOpen => {
write!(f, "Database already open. Cannot acquire lock.")
}
Error::RepairAborted => {
write!(f, "Database repair aborted.")
}
Error::PersistentSavepointExists => {
write!(
f,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
//! [design]: https://github.com/cberner/redb/blob/master/docs/design.md
pub use db::{
Builder, Database, MultimapTableDefinition, MultimapTableHandle, StorageBackend,
Builder, Database, MultimapTableDefinition, MultimapTableHandle, RepairSession, StorageBackend,
TableDefinition, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
};
pub use error::{
Expand Down
39 changes: 38 additions & 1 deletion src/tree_store/page_store/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ mod test {
use crate::tree_store::page_store::TransactionalMemory;
#[cfg(not(target_os = "windows"))]
use crate::StorageError;
use crate::{Database, ReadableTable};
use crate::{Database, DatabaseError, ReadableTable};
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::size_of;
Expand Down Expand Up @@ -572,6 +572,43 @@ mod test {
Database::open(tmpfile.path()).unwrap();
}

#[test]
fn abort_repair() {
let tmpfile = crate::create_tempfile();
let db = Database::builder().create(tmpfile.path()).unwrap();
drop(db);

let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(tmpfile.path())
.unwrap();

file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
let mut buffer = [0u8; 1];
file.read_exact(&mut buffer).unwrap();
file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
buffer[0] |= RECOVERY_REQUIRED;
file.write_all(&buffer).unwrap();

assert!(TransactionalMemory::new(
Box::new(FileBackend::new(file).unwrap()),
PAGE_SIZE,
None,
0,
0
)
.unwrap()
.needs_repair()
.unwrap());

let err = Database::builder()
.set_repair_callback(|handle| handle.abort())
.open(tmpfile.path())
.unwrap_err();
assert!(matches!(err, DatabaseError::RepairAborted));
}

#[test]
fn repair_insert_reserve_regression() {
let tmpfile = crate::create_tempfile();
Expand Down

0 comments on commit 01f5bd9

Please sign in to comment.