diff --git a/src/db.rs b/src/db.rs index 105b6b0a..7ced2957 100644 --- a/src/db.rs +++ b/src/db.rs @@ -462,10 +462,9 @@ impl Database { Ok(compacted) } - fn mark_persistent_savepoints( + fn check_repaired_persistent_savepoints( system_root: Option, mem: Arc, - oldest_unprocessed_free_transaction: TransactionId, ) -> Result { let freed_list = Arc::new(Mutex::new(vec![])); let table_tree = TableTreeMut::new( @@ -498,24 +497,27 @@ impl Database { .value() .to_savepoint(fake_transaction_tracker.clone()); if let Some(header) = savepoint.get_user_root() { - Self::mark_tables_recursive(header.root, mem.clone(), true)?; + Self::check_pages_allocated_recursive(header.root, mem.clone())?; + } + if let Some(header) = savepoint.get_freed_root() { + let freed_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + FreedTableKey::fixed_width(), + FreedPageList::fixed_width(), + mem.clone(), + )?; + for page_number in freed_pages_iter { + let page = page_number?; + assert!(mem.is_allocated(page)); + } } - Self::mark_freed_tree( - savepoint.get_freed_root(), - mem.clone(), - oldest_unprocessed_free_transaction, - )?; } } Ok(()) } - fn mark_freed_tree( - freed_root: Option, - mem: Arc, - oldest_unprocessed_free_transaction: TransactionId, - ) -> Result { + fn mark_freed_tree(freed_root: Option, mem: Arc) -> Result { if let Some(header) = freed_root { let freed_pages_iter = AllPageNumbersBtreeIter::new( header.root, @@ -533,11 +535,7 @@ impl Database { Arc::new(TransactionGuard::fake()), mem.clone(), )?; - let lookup_key = FreedTableKey { - transaction_id: oldest_unprocessed_free_transaction.raw_id(), - pagination_id: 0, - }; - for result in freed_table.range::(lookup_key..)? { + for result in freed_table.range::(..)? { let (_, freed_page_list) = result?; let mut freed_page_list_as_vec = vec![]; for i in 0..freed_page_list.value().len() { @@ -549,6 +547,86 @@ impl Database { Ok(()) } + fn check_pages_allocated_recursive(root: PageNumber, mem: Arc) -> Result { + // Repair the allocator state + // All pages in the master table + let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?; + for result in master_pages_iter { + let page = result?; + assert!(mem.is_allocated(page)); + } + + // Iterate over all other tables + let iter: BtreeRangeIter<&str, InternalTableDefinition> = + BtreeRangeIter::new::(&(..), Some(root), mem.clone())?; + + // Chain all the other tables to the master table iter + for entry in iter { + let definition = entry?.value(); + if let Some(header) = definition.get_root() { + match definition.get_type() { + TableType::Normal => { + let table_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + mem.clone(), + )?; + for result in table_pages_iter { + let page = result?; + assert!(mem.is_allocated(page)); + } + } + TableType::Multimap => { + let table_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + definition.get_fixed_key_size(), + DynamicCollection::<()>::fixed_width_with( + definition.get_fixed_value_size(), + ), + mem.clone(), + )?; + for result in table_pages_iter { + let page = result?; + assert!(mem.is_allocated(page)); + } + + let table_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + definition.get_fixed_key_size(), + DynamicCollection::<()>::fixed_width_with( + definition.get_fixed_value_size(), + ), + mem.clone(), + )?; + for table_page in table_pages_iter { + let page = mem.get_page(table_page?)?; + let subtree_roots = parse_subtree_roots( + &page, + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + ); + for subtree_header in subtree_roots { + let sub_root_iter = AllPageNumbersBtreeIter::new( + subtree_header.root, + definition.get_fixed_value_size(), + <()>::fixed_width(), + mem.clone(), + )?; + for result in sub_root_iter { + let page = result?; + assert!(mem.is_allocated(page)); + } + } + } + } + } + } + } + + Ok(()) + } + fn mark_tables_recursive( root: PageNumber, mem: Arc, @@ -660,7 +738,7 @@ impl Database { let freed_root = mem.get_freed_root(); // Allow processing of all transactions, since this is the main freed tree - Self::mark_freed_tree(freed_root, mem.clone(), TransactionId::new(0))?; + Self::mark_freed_tree(freed_root, mem.clone())?; let freed_table: ReadOnlyTable> = ReadOnlyTable::new( "internal freed table".to_string(), freed_root, @@ -668,14 +746,6 @@ impl Database { Arc::new(TransactionGuard::fake()), mem.clone(), )?; - // The persistent savepoints might hold references to older freed trees that are partially processed. - // Make sure we don't reprocess those frees, as that would result in a double-free - let oldest_unprocessed_transaction = - if let Some(entry) = freed_table.range::(..)?.next() { - TransactionId::new(entry?.0.value().transaction_id) - } else { - mem.get_last_committed_transaction_id()? - }; drop(freed_table); // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left @@ -689,7 +759,12 @@ impl Database { if let Some(header) = system_root { Self::mark_tables_recursive(header.root, mem.clone(), false)?; } - Self::mark_persistent_savepoints(system_root, mem.clone(), oldest_unprocessed_transaction)?; + #[cfg(debug_assertions)] + { + // Savepoints only reference pages from a previous commit, so those are either referenced + // in the current root, or are in the freed tree + Self::check_repaired_persistent_savepoints(system_root, mem.clone())?; + } mem.end_repair()?; diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 55001aae..0b046a24 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -410,6 +410,13 @@ impl TransactionalMemory { result } + pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { + let state = self.state.lock().unwrap(); + let allocator = state.get_region(page.region); + + allocator.is_allocated(page.page_index, page.page_order) + } + // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result {