From 01f5bd91252ed0a9bbce07a37809b74bd5dfa3be Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Thu, 12 Oct 2023 16:12:50 -0700 Subject: [PATCH] Add option to set a repair callback This allows the user to abort the repair process and receive progress notifications --- src/db.rs | 91 +++++++++++++++++++++++++++-- src/error.rs | 11 ++++ src/lib.rs | 2 +- src/tree_store/page_store/header.rs | 39 ++++++++++++- 4 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/db.rs b/src/db.rs index cf8d0b1b..6ba46896 100644 --- a/src/db.rs +++ b/src/db.rs @@ -352,7 +352,10 @@ impl Database { return Ok(true); } - Self::do_repair(&mut self.mem)?; + Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { + DatabaseError::Storage(storage_err) => storage_err, + _ => unreachable!(), + })?; self.mem.begin_writable()?; Ok(false) @@ -567,19 +570,35 @@ impl Database { Ok(()) } - fn do_repair(mem: &mut TransactionalMemory) -> Result { + fn do_repair( + mem: &mut TransactionalMemory, + repair_callback: &(dyn Fn(&mut RepairSession) + 'static), + ) -> Result<(), DatabaseError> { if !Self::verify_primary_checksums(mem)? { + // 0.3 because the repair takes 3 full scans and the first is done now + let mut handle = RepairSession::new(0.3); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + mem.repair_primary_corrupted(); // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since // that rolls back a partially committed transaction. mem.clear_read_cache(); if !Self::verify_primary_checksums(mem)? { - return Err(StorageError::Corrupted( + return Err(DatabaseError::Storage(StorageError::Corrupted( "Failed to repair database. 
All roots are corrupted".to_string(), - )); + ))); } } + // 0.6 because the repair takes 3 full scans and the second is done now + let mut handle = RepairSession::new(0.6); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } mem.begin_repair()?; @@ -607,6 +626,13 @@ impl Database { }; drop(freed_table); + // 0.9 because the repair takes 3 full scans and the third is done now. There are just some system tables left + let mut handle = RepairSession::new(0.9); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let system_root = mem.get_system_root(); if let Some((root, _)) = system_root { Self::mark_tables_recursive(root, mem, false)?; @@ -638,6 +664,7 @@ impl Database { region_size: Option, read_cache_size_bytes: usize, write_cache_size_bytes: usize, + repair_callback: &(dyn Fn(&mut RepairSession) + 'static), ) -> Result { #[cfg(feature = "logging")] let file_path = format!("{:?}", &file); @@ -653,7 +680,12 @@ impl Database { if mem.needs_repair()? { #[cfg(feature = "logging")] warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - Self::do_repair(&mut mem)?; + let mut handle = RepairSession::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + Self::do_repair(&mut mem, repair_callback)?; } mem.begin_writable()?; @@ -745,12 +777,41 @@ impl Database { } } +pub struct RepairSession { + progress: f64, + aborted: bool, +} + +impl RepairSession { + pub(crate) fn new(progress: f64) -> Self { + Self { + progress, + aborted: false, + } + } + + pub(crate) fn aborted(&self) -> bool { + self.aborted + } + + /// Abort the repair process. The corresponding call to [Builder::open] or [Builder::create] will return an error + pub fn abort(&mut self) { + self.aborted = true; + } + + /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete. 
+ pub fn progress(&self) -> f64 { + self.progress + } +} + /// Configuration builder of a redb [Database]. pub struct Builder { page_size: usize, region_size: Option, read_cache_size_bytes: usize, write_cache_size_bytes: usize, + repair_callback: Box, } impl Builder { @@ -771,12 +832,28 @@ impl Builder { read_cache_size_bytes: 0, // TODO: Default should probably take into account the total system memory write_cache_size_bytes: 0, + repair_callback: Box::new(|_| {}), }; result.set_cache_size(1024 * 1024 * 1024); result } + /// Set a callback which will be invoked periodically in the event that the database file needs + /// to be repaired. + /// + /// The [RepairSession] argument can be used to control the repair process. + /// + /// If the database file needs repair, the callback will be invoked at least once. + /// There is no upper limit on the number of times it may be called. + pub fn set_repair_callback( + &mut self, + callback: impl Fn(&mut RepairSession) + 'static, + ) -> &mut Self { + self.repair_callback = Box::new(callback); + self + } + /// Set the internal page size of the database /// /// Valid values are powers of two, greater than or equal to 512 @@ -823,6 +900,7 @@ impl Builder { self.region_size, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } @@ -840,6 +918,7 @@ impl Builder { None, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } @@ -853,6 +932,7 @@ impl Builder { self.region_size, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } @@ -867,6 +947,7 @@ impl Builder { self.region_size, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } } diff --git a/src/error.rs b/src/error.rs index 12490a05..c475f5a3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -187,6 +187,8 @@ impl std::error::Error for TableError {} pub enum DatabaseError { /// The Database is already open. Cannot acquire lock. 
DatabaseAlreadyOpen, + /// [crate::RepairSession::abort] was called. + RepairAborted, /// The database file is in an old file format and must be manually upgraded UpgradeRequired(u8), /// Error from underlying storage @@ -197,6 +199,7 @@ impl From for Error { fn from(err: DatabaseError) -> Error { match err { DatabaseError::DatabaseAlreadyOpen => Error::DatabaseAlreadyOpen, + DatabaseError::RepairAborted => Error::RepairAborted, DatabaseError::UpgradeRequired(x) => Error::UpgradeRequired(x), DatabaseError::Storage(storage) => storage.into(), } @@ -221,6 +224,9 @@ impl Display for DatabaseError { DatabaseError::UpgradeRequired(actual) => { write!(f, "Manual upgrade required. Expected file format version {FILE_FORMAT_VERSION}, but file is version {actual}") } + DatabaseError::RepairAborted => { + write!(f, "Database repair aborted.") + } DatabaseError::DatabaseAlreadyOpen => { write!(f, "Database already open. Cannot acquire lock.") } @@ -413,6 +419,8 @@ pub enum Error { /// Savepoints become invalid when an older savepoint is restored after it was created, /// and savepoints cannot be created if the transaction is "dirty" (any tables have been opened) InvalidSavepoint, + /// [crate::RepairSession::abort] was called. + RepairAborted, /// A persistent savepoint exists PersistentSavepointExists, /// An Ephemeral savepoint exists @@ -517,6 +525,9 @@ impl Display for Error { Error::DatabaseAlreadyOpen => { write!(f, "Database already open. Cannot acquire lock.") } + Error::RepairAborted => { + write!(f, "Database repair aborted.") + } Error::PersistentSavepointExists => { write!( f, diff --git a/src/lib.rs b/src/lib.rs index 0e58a2d6..57c6e58c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! 
[design]: https://github.com/cberner/redb/blob/master/docs/design.md pub use db::{ - Builder, Database, MultimapTableDefinition, MultimapTableHandle, StorageBackend, + Builder, Database, MultimapTableDefinition, MultimapTableHandle, RepairSession, StorageBackend, TableDefinition, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, }; pub use error::{ diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 6e76f5b1..15d53654 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -414,7 +414,7 @@ mod test { use crate::tree_store::page_store::TransactionalMemory; #[cfg(not(target_os = "windows"))] use crate::StorageError; - use crate::{Database, ReadableTable}; + use crate::{Database, DatabaseError, ReadableTable}; use std::fs::OpenOptions; use std::io::{Read, Seek, SeekFrom, Write}; use std::mem::size_of; @@ -572,6 +572,43 @@ mod test { Database::open(tmpfile.path()).unwrap(); } + #[test] + fn abort_repair() { + let tmpfile = crate::create_tempfile(); + let db = Database::builder().create(tmpfile.path()).unwrap(); + drop(db); + + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(tmpfile.path()) + .unwrap(); + + file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap(); + let mut buffer = [0u8; 1]; + file.read_exact(&mut buffer).unwrap(); + file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap(); + buffer[0] |= RECOVERY_REQUIRED; + file.write_all(&buffer).unwrap(); + + assert!(TransactionalMemory::new( + Box::new(FileBackend::new(file).unwrap()), + PAGE_SIZE, + None, + 0, + 0 + ) + .unwrap() + .needs_repair() + .unwrap()); + + let err = Database::builder() + .set_repair_callback(|handle| handle.abort()) + .open(tmpfile.path()) + .unwrap_err(); + assert!(matches!(err, DatabaseError::RepairAborted)); + } + #[test] fn repair_insert_reserve_regression() { let tmpfile = crate::create_tempfile();