diff --git a/src/multimap_table.rs b/src/multimap_table.rs index 0cc2df0e..3985867b 100644 --- a/src/multimap_table.rs +++ b/src/multimap_table.rs @@ -3,10 +3,10 @@ use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2}; use crate::sealed::Sealed; use crate::table::{ReadableTableMetadata, TableStats}; use crate::tree_store::{ - btree_stats, AllPageNumbersBtreeIter, BranchAccessor, Btree, BtreeHeader, BtreeMut, - BtreeRangeIter, BtreeStats, CachePriority, Checksum, LeafAccessor, LeafMutator, Page, PageHint, - PageNumber, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, LEAF, - MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, + btree_stats, AllPageNumbersBtreeIter, BranchAccessor, BranchMutator, Btree, BtreeHeader, + BtreeMut, BtreeRangeIter, BtreeStats, CachePriority, Checksum, LeafAccessor, LeafMutator, Page, + PageHint, PageNumber, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, + DEFERRED, LEAF, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, }; use crate::types::{Key, TypeName, Value}; use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction}; @@ -190,6 +190,96 @@ pub(crate) fn verify_tree_and_subtree_checksums( Ok(true) } +// Relocate all subtrees to lower index pages, if possible +pub(crate) fn relocate_subtrees( + root: (PageNumber, Checksum), + key_size: Option, + value_size: Option, + mem: Arc, + freed_pages: Arc>>, +) -> Result<(PageNumber, Checksum)> { + let old_page = mem.get_page(root.0)?; + let mut new_page = mem.allocate_lowest( + old_page.memory().len(), + CachePriority::default_btree(old_page.memory()), + )?; + + let new_page_number = new_page.get_page_number(); + new_page.memory_mut().copy_from_slice(old_page.memory()); + + let mut changed = false; + + match old_page.memory()[0] { + LEAF => { + let accessor = LeafAccessor::new( + old_page.memory(), + key_size, + UntypedDynamicCollection::fixed_width_with(value_size), + ); + // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method? + let mut mutator = LeafMutator::new( + &mut new_page, + key_size, + UntypedDynamicCollection::fixed_width_with(value_size), + ); + for i in 0..accessor.num_pairs() { + let entry = accessor.entry(i).unwrap(); + let collection = UntypedDynamicCollection::from_bytes(entry.value()); + if matches!(collection.collection_type(), SubtreeV2) { + let sub_root = collection.as_subtree(); + let mut tree = UntypedBtreeMut::new( + Some(sub_root), + mem.clone(), + freed_pages.clone(), + value_size, + <() as Value>::fixed_width(), + ); + if tree.relocate()? { + let new_collection = + UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap()); + mutator.insert(i, true, entry.key(), &new_collection); + changed = true; + } + } + } + } + BRANCH => { + let accessor = BranchAccessor::new(&old_page, key_size); + let mut mutator = BranchMutator::new(&mut new_page); + for i in 0..accessor.count_children() { + if let Some(child) = accessor.child_page(i) { + let child_checksum = accessor.child_checksum(i).unwrap(); + let (new_child, new_checksum) = relocate_subtrees( + (child, child_checksum), + key_size, + value_size, + mem.clone(), + freed_pages.clone(), + )?; + mutator.write_child_page(i, new_child, new_checksum); + if new_child != child { + changed = true; + } + } + } + } + _ => unreachable!(), + } + + if changed || new_page_number.is_before(old_page.get_page_number()) { + let old_page_number = old_page.get_page_number(); + drop(old_page); + if !mem.free_if_uncommitted(old_page_number) { + freed_pages.lock().unwrap().push(old_page_number); + } + Ok((new_page_number, DEFERRED)) + } else { + drop(new_page); + mem.free(new_page_number); + Ok(root) + } +} + // Finalize all the checksums in the tree, including any Dynamic collection subtrees // Returns the root checksum pub(crate) fn finalize_tree_and_subtree_checksums( @@ -552,10 +642,24 @@ pub(crate) struct UntypedDynamicCollection { } impl UntypedDynamicCollection { + pub(crate) fn fixed_width_with(_value_width: Option) -> Option { + None + } + fn new(data: &[u8]) -> &Self { unsafe { mem::transmute(data) } } + fn make_subtree_data(header: BtreeHeader) -> Vec { + let mut result = vec![SubtreeV2.into()]; + result.extend_from_slice(&header.to_le_bytes()); + result + } + + fn from_bytes(data: &[u8]) -> &Self { + Self::new(data) + } + fn collection_type(&self) -> DynamicCollectionType { DynamicCollectionType::from(self.data[0]) } diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index 23229956..f51c5150 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -187,6 +187,7 @@ impl UntypedBtreeMut { CachePriority::default_btree(old_page.memory()), )?; let new_page_number = new_page.get_page_number(); + // TODO: we should only bail out if we're at a leaf page. Branch pages still need to process their children if !new_page_number.is_before(page_number) { drop(new_page); self.mem.free(new_page_number); diff --git a/src/tree_store/btree_base.rs b/src/tree_store/btree_base.rs index 65a6564c..4aca80e4 100644 --- a/src/tree_store/btree_base.rs +++ b/src/tree_store/btree_base.rs @@ -1212,7 +1212,7 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> { self.num_keys() + 1 } - pub(super) fn child_checksum(&self, n: usize) -> Option { + pub(crate) fn child_checksum(&self, n: usize) -> Option { if n >= self.count_children() { return None; } @@ -1529,12 +1529,12 @@ impl<'b> Drop for RawBranchBuilder<'b> { } } -pub(super) struct BranchMutator<'b> { +pub(crate) struct BranchMutator<'b> { page: &'b mut PageMut, } impl<'b> BranchMutator<'b> { - pub(super) fn new(page: &'b mut PageMut) -> Self { + pub(crate) fn new(page: &'b mut PageMut) -> Self { assert_eq!(page.memory()[0], BRANCH); Self { page } } @@ -1543,7 +1543,7 @@ impl<'b> BranchMutator<'b> { u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize } - pub(super) fn write_child_page( + pub(crate) fn write_child_page( &mut self, i: usize, page_number: PageNumber, diff --git a/src/tree_store/mod.rs b/src/tree_store/mod.rs index 84c58700..b79bc79d 100644 --- a/src/tree_store/mod.rs +++ b/src/tree_store/mod.rs @@ -9,7 +9,8 @@ mod table_tree_base; pub(crate) use btree::{btree_stats, Btree, BtreeMut, BtreeStats, RawBtree, UntypedBtreeMut}; pub use btree_base::{AccessGuard, AccessGuardMut}; pub(crate) use btree_base::{ - BranchAccessor, BtreeHeader, Checksum, LeafAccessor, LeafMutator, RawLeafBuilder, BRANCH, LEAF, + BranchAccessor, BranchMutator, BtreeHeader, Checksum, LeafAccessor, LeafMutator, + RawLeafBuilder, BRANCH, DEFERRED, LEAF, }; pub(crate) use btree_iters::{AllPageNumbersBtreeIter, BtreeExtractIf, BtreeRangeIter}; pub use page_store::{file_backend, InMemoryBackend, Savepoint}; diff --git a/src/tree_store/table_tree_base.rs b/src/tree_store/table_tree_base.rs index 4e6f1de8..9a0a8f1b 100644 --- a/src/tree_store/table_tree_base.rs +++ b/src/tree_store/table_tree_base.rs @@ -1,4 +1,4 @@ -use crate::multimap_table::{parse_subtree_roots, DynamicCollection}; +use crate::multimap_table::{parse_subtree_roots, relocate_subtrees, DynamicCollection}; use crate::tree_store::{ AllPageNumbersBtreeIter, BtreeHeader, PageNumber, TransactionalMemory, UntypedBtreeMut, }; @@ -262,15 +262,38 @@ impl InternalTableDefinition { mem: Arc, freed_pages: Arc>>, ) -> Result>> { - // TODO: this does not correctly handle multimap subtrees + let original_root = self.private_get_root(); + let relocated_root = match self { + InternalTableDefinition::Normal { table_root, .. } => *table_root, + InternalTableDefinition::Multimap { + table_root, + fixed_key_size, + fixed_value_size, + .. + } => { + if let Some(header) = table_root { + let (page_number, checksum) = relocate_subtrees( + (header.root, header.checksum), + *fixed_key_size, + *fixed_value_size, + mem.clone(), + freed_pages.clone(), + )?; + Some(BtreeHeader::new(page_number, checksum, header.length)) + } else { + None + } + } + }; let mut tree = UntypedBtreeMut::new( - self.private_get_root(), + relocated_root, mem, freed_pages, self.private_get_fixed_key_size(), self.private_get_fixed_value_size(), ); - if tree.relocate()? { + tree.relocate()?; + if tree.get_root() != original_root { self.set_header(tree.get_root(), self.get_length()); Ok(Some(tree.get_root())) } else {