diff --git a/docs/design.md b/docs/design.md index 882beb6c..2e1b4b16 100644 --- a/docs/design.md +++ b/docs/design.md @@ -88,12 +88,16 @@ controls which transaction pointer is the primary. `magic number` must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. This sequence is inspired by the PNG magic number. -`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing two flags: +`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing three flags: * first bit: `primary_bit` flag which indicates whether transaction slot 0 or transaction slot 1 contains the latest commit. - redb relies on the fact that this is a single bit to perform atomic commits. -* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. - During the recovery process, the region tracker and regional allocator states -- described below -- are reconstructed - by walking the btree from all active roots. +* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. This can be + a full repair, in which the region tracker and regional allocator states -- described below -- are reconstructed by walking + the btree from all active roots, or a quick-repair, in which the state is simply loaded from the allocator state table. +* third bit: `two_phase_commit` flag, which indicates whether the transaction in the primary slot was written using 2-phase + commit. If so, the primary slot is guaranteed to be valid, and repair won't look at the secondary slot. This flag is always + updated atomically along with the primary bit. + +redb relies on the fact that this is a single byte to perform atomic commits. `page size` is the size of a redb page in bytes @@ -155,7 +159,9 @@ changed during an upgrade. ### Region tracker The region tracker is an array of `BtreeBitmap`s that tracks the page orders which are free in each region. -It is stored in a page in the data section of a region: +There are two different places it can be stored: on shutdown, it's written to a page in the data section of +a region, and when making a commit with quick-repair enabled, it's written to an entry in the allocator state +table. The former is valid only after a clean shutdown; the latter is usable even after a crash. ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -216,6 +222,11 @@ range has been allocated * n bytes: free index data * n bytes: allocated data +Like the region tracker, there are two different places where the regional allocator state can be +stored. On shutdown, it's written to the region header as described above, and when making a commit +with quick-repair enabled, it's written to an entry in the allocator state table. The former is valid +only after a clean shutdown; the latter is usable even after a crash. + ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -461,6 +472,12 @@ To repair the database after an unclean shutdown we must: 2) Update the allocator state, so that it is consistent with all the database roots in the above transaction +If the last commit before the crash had quick-repair enabled, then these are both trivial. The +primary commit slot is guaranteed to be valid, because it was written using 2-phase commit, and +the corresponding allocator state is stored in the allocator state table. + +Otherwise, we need to perform a full repair: + For (1), if the primary commit slot is invalid we switch to the secondary slot. For (2), we rebuild the allocator state by walking the following trees and marking all referenced diff --git a/fuzz/fuzz_targets/common.rs b/fuzz/fuzz_targets/common.rs index faec1d70..ac0d15cb 100644 --- a/fuzz/fuzz_targets/common.rs +++ b/fuzz/fuzz_targets/common.rs @@ -164,6 +164,7 @@ pub(crate) enum FuzzOperation { pub(crate) struct FuzzTransaction { pub ops: Vec, pub durable: bool, + pub quick_repair: bool, pub commit: bool, pub create_ephemeral_savepoint: bool, pub create_persistent_savepoint: bool, diff --git a/fuzz/fuzz_targets/fuzz_redb.rs b/fuzz/fuzz_targets/fuzz_redb.rs index 599784d0..34c56cab 100644 --- a/fuzz/fuzz_targets/fuzz_redb.rs +++ b/fuzz/fuzz_targets/fuzz_redb.rs @@ -583,6 +583,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(WriteTransa if !transaction.durable { txn.set_durability(Durability::None); } + txn.set_quick_repair(transaction.quick_repair); let mut counter_table = txn.open_table(COUNTER_TABLE).unwrap(); let uncommitted_id = txn_id as u64 + 1; counter_table.insert((), uncommitted_id)?; diff --git a/src/db.rs b/src/db.rs index e2ba23fa..c0558e5b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -19,7 +19,9 @@ use std::sync::{Arc, Mutex}; use crate::error::TransactionError; use crate::sealed::Sealed; -use crate::transactions::SAVEPOINT_TABLE; +use crate::transactions::{ + AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE, +}; use crate::tree_store::file_backend::FileBackend; #[cfg(feature = "logging")] use log::{debug, info, warn}; @@ -429,7 +431,9 @@ impl Database { return Err(CompactionError::TransactionInProgress); } // Commit to free up any pending free pages - // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter + // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter. + // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user + // can cancel the compaction without requiring a full repair afterwards let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; if txn.list_persistent_savepoints()?.next().is_some() { return Err(CompactionError::PersistentSavepointExists); @@ -609,6 +613,12 @@ impl Database { repair_callback: &(dyn Fn(&mut RepairSession) + 'static), ) -> Result<[Option; 3], DatabaseError> { if !Self::verify_primary_checksums(mem.clone())? { + if mem.used_two_phase_commit() { + return Err(DatabaseError::Storage(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + ))); + } + // 0.3 because the repair takes 3 full scans and the first is done now let mut handle = RepairSession::new(0.3); repair_callback(&mut handle); @@ -701,23 +711,31 @@ impl Database { )?; let mut mem = Arc::new(mem); if mem.needs_repair()? { - #[cfg(feature = "logging")] - warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - let mut handle = RepairSession::new(0.0); - repair_callback(&mut handle); - if handle.aborted() { - return Err(DatabaseError::RepairAborted); + // If the last transaction used 2-phase commit and updated the allocator state table, then + // we can just load the allocator state from there. Otherwise, we need a full repair + if Self::try_quick_repair(mem.clone())? { + #[cfg(feature = "logging")] + info!("Quick-repair successful, full repair not needed"); + } else { + #[cfg(feature = "logging")] + warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); + let mut handle = RepairSession::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let [data_root, system_root, freed_root] = + Self::do_repair(&mut mem, repair_callback)?; + let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); + mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; } - let [data_root, system_root, freed_root] = Self::do_repair(&mut mem, repair_callback)?; - let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); - mem.commit( - data_root, - system_root, - freed_root, - next_transaction_id, - false, - true, - )?; } mem.begin_writable()?; @@ -752,6 +770,42 @@ impl Database { Ok(db) } + // Returns true if quick-repair was successful, or false if a full repair is needed + fn try_quick_repair(mem: Arc) -> Result { + // Quick-repair is only possible if the primary was written using 2-phase commit + if !mem.used_two_phase_commit() { + return Ok(false); + } + + // See if the allocator state table is present in the system table tree + let fake_freed_pages = Arc::new(Mutex::new(vec![])); + let system_table_tree = TableTreeMut::new( + mem.get_system_root(), + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages.clone(), + ); + let Some(allocator_state_table) = system_table_tree + .get_table::(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))? + else { + return Ok(false); + }; + + // Load the allocator state from the table + let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else { + unreachable!(); + }; + let tree = AllocatorStateTree::new( + table_root, + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages, + ); + + mem.try_load_allocator_state(&tree) + } + fn allocate_read_transaction(&self) -> Result { let id = self .transaction_tracker diff --git a/src/transactions.rs b/src/transactions.rs index 63c9e8e6..c2bbe47b 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -13,8 +13,8 @@ use crate::types::{Key, Value}; use crate::{ AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, - TableDefinition, TableError, TableHandle, TransactionError, UntypedMultimapTableHandle, - UntypedTableHandle, + TableDefinition, TableError, TableHandle, TransactionError, TypeName, + UntypedMultimapTableHandle, UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{debug, warn}; @@ -23,6 +23,7 @@ use std::cmp::min; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; +use std::mem::size_of; use std::ops::RangeBounds; #[cfg(any(test, fuzzing))] use std::ops::RangeFull; @@ -35,6 +36,70 @@ const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = SystemTableDefinition::new("next_savepoint_id"); pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = SystemTableDefinition::new("persistent_savepoints"); +// The allocator state table is stored in the system table tree, but it's accessed using +// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition +pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state"; +pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>; + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] +pub(crate) enum AllocatorStateKey { + Region(u32), + RegionTracker, + TransactionId, +} + +impl Value for AllocatorStateKey { + type SelfType<'a> = Self; + type AsBytes<'a> = [u8; 1 + size_of::()]; + + fn fixed_width() -> Option { + Some(1 + size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + match data[0] { + 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())), + 1 => Self::RegionTracker, + 2 => Self::TransactionId, + _ => unreachable!(), + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut result = Self::AsBytes::default(); + match value { + Self::Region(region) => { + result[0] = 0; + result[1..].copy_from_slice(&u32::to_le_bytes(*region)); + } + Self::RegionTracker => { + result[0] = 1; + } + Self::TransactionId => { + result[0] = 2; + } + } + + result + } + + fn type_name() -> TypeName { + TypeName::internal("redb::AllocatorStateKey") + } +} + +impl Key for AllocatorStateKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> { name: &'a str, @@ -423,6 +488,7 @@ pub struct WriteTransaction { dirty: AtomicBool, durability: InternalDurability, two_phase_commit: bool, + quick_repair: bool, // Persistent savepoints created during this transaction created_persistent_savepoints: Mutex>, deleted_persistent_savepoints: Mutex>, @@ -481,6 +547,7 @@ impl WriteTransaction { dirty: AtomicBool::new(false), durability: InternalDurability::Immediate, two_phase_commit: false, + quick_repair: false, created_persistent_savepoints: Mutex::new(Default::default()), deleted_persistent_savepoints: Mutex::new(vec![]), }) @@ -930,6 +997,20 @@ impl WriteTransaction { self.two_phase_commit = enabled; } + /// Enable or disable quick-repair (defaults to disabled) + /// + /// By default, when reopening the database after a crash, redb needs to do a full repair. + /// This involves walking the entire database to verify the checksums and reconstruct the + /// allocator state, so it can be very slow if the database is large. + /// + /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state + /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit + /// (which guarantees that the primary commit slot is valid without needing to look at the + /// checksums). This means commits are slower, but recovery after a crash is almost instant. + pub fn set_quick_repair(&mut self, enabled: bool) { + self.quick_repair = enabled; + } + /// Open the given table /// /// The table will be created if it does not exist @@ -1023,10 +1104,15 @@ impl WriteTransaction { } fn commit_inner(&mut self) -> Result<(), CommitError> { + // Quick-repair requires 2-phase commit + if self.quick_repair { + self.two_phase_commit = true; + } + #[cfg(feature = "logging")] debug!( - "Committing transaction id={:?} with durability={:?} two_phase={}", - self.transaction_id, self.durability, self.two_phase_commit + "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}", + self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair ); match self.durability { InternalDurability::None => self.non_durable_commit()?, @@ -1089,6 +1175,7 @@ impl WriteTransaction { .transaction_tracker .oldest_live_read_transaction() .map_or(self.transaction_id, |x| x.next()); + self.process_freed_pages(free_until_transaction)?; let user_root = self .tables @@ -1098,21 +1185,54 @@ impl WriteTransaction { .flush_table_root_updates()? .finalize_dirty_checksums()?; - let system_root = self - .system_tables - .lock() - .unwrap() - .table_tree - .flush_table_root_updates()? - .finalize_dirty_checksums()?; + let mut system_tables = self.system_tables.lock().unwrap(); + let system_tree = system_tables.table_tree.flush_table_root_updates()?; + system_tree + .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?; + + if self.quick_repair { + system_tree.create_table_and_flush_table_root( + ALLOCATOR_STATE_TABLE_NAME, + |tree: &mut AllocatorStateTree| { + let mut pagination_counter = 0; + + loop { + let num_regions = self + .mem + .reserve_allocator_state(tree, self.transaction_id)?; + + // We can't free pages after the commit, because that would invalidate our + // saved allocator state. Everything needs to go through the transactional + // free mechanism + self.store_freed_pages(&mut pagination_counter, true)?; + + if self.mem.try_save_allocator_state(tree, num_regions)? { + return Ok(()); + } + + // Clear out the table before retrying, just in case the number of regions + // has somehow shrunk. Don't use retain_in() for this, since it doesn't + // free the pages immediately -- we need to reuse those pages to guarantee + // that our retry loop will eventually terminate + while let Some(guards) = tree.last()? { + let key = guards.0.value(); + drop(guards); + tree.remove(&key)?; + } + } + }, + )?; + } else { + // If a savepoint exists it might reference the freed-tree, since it holds a reference to the + // root of the freed-tree. Therefore, we must use the transactional free mechanism to free + // those pages. If there are no save points then these can be immediately freed, which is + // done at the end of this function. + let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); + self.store_freed_pages(&mut 0, savepoint_exists)?; + } - self.process_freed_pages(free_until_transaction)?; - // If a savepoint exists it might reference the freed-tree, since it holds a reference to the - // root of the freed-tree. Therefore, we must use the transactional free mechanism to free - // those pages. If there are no save points then these can be immediately freed, which is - // done at the end of this function. - let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); - self.store_freed_pages(savepoint_exists)?; + let system_root = system_tree.finalize_dirty_checksums()?; // Finalize freed table checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1158,7 +1278,7 @@ impl WriteTransaction { // Store all freed pages for a future commit(), since we can't free pages during a // non-durable commit (it's non-durable, so could be rolled back anytime in the future) - self.store_freed_pages(true)?; + self.store_freed_pages(&mut 0, true)?; // Finalize all checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1264,10 +1384,13 @@ impl WriteTransaction { Ok(()) } - fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result { + fn store_freed_pages( + &self, + pagination_counter: &mut u64, + include_post_commit_free: bool, + ) -> Result { assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8 - let mut pagination_counter = 0u64; let mut freed_tree = self.freed_tree.lock().unwrap(); if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored @@ -1282,7 +1405,7 @@ impl WriteTransaction { let buffer_size = FreedPageList::required_bytes(chunk_size); let key = FreedTableKey { transaction_id: self.transaction_id.raw_id(), - pagination_id: pagination_counter, + pagination_id: *pagination_counter, }; let mut access_guard = freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?; @@ -1295,7 +1418,7 @@ impl WriteTransaction { } drop(access_guard); - pagination_counter += 1; + *pagination_counter += 1; if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index ac3bc784..541e9e3d 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -434,6 +434,23 @@ impl BtreeMut<'_, K, V> { Ok(old_value) } + // Insert without allocating or freeing any pages. This requires that you've previously + // inserted the same key, with a value of at least the same serialized length, earlier + // in the same transaction. If those preconditions aren't satisfied, insert_inplace() + // will panic; it won't allocate under any circumstances + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + let mut fake_freed_pages = vec![]; + let mut operation = + MutateHelper::::new(&mut self.root, self.mem.clone(), fake_freed_pages.as_mut()); + operation.insert_inplace(key, value)?; + assert!(fake_freed_pages.is_empty()); + Ok(()) + } + pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result>> { #[cfg(feature = "logging")] trace!("Btree(root={:?}): Deleting {:?}", &self.root, key); diff --git a/src/tree_store/btree_mutator.rs b/src/tree_store/btree_mutator.rs index 7c50335c..3d1790cd 100644 --- a/src/tree_store/btree_mutator.rs +++ b/src/tree_store/btree_mutator.rs @@ -5,7 +5,7 @@ use crate::tree_store::btree_base::{ use crate::tree_store::btree_mutator::DeletionResult::{ DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree, }; -use crate::tree_store::page_store::{Page, PageImpl}; +use crate::tree_store::page_store::{Page, PageImpl, PageMut}; use crate::tree_store::{ AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory, }; @@ -496,6 +496,49 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> { }) } + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + assert!(self.modify_uncommitted); + let header = self.root.expect("Key not found (tree is empty)"); + self.insert_inplace_helper( + self.mem.get_page_mut(header.root)?, + K::as_bytes(key).as_ref(), + V::as_bytes(value).as_ref(), + )?; + *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length)); + Ok(()) + } + + fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> { + assert!(self.mem.uncommitted(page.get_page_number())); + + let node_mem = page.memory(); + match node_mem[0] { + LEAF => { + let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width()); + let (position, found) = accessor.position::(key); + assert!(found); + let old_len = accessor.entry(position).unwrap().value().len(); + assert!(value.len() <= old_len); + let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width()); + mutator.insert(position, true, key, value); + } + BRANCH => { + let accessor = BranchAccessor::new(&page, K::fixed_width()); + let (child_index, child_page) = accessor.child_for_key::(key); + self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?; + let mut mutator = BranchMutator::new(&mut page); + mutator.write_child_page(child_index, child_page, DEFERRED); + } + _ => unreachable!(), + } + + Ok(()) + } + fn delete_leaf_helper( &mut self, page: PageImpl, diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 81820fd2..351a2db2 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -5,7 +5,7 @@ use crate::tree_store::page_store::page_manager::{ xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, }; use crate::tree_store::{Checksum, PageNumber}; -use crate::{DatabaseError, StorageError}; +use crate::{DatabaseError, Result, StorageError}; use std::mem::size_of; // Database layout: @@ -58,6 +58,7 @@ pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE // God byte flags const PRIMARY_BIT: u8 = 1; const RECOVERY_REQUIRED: u8 = 2; +const TWO_PHASE_COMMIT: u8 = 4; // Structure of each commit slot const VERSION_OFFSET: usize = 0; @@ -95,6 +96,7 @@ pub(super) struct HeaderRepairInfo { pub(super) struct DatabaseHeader { primary_slot: usize, pub(super) recovery_required: bool, + pub(super) two_phase_commit: bool, page_size: u32, region_header_pages: u32, region_max_data_pages: u32, @@ -120,6 +122,7 @@ impl DatabaseHeader { Self { primary_slot: 0, recovery_required: true, + two_phase_commit: false, page_size: layout.full_region_layout().page_size(), region_header_pages: layout.full_region_layout().get_header_pages(), region_max_data_pages: layout.full_region_layout().num_pages(), @@ -194,12 +197,57 @@ impl DatabaseHeader { self.primary_slot ^= 1; } + // Figure out which slot to use as the primary when starting a repair. The repair process might + // still switch to the other slot later, if the tree checksums turn out to be invalid. + // + // Returns true if we picked the original primary, or false if we swapped + pub(super) fn pick_primary_for_repair( + &mut self, + repair_info: HeaderRepairInfo, + ) -> Result { + // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look + // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means + // we can't trust it + if self.two_phase_commit { + if repair_info.primary_corrupted { + return Err(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + )); + } + return Ok(true); + } + + // Pick whichever slot is newer, assuming it has a valid checksum. This handles an edge case + // where we crash during fsync(), and the only data that got written to disk was the god byte + // update swapping the primary -- in that case, the primary contains a valid but out-of-date + // transaction, so we need to load from the secondary instead + if repair_info.primary_corrupted { + if repair_info.secondary_corrupted { + return Err(StorageError::Corrupted( + "Both commit slots are corrupted".to_string(), + )); + } + self.swap_primary_slot(); + return Ok(false); + } + + let secondary_newer = + self.secondary_slot().transaction_id > self.primary_slot().transaction_id; + if secondary_newer && !repair_info.secondary_corrupted { + self.swap_primary_slot(); + return Ok(false); + } + + Ok(true) + } + // TODO: consider returning an Err with the repair info pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> { let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER; let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0); let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0; + let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0; let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]); let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]); let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]); @@ -226,6 +274,7 @@ impl DatabaseHeader { let result = Self { primary_slot, recovery_required, + two_phase_commit, page_size, region_header_pages, region_max_data_pages, @@ -251,6 +300,9 @@ impl DatabaseHeader { if self.recovery_required { result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED; } + if self.two_phase_commit { + result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT; + } result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::())] .copy_from_slice(&self.page_size.to_le_bytes()); result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::())] diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 80fd6fab..51e0b0aa 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -1,4 +1,5 @@ use crate::transaction_tracker::TransactionId; +use crate::transactions::{AllocatorStateKey, AllocatorStateTree}; use crate::tree_store::btree_base::{BtreeHeader, Checksum}; use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX}; use crate::tree_store::page_store::buddy_allocator::BuddyAllocator; @@ -11,7 +12,7 @@ use crate::tree_store::{Page, PageNumber}; use crate::StorageBackend; use crate::{DatabaseError, Result, StorageError}; #[cfg(feature = "logging")] -use log::warn; +use log::{debug, warn}; use std::cmp::{max, min}; #[cfg(debug_assertions)] use std::collections::HashMap; @@ -189,6 +190,7 @@ impl TransactionalMemory { ); header.recovery_required = false; + header.two_phase_commit = true; storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() @@ -221,18 +223,7 @@ impl TransactionalMemory { region_max_pages, page_size.try_into().unwrap(), )); - if repair_info.primary_corrupted { - header.swap_primary_slot(); - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - } - } + header.pick_primary_for_repair(repair_info)?; assert!(!repair_info.invalid_magic_number); storage .write(0, DB_HEADER_SIZE, true)? @@ -296,19 +287,8 @@ impl TransactionalMemory { // TODO: Also we should recheck the layout let mut was_clean = true; if header.recovery_required { - if repair_info.primary_corrupted { - header.swap_primary_slot(); + if !header.pick_primary_for_repair(repair_info)? { was_clean = false; - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - was_clean = false; - } } if repair_info.invalid_magic_number { return Err(StorageError::Corrupted("Invalid magic number".to_string()).into()); @@ -339,6 +319,10 @@ impl TransactionalMemory { Ok(self.state.lock().unwrap().header.recovery_required) } + pub(crate) fn used_two_phase_commit(&self) -> bool { + self.state.lock().unwrap().header.two_phase_commit + } + pub(crate) fn allocator_hash(&self) -> u128 { self.state.lock().unwrap().allocators.xxh3_hash() } @@ -412,6 +396,130 @@ impl TransactionalMemory { result } + pub(crate) fn reserve_allocator_state( + &self, + tree: &mut AllocatorStateTree, + transaction_id: TransactionId, + ) -> Result { + let state = self.state.lock().unwrap(); + let layout = state.header.layout(); + let num_regions = layout.num_regions(); + let region_header_len = layout.full_region_layout().get_header_pages() + * layout.full_region_layout().page_size(); + let region_tracker_len = state.allocators.region_tracker.to_vec().len(); + drop(state); + + for i in 0..num_regions { + tree.insert( + &AllocatorStateKey::Region(i), + &vec![0; region_header_len as usize].as_ref(), + )?; + } + + tree.insert( + &AllocatorStateKey::RegionTracker, + &vec![0; region_tracker_len].as_ref(), + )?; + + tree.insert( + &AllocatorStateKey::TransactionId, + &transaction_id.raw_id().to_le_bytes().as_ref(), + )?; + + Ok(num_regions) + } + + // Returns true on success, or false if the number of regions has changed + pub(crate) fn try_save_allocator_state( + &self, + tree: &mut AllocatorStateTree, + num_regions: u32, + ) -> Result { + // Has the number of regions changed since reserve_allocator_state() was called? + if num_regions != self.state.lock().unwrap().header.layout().num_regions() { + return Ok(false); + } + + for i in 0..num_regions { + let region_bytes = + &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec(); + tree.insert_inplace(&AllocatorStateKey::Region(i), ®ion_bytes.as_ref())?; + } + + let region_tracker_bytes = self + .state + .lock() + .unwrap() + .allocators + .region_tracker + .to_vec(); + tree.insert_inplace( + &AllocatorStateKey::RegionTracker, + ®ion_tracker_bytes.as_ref(), + )?; + + Ok(true) + } + + // Returns true on success, or false if the allocator state was stale (in which case we need + // to fall back to a full repair) + pub(crate) fn try_load_allocator_state(&self, tree: &AllocatorStateTree) -> Result { + // See if this is stale allocator state left over from a previous transaction. That won't + // happen during normal operation, since WriteTransaction::commit() always updates the + // allocator state table before calling TransactionalMemory::commit(), but there are also + // a few places where TransactionalMemory::commit() is called directly without using a + // WriteTransaction. When that happens, any existing allocator state table will be left + // in place but is no longer valid. (And even if there were no such calls today, it would + // be an easy mistake to make! So it's good that we check.) + let transaction_id = TransactionId::new(u64::from_le_bytes( + tree.get(&AllocatorStateKey::TransactionId)? + .unwrap() + .value() + .try_into() + .unwrap(), + )); + if transaction_id != self.get_last_committed_transaction_id()? { + #[cfg(feature = "logging")] + debug!("Ignoring stale allocator state from {:?}", transaction_id); + return Ok(false); + } + + // Load the allocator state + let mut region_allocators = vec![]; + for region in + tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))? + { + region_allocators.push(BuddyAllocator::from_bytes(region?.value())); + } + + let region_tracker = RegionTracker::from_page( + tree.get(&AllocatorStateKey::RegionTracker)? + .unwrap() + .value(), + ); + + let mut state = self.state.lock().unwrap(); + state.allocators = Allocators { + region_tracker, + region_allocators, + }; + + // Resize the allocators to match the current file size + let layout = state.header.layout(); + state.allocators.resize_to(layout); + drop(state); + + // Allocate a larger region tracker page if necessary. This also happens automatically on + // shutdown, but we do it here because we want our allocator state to exactly match the + // result of running a full repair + self.ensure_region_tracker_page()?; + + self.state.lock().unwrap().header.recovery_required = false; + self.needs_recovery.store(false, Ordering::Release); + + Ok(true) + } + pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { let state = self.state.lock().unwrap(); let allocator = state.get_region(page.region); @@ -419,6 +527,27 @@ impl TransactionalMemory { allocator.is_allocated(page.page_index, page.page_order) } + // Make sure we have a large enough region-tracker page. This uses allocate_non_transactional(), + // so it should only be called from outside a transaction + fn ensure_region_tracker_page(&self) -> Result { + let state = self.state.lock().unwrap(); + let tracker_len = state.allocators.region_tracker.to_vec().len(); + let mut tracker_page = state.header.region_tracker(); + drop(state); + + if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { + // Allocate a larger tracker page + self.free(tracker_page); + tracker_page = self + .allocate_non_transactional(tracker_len)? + .get_page_number(); + let mut state = self.state.lock().unwrap(); + state.header.set_region_tracker(tracker_page); + } + + Ok(()) + } + // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result { @@ -430,6 +559,9 @@ impl TransactionalMemory { let old_tracker_page = state.header.region_tracker(); // allocate acquires this lock, so we need to drop it drop(state); + // Allocate the new page. Unlike other region-tracker allocations, this happens inside + // a transaction, so we use an ordinary allocation (which gets committed or rolled back + // along with the rest of the transaction) rather than allocate_non_transactional() let new_page = self.allocate_lowest(region_tracker_size.try_into().unwrap())?; if new_page.get_page_number().is_before(old_tracker_page) { let mut state = self.state.lock().unwrap(); @@ -519,8 +651,10 @@ impl TransactionalMemory { self.storage.flush(eventual)?; } - // Make our new commit the primary + // Make our new commit the primary, and record whether it was a 2-phase commit. + // These two bits need to be written atomically header.swap_primary_slot(); + header.two_phase_commit = two_phase; // Write the new header to disk self.write_header(&header)?; @@ -797,7 +931,12 @@ impl TransactionalMemory { self.allocated_since_commit.lock().unwrap().contains(&page) } - pub(crate) fn allocate_helper(&self, allocation_size: usize, lowest: bool) -> Result { + pub(crate) fn allocate_helper( + &self, + allocation_size: usize, + lowest: bool, + transactional: bool, + ) -> Result { let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size(); let required_order = ceil_log2(required_pages); @@ -825,10 +964,12 @@ impl TransactionalMemory { assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number)); } - self.allocated_since_commit - .lock() - .unwrap() - .insert(page_number); + if transactional { + self.allocated_since_commit + .lock() + .unwrap() + .insert(page_number); + } let address_range = page_number.address_range( self.page_size.into(), @@ -960,11 +1101,17 @@ impl TransactionalMemory { } pub(crate) fn allocate(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, false) + self.allocate_helper(allocation_size, false, true) } pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, true) + self.allocate_helper(allocation_size, true, true) + } + + // Allocate a page not associated with any transaction. The page is immediately considered committed, + // and won't be rolled back if an abort happens. This is only used for the region tracker + fn allocate_non_transactional(&self, allocation_size: usize) -> Result { + self.allocate_helper(allocation_size, false, false) } pub(crate) fn count_allocated_pages(&self) -> Result { @@ -1017,24 +1164,15 @@ impl Drop for TransactionalMemory { warn!("Failure while finalizing non-durable commit. Database may have rolled back"); } } - let mut state = self.state.lock().unwrap(); - let tracker_len = state.allocators.region_tracker.to_vec().len(); - let tracker_page = state.header.region_tracker(); - if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { - drop(state); - // Allocate a larger tracker page - self.free(tracker_page); - if let Ok(tracker_page) = self.allocate(tracker_len) { - state = self.state.lock().unwrap(); - state - .header - .set_region_tracker(tracker_page.get_page_number()); - } else { - #[cfg(feature = "logging")] - warn!("Failure while flushing allocator state. Repair required at restart."); - return; - } + + // Allocate a larger region-tracker page if necessary + if self.ensure_region_tracker_page().is_err() { + #[cfg(feature = "logging")] + warn!("Failure while flushing allocator state. Repair required at restart."); + return; } + + let mut state = self.state.lock().unwrap(); if state .allocators .flush_to( diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index fdef1ced..0d6d26a1 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -442,6 +442,45 @@ impl<'txn> TableTreeMut<'txn> { Ok(self) } + // Creates a new table, calls the provided closure to insert entries into it, and then + // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed + // that no pages will be allocated or freed after the closure returns + pub(crate) fn create_table_and_flush_table_root( + &mut self, + name: &str, + f: impl FnOnce(&mut BtreeMut) -> Result, + ) -> Result { + assert!(self.pending_table_updates.is_empty()); + assert!(self.tree.get(&name)?.is_none()); + + // Reserve space in the table tree + self.tree.insert( + &name, + &InternalTableDefinition::new::(TableType::Normal, None, 0), + )?; + + // Create an empty table and call the provided closure on it + let mut tree: BtreeMut = BtreeMut::new( + None, + self.guard.clone(), + self.mem.clone(), + self.freed_pages.clone(), + ); + f(&mut tree)?; + + // Finalize the table's checksums + let table_root = tree.finalize_dirty_checksums()?; + let table_length = tree.get_root().map(|x| x.length).unwrap_or_default(); + + // Flush the root to the table tree, without allocating + self.tree.insert_inplace( + &name, + &InternalTableDefinition::new::(TableType::Normal, table_root, table_length), + )?; + + Ok(()) + } + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { self.tree.finalize_dirty_checksums() } diff --git a/src/tree_store/table_tree_base.rs b/src/tree_store/table_tree_base.rs index b3f3dbd1..e2bae031 100644 --- a/src/tree_store/table_tree_base.rs +++ b/src/tree_store/table_tree_base.rs @@ -441,6 +441,10 @@ impl Value for InternalTableDefinition { } } + // Be careful if you change this serialization format! The InternalTableDefinition for + // a given table needs to have a consistent serialized length, regardless of the table + // contents, so that create_table_and_flush_table_root() can update the allocator state + // table without causing more allocations fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Vec where Self: 'b,