Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify repair code for persistent savepoints #842

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 103 additions & 28 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,9 @@ impl Database {
Ok(compacted)
}

fn mark_persistent_savepoints(
fn check_repaired_persistent_savepoints(
system_root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
oldest_unprocessed_free_transaction: TransactionId,
) -> Result {
let freed_list = Arc::new(Mutex::new(vec![]));
let table_tree = TableTreeMut::new(
Expand Down Expand Up @@ -498,24 +497,27 @@ impl Database {
.value()
.to_savepoint(fake_transaction_tracker.clone());
if let Some(header) = savepoint.get_user_root() {
Self::mark_tables_recursive(header.root, mem.clone(), true)?;
Self::check_pages_allocated_recursive(header.root, mem.clone())?;
}
if let Some(header) = savepoint.get_freed_root() {
let freed_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
FreedTableKey::fixed_width(),
FreedPageList::fixed_width(),
mem.clone(),
)?;
for page_number in freed_pages_iter {
let page = page_number?;
assert!(mem.is_allocated(page));
}
}
Self::mark_freed_tree(
savepoint.get_freed_root(),
mem.clone(),
oldest_unprocessed_free_transaction,
)?;
}
}

Ok(())
}

fn mark_freed_tree(
freed_root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
oldest_unprocessed_free_transaction: TransactionId,
) -> Result {
fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
if let Some(header) = freed_root {
let freed_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
Expand All @@ -533,11 +535,7 @@ impl Database {
Arc::new(TransactionGuard::fake()),
mem.clone(),
)?;
let lookup_key = FreedTableKey {
transaction_id: oldest_unprocessed_free_transaction.raw_id(),
pagination_id: 0,
};
for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
for result in freed_table.range::<FreedTableKey>(..)? {
let (_, freed_page_list) = result?;
let mut freed_page_list_as_vec = vec![];
for i in 0..freed_page_list.value().len() {
Expand All @@ -549,6 +547,86 @@ impl Database {
Ok(())
}

fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
// Repair the allocator state
// All pages in the master table
let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
for result in master_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}

// Iterate over all other tables
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

// Chain all the other tables to the master table iter
for entry in iter {
let definition = entry?.value();
if let Some(header) = definition.get_root() {
match definition.get_type() {
TableType::Normal => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem.clone(),
)?;
for result in table_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}
}
TableType::Multimap => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
DynamicCollection::<()>::fixed_width_with(
definition.get_fixed_value_size(),
),
mem.clone(),
)?;
for result in table_pages_iter {
let page = result?;
assert!(mem.is_allocated(page));
}

let table_pages_iter = AllPageNumbersBtreeIter::new(
header.root,
definition.get_fixed_key_size(),
DynamicCollection::<()>::fixed_width_with(
definition.get_fixed_value_size(),
),
mem.clone(),
)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page?)?;
let subtree_roots = parse_subtree_roots(
&page,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
);
for subtree_header in subtree_roots {
let sub_root_iter = AllPageNumbersBtreeIter::new(
subtree_header.root,
definition.get_fixed_value_size(),
<()>::fixed_width(),
mem.clone(),
)?;
for result in sub_root_iter {
let page = result?;
assert!(mem.is_allocated(page));
}
}
}
}
}
}
}

Ok(())
}

fn mark_tables_recursive(
root: PageNumber,
mem: Arc<TransactionalMemory>,
Expand Down Expand Up @@ -660,22 +738,14 @@ impl Database {

let freed_root = mem.get_freed_root();
// Allow processing of all transactions, since this is the main freed tree
Self::mark_freed_tree(freed_root, mem.clone(), TransactionId::new(0))?;
Self::mark_freed_tree(freed_root, mem.clone())?;
let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
"internal freed table".to_string(),
freed_root,
PageHint::None,
Arc::new(TransactionGuard::fake()),
mem.clone(),
)?;
// The persistent savepoints might hold references to older freed trees that are partially processed.
// Make sure we don't reprocess those frees, as that would result in a double-free
let oldest_unprocessed_transaction =
if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
TransactionId::new(entry?.0.value().transaction_id)
} else {
mem.get_last_committed_transaction_id()?
};
drop(freed_table);

// 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
Expand All @@ -689,7 +759,12 @@ impl Database {
if let Some(header) = system_root {
Self::mark_tables_recursive(header.root, mem.clone(), false)?;
}
Self::mark_persistent_savepoints(system_root, mem.clone(), oldest_unprocessed_transaction)?;
#[cfg(debug_assertions)]
{
// Savepoints only reference pages from a previous commit, so those are either referenced
// in the current root, or are in the freed tree
Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
}

mem.end_repair()?;

Expand Down
7 changes: 7 additions & 0 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,13 @@ impl TransactionalMemory {
result
}

pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
let state = self.state.lock().unwrap();
let allocator = state.get_region(page.region);

allocator.is_allocated(page.page_index, page.page_order)
}

// Relocates the region tracker to a lower page, if possible
// Returns true if the page was moved
pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
Expand Down
Loading