Skip to content

Commit

Permalink
Simplify repair code for persistent savepoints
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Aug 12, 2024
1 parent baa86e7 commit 784cece
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 28 deletions.
131 changes: 103 additions & 28 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,9 @@ impl Database {
Ok(compacted)
}

fn mark_persistent_savepoints(
fn check_repaired_persistent_savepoints(
system_root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
oldest_unprocessed_free_transaction: TransactionId,
) -> Result {
let freed_list = Arc::new(Mutex::new(vec![]));
let table_tree = TableTreeMut::new(
Expand Down Expand Up @@ -498,24 +497,27 @@ impl Database {
.value()
.to_savepoint(fake_transaction_tracker.clone());
if let Some(header) = savepoint.get_user_root() {
Self::mark_tables_recursive(header.root, mem.clone(), true)?;
Self::check_pages_allocated_recursive(header.root, mem.clone())?;
}
if let Some(header) = savepoint.get_freed_root() {
let freed_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
FreedTableKey::fixed_width(),
FreedPageList::fixed_width(),
mem.clone(),
)?;
for page_number in freed_pages_iter {
let page = page_number?;
assert!(mem.is_allocated(page));
}
}
Self::mark_freed_tree(
savepoint.get_freed_root(),
mem.clone(),
oldest_unprocessed_free_transaction,
)?;
}
}

Ok(())
}

fn mark_freed_tree(
freed_root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
oldest_unprocessed_free_transaction: TransactionId,
) -> Result {
fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
if let Some(header) = freed_root {
let freed_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
Expand All @@ -533,11 +535,7 @@ impl Database {
Arc::new(TransactionGuard::fake()),
mem.clone(),
)?;
let lookup_key = FreedTableKey {
transaction_id: oldest_unprocessed_free_transaction.raw_id(),
pagination_id: 0,
};
for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
for result in freed_table.range::<FreedTableKey>(..)? {
let (_, freed_page_list) = result?;
let mut freed_page_list_as_vec = vec![];
for i in 0..freed_page_list.value().len() {
Expand All @@ -549,6 +547,86 @@ impl Database {
Ok(())
}

fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
// Repair the allocator state
// All pages in the master table
let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
for result in master_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}

// Iterate over all other tables
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

// Chain all the other tables to the master table iter
for entry in iter {
let definition = entry?.value();
if let Some(header) = definition.get_root() {
match definition.get_type() {
TableType::Normal => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem.clone(),
)?;
for result in table_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}
}
TableType::Multimap => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
DynamicCollection::<()>::fixed_width_with(
definition.get_fixed_value_size(),
),
mem.clone(),
)?;
for result in table_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}

let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
DynamicCollection::<()>::fixed_width_with(
definition.get_fixed_value_size(),
),
mem.clone(),
)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page?)?;
let subtree_roots = parse_subtree_roots(
&page,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
);
for subtree_header in subtree_roots {
let sub_root_iter = AllPageNumbersBtreeIter::new(
subtree_header.root,
definition.get_fixed_value_size(),
<()>::fixed_width(),
mem.clone(),
)?;
for result in sub_root_iter {
let page = result?;
assert!(mem.is_allocated(page));
}
}
}
}
}
}
}

Ok(())
}

fn mark_tables_recursive(
root: PageNumber,
mem: Arc<TransactionalMemory>,
Expand Down Expand Up @@ -660,22 +738,14 @@ impl Database {

let freed_root = mem.get_freed_root();
// Allow processing of all transactions, since this is the main freed tree
Self::mark_freed_tree(freed_root, mem.clone(), TransactionId::new(0))?;
Self::mark_freed_tree(freed_root, mem.clone())?;
let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
"internal freed table".to_string(),
freed_root,
PageHint::None,
Arc::new(TransactionGuard::fake()),
mem.clone(),
)?;
// The persistent savepoints might hold references to older freed trees that are partially processed.
// Make sure we don't reprocess those frees, as that would result in a double-free
let oldest_unprocessed_transaction =
if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
TransactionId::new(entry?.0.value().transaction_id)
} else {
mem.get_last_committed_transaction_id()?
};
drop(freed_table);

// 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
Expand All @@ -689,7 +759,12 @@ impl Database {
if let Some(header) = system_root {
Self::mark_tables_recursive(header.root, mem.clone(), false)?;
}
Self::mark_persistent_savepoints(system_root, mem.clone(), oldest_unprocessed_transaction)?;
#[cfg(debug_assertions)]
{
// Savepoints only reference pages from a previous commit, so those are either referenced
// in the current root, or are in the freed tree
Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
}

mem.end_repair()?;

Expand Down
7 changes: 7 additions & 0 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,13 @@ impl TransactionalMemory {
result
}

pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
let state = self.state.lock().unwrap();
let allocator = state.get_region(page.region);

allocator.is_allocated(page.page_index, page.page_order)
}

// Relocates the region tracker to a lower page, if possible
// Returns true if the page was moved
pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
Expand Down

0 comments on commit 784cece

Please sign in to comment.