From aee076f6110af618c5bf9a96e5dd97f7ec3cf1a6 Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Sat, 20 Jan 2024 08:08:44 -0800 Subject: [PATCH 1/3] Split TableTree into mut and non-mut versions --- src/db.rs | 10 +- src/transactions.rs | 32 ++++-- src/tree_store/mod.rs | 2 +- src/tree_store/table_tree.rs | 217 +++++++++++++++++++++++------------ 4 files changed, 171 insertions(+), 90 deletions(-) diff --git a/src/db.rs b/src/db.rs index d6394116..10ca74c4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,7 @@ use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey, - InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree, + InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTreeMut, TableType, TransactionalMemory, PAGE_SIZE, }; use crate::types::{RedbKey, RedbValue}; @@ -337,7 +337,7 @@ impl Database { pub(crate) fn verify_primary_checksums(mem: Arc) -> Result { let fake_freed_pages = Arc::new(Mutex::new(vec![])); - let table_tree = TableTree::new( + let table_tree = TableTreeMut::new( mem.get_data_root(), Arc::new(TransactionGuard::fake()), mem.clone(), @@ -346,7 +346,7 @@ impl Database { if !table_tree.verify_checksums()? { return Ok(false); } - let system_table_tree = TableTree::new( + let system_table_tree = TableTreeMut::new( mem.get_system_root(), Arc::new(TransactionGuard::fake()), mem.clone(), @@ -453,7 +453,7 @@ impl Database { oldest_unprocessed_free_transaction: TransactionId, ) -> Result { let freed_list = Arc::new(Mutex::new(vec![])); - let table_tree = TableTree::new( + let table_tree = TableTreeMut::new( system_root, Arc::new(TransactionGuard::fake()), mem.clone(), @@ -803,7 +803,7 @@ impl Database { let guard = self.allocate_read_transaction()?; #[cfg(feature = "logging")] info!("Beginning read transaction id={:?}", guard.id()); - Ok(ReadTransaction::new(self.get_memory(), guard)) + ReadTransaction::new(self.get_memory(), guard) } } diff --git a/src/transactions.rs b/src/transactions.rs index 2782eee2..373b855e 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -6,13 +6,15 @@ use crate::table::ReadOnlyUntypedTable; use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ Btree, BtreeMut, Checksum, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint, - PageNumber, SerializedSavepoint, TableTree, TableType, TransactionalMemory, MAX_VALUE_LENGTH, + PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType, TransactionalMemory, + MAX_VALUE_LENGTH, }; use crate::types::{RedbKey, RedbValue}; use crate::{ AccessGuard, Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, - TableDefinition, TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, + TableDefinition, TableError, TableHandle, TransactionError, UntypedMultimapTableHandle, + UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{info, warn}; @@ -265,7 +267,7 @@ impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> Drop for SystemTable } struct SystemNamespace<'db> { - table_tree: TableTree<'db>, + table_tree: TableTreeMut<'db>, transaction_guard: Arc, } @@ -307,7 +309,7 @@ impl<'db> SystemNamespace<'db> { struct TableNamespace<'db> { open_tables: HashMap>, - table_tree: TableTree<'db>, + table_tree: TableTreeMut<'db>, } impl<'db> TableNamespace<'db> { @@ -457,7 +459,7 @@ impl<'db> WriteTransaction<'db> { let tables = TableNamespace { open_tables: Default::default(), - table_tree: TableTree::new( + table_tree: TableTreeMut::new( root_page, guard.clone(), db.get_memory(), @@ -465,7 +467,7 @@ impl<'db> WriteTransaction<'db> { ), }; let system_tables = SystemNamespace { - table_tree: TableTree::new( + table_tree: TableTreeMut::new( system_page, guard.clone(), db.get_memory(), @@ -691,7 +693,7 @@ impl<'db> WriteTransaction<'db> { } } *self.freed_pages.lock().unwrap() = freed_pages; - self.tables.lock().unwrap().table_tree = TableTree::new( + self.tables.lock().unwrap().table_tree = TableTreeMut::new( savepoint.get_user_root(), self.transaction_guard.clone(), self.mem.clone(), @@ -1165,19 +1167,25 @@ impl<'a> Drop for WriteTransaction<'a> { /// Read-only transactions may exist concurrently with writes pub struct ReadTransaction<'a> { mem: Arc, - tree: TableTree<'a>, + tree: TableTree, transaction_guard: Arc, + _lifetime: PhantomData<&'a ()>, } impl<'db> ReadTransaction<'db> { - pub(crate) fn new(mem: Arc, guard: TransactionGuard) -> Self { + pub(crate) fn new( + mem: Arc, + guard: TransactionGuard, + ) -> Result { let root_page = mem.get_data_root(); let guard = Arc::new(guard); - Self { + Ok(Self { mem: mem.clone(), - tree: TableTree::new(root_page, guard.clone(), mem, Default::default()), + tree: TableTree::new(root_page, PageHint::Clean, guard.clone(), mem) + .map_err(TransactionError::Storage)?, transaction_guard: guard, - } + _lifetime: Default::default(), + }) } /// Open the given table diff --git a/src/tree_store/mod.rs b/src/tree_store/mod.rs index bb453f98..a3687b57 100644 --- a/src/tree_store/mod.rs +++ b/src/tree_store/mod.rs @@ -18,5 +18,5 @@ pub(crate) use page_store::{ FILE_FORMAT_VERSION, MAX_VALUE_LENGTH, PAGE_SIZE, }; pub(crate) use table_tree::{ - FreedPageList, FreedTableKey, InternalTableDefinition, TableTree, TableType, + FreedPageList, FreedTableKey, InternalTableDefinition, TableTree, TableTreeMut, TableType, }; diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index 2fd55fac..c5de5eb6 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -6,7 +6,9 @@ use crate::multimap_table::{ use crate::tree_store::btree::{btree_stats, UntypedBtreeMut}; use crate::tree_store::btree_base::Checksum; use crate::tree_store::btree_iters::AllPageNumbersBtreeIter; -use crate::tree_store::{BtreeMut, BtreeRangeIter, PageNumber, RawBtree, TransactionalMemory}; +use crate::tree_store::{ + Btree, BtreeMut, BtreeRangeIter, PageHint, PageNumber, RawBtree, TransactionalMemory, +}; use crate::types::{MutInPlaceValue, RedbKey, RedbValue, TypeName}; use crate::{DatabaseStats, Result}; use std::cmp::max; @@ -407,15 +409,120 @@ impl Iterator for TableNameIter { } } -pub(crate) struct TableTree<'txn> { +pub(crate) struct TableTree { + tree: Btree<&'static str, InternalTableDefinition>, +} + +impl TableTree { + pub(crate) fn new( + master_root: Option<(PageNumber, Checksum)>, + page_hint: PageHint, + guard: Arc, + mem: Arc, + ) -> Result { + Ok(Self { + tree: Btree::new(master_root, page_hint, guard, mem)?, + }) + } + + // root_page: the root of the master table + pub(crate) fn list_tables(&self, table_type: TableType) -> Result> { + let iter = self.tree.range::(&(..))?; + let iter = TableNameIter { + inner: iter, + table_type, + }; + let mut result = vec![]; + for table in iter { + result.push(table?); + } + Ok(result) + } + + pub(crate) fn get_table_untyped( + &self, + name: &str, + table_type: TableType, + ) -> Result, TableError> { + if let Some(guard) = self.tree.get(&name)? { + let definition = guard.value(); + if definition.get_type() != table_type { + return if definition.get_type() == TableType::Multimap { + Err(TableError::TableIsMultimap(name.to_string())) + } else { + Err(TableError::TableIsNotMultimap(name.to_string())) + }; + } + if definition.get_key_alignment() != ALIGNMENT { + return Err(TableError::TypeDefinitionChanged { + name: definition.key_type.clone(), + alignment: definition.get_key_alignment(), + width: definition.get_fixed_key_size(), + }); + } + if definition.get_value_alignment() != ALIGNMENT { + return Err(TableError::TypeDefinitionChanged { + name: definition.value_type.clone(), + alignment: definition.get_value_alignment(), + width: definition.get_fixed_value_size(), + }); + } + + Ok(Some(definition)) + } else { + Ok(None) + } + } + + // root_page: the root of the master table + pub(crate) fn get_table( + &self, + name: &str, + table_type: TableType, + ) -> Result, TableError> { + Ok( + if let Some(definition) = self.get_table_untyped(name, table_type)? { + // Do additional checks on the types to be sure they match + if definition.key_type != K::type_name() || definition.value_type != V::type_name() + { + return Err(TableError::TableTypeMismatch { + table: name.to_string(), + key: definition.key_type, + value: definition.value_type, + }); + } + if definition.get_fixed_key_size() != K::fixed_width() { + return Err(TableError::TypeDefinitionChanged { + name: K::type_name(), + alignment: definition.get_key_alignment(), + width: definition.get_fixed_key_size(), + }); + } + if definition.get_fixed_value_size() != V::fixed_width() { + return Err(TableError::TypeDefinitionChanged { + name: V::type_name(), + alignment: definition.get_value_alignment(), + width: definition.get_fixed_value_size(), + }); + } + Some(definition) + } else { + None + }, + ) + } +} + +pub(crate) struct TableTreeMut<'txn> { tree: BtreeMut<'txn, &'static str, InternalTableDefinition>, + guard: Arc, mem: Arc, // Cached updates from tables that have been closed. These must be flushed to the btree pending_table_updates: HashMap>, freed_pages: Arc>>, } -impl<'txn> TableTree<'txn> { +impl<'txn> TableTreeMut<'txn> { pub(crate) fn new( master_root: Option<(PageNumber, Checksum)>, guard: Arc, @@ -423,7 +530,8 @@ impl<'txn> TableTree<'txn> { freed_pages: Arc>>, ) -> Self { Self { - tree: BtreeMut::new(master_root, guard, mem.clone(), freed_pages.clone()), + tree: BtreeMut::new(master_root, guard.clone(), mem.clone(), freed_pages.clone()), + guard, mem, pending_table_updates: Default::default(), freed_pages, @@ -557,16 +665,13 @@ impl<'txn> TableTree<'txn> { // root_page: the root of the master table pub(crate) fn list_tables(&self, table_type: TableType) -> Result> { - let iter = self.tree.range::(&(..))?; - let iter = TableNameIter { - inner: iter, - table_type, - }; - let mut result = vec![]; - for table in iter { - result.push(table?); - } - Ok(result) + let tree = TableTree::new( + self.tree.get_root(), + PageHint::None, + self.guard.clone(), + self.mem.clone(), + )?; + tree.list_tables(table_type) } pub(crate) fn get_table_untyped( @@ -574,38 +679,21 @@ impl<'txn> TableTree<'txn> { name: &str, table_type: TableType, ) -> Result, TableError> { - if let Some(guard) = self.tree.get(&name)? { - let mut definition = guard.value(); - if definition.get_type() != table_type { - return if definition.get_type() == TableType::Multimap { - Err(TableError::TableIsMultimap(name.to_string())) - } else { - Err(TableError::TableIsNotMultimap(name.to_string())) - }; - } - if definition.get_key_alignment() != ALIGNMENT { - return Err(TableError::TypeDefinitionChanged { - name: definition.key_type.clone(), - alignment: definition.get_key_alignment(), - width: definition.get_fixed_key_size(), - }); - } - if definition.get_value_alignment() != ALIGNMENT { - return Err(TableError::TypeDefinitionChanged { - name: definition.value_type.clone(), - alignment: definition.get_value_alignment(), - width: definition.get_fixed_value_size(), - }); - } - + let tree = TableTree::new( + self.tree.get_root(), + PageHint::None, + self.guard.clone(), + self.mem.clone(), + )?; + let mut result = tree.get_table_untyped(name, table_type); + + if let Ok(Some(definition)) = result.as_mut() { if let Some(updated_root) = self.pending_table_updates.get(name) { definition.table_root = *updated_root; } - - Ok(Some(definition)) - } else { - Ok(None) } + + result } // root_page: the root of the master table @@ -614,36 +702,21 @@ impl<'txn> TableTree<'txn> { name: &str, table_type: TableType, ) -> Result, TableError> { - Ok( - if let Some(definition) = self.get_table_untyped(name, table_type)? { - // Do additional checks on the types to be sure they match - if definition.key_type != K::type_name() || definition.value_type != V::type_name() - { - return Err(TableError::TableTypeMismatch { - table: name.to_string(), - key: definition.key_type, - value: definition.value_type, - }); - } - if definition.get_fixed_key_size() != K::fixed_width() { - return Err(TableError::TypeDefinitionChanged { - name: K::type_name(), - alignment: definition.get_key_alignment(), - width: definition.get_fixed_key_size(), - }); - } - if definition.get_fixed_value_size() != V::fixed_width() { - return Err(TableError::TypeDefinitionChanged { - name: V::type_name(), - alignment: definition.get_value_alignment(), - width: definition.get_fixed_value_size(), - }); - } - Some(definition) - } else { - None - }, - ) + let tree = TableTree::new( + self.tree.get_root(), + PageHint::None, + self.guard.clone(), + self.mem.clone(), + )?; + let mut result = tree.get_table::(name, table_type); + + if let Ok(Some(definition)) = result.as_mut() { + if let Some(updated_root) = self.pending_table_updates.get(name) { + definition.table_root = *updated_root; + } + } + + result } // root_page: the root of the master table From c87fbb4ae2ccc208e1b51e0729881aa2ebf97ab6 Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Sat, 20 Jan 2024 08:16:43 -0800 Subject: [PATCH 2/3] Use static lifetime for read-only tables and transactions In the future, these lifetimes will be removed --- src/db.rs | 2 +- src/transactions.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/db.rs b/src/db.rs index 10ca74c4..e6606a3e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -799,7 +799,7 @@ impl Database { /// /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions /// may exist concurrently with writes - pub fn begin_read(&self) -> Result { + pub fn begin_read(&self) -> Result, TransactionError> { let guard = self.allocate_read_transaction()?; #[cfg(feature = "logging")] info!("Beginning read transaction id={:?}", guard.id()); diff --git a/src/transactions.rs b/src/transactions.rs index 373b855e..568f2e0f 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -1172,7 +1172,7 @@ pub struct ReadTransaction<'a> { _lifetime: PhantomData<&'a ()>, } -impl<'db> ReadTransaction<'db> { +impl<'a> ReadTransaction<'a> { pub(crate) fn new( mem: Arc, guard: TransactionGuard, @@ -1192,7 +1192,7 @@ impl<'db> ReadTransaction<'db> { pub fn open_table( &self, definition: TableDefinition, - ) -> Result, TableError> { + ) -> Result, TableError> { let header = self .tree .get_table::(definition.name(), TableType::Normal)? @@ -1211,7 +1211,7 @@ impl<'db> ReadTransaction<'db> { pub fn open_untyped_table( &self, handle: impl TableHandle, - ) -> Result { + ) -> Result, TableError> { let header = self .tree .get_table_untyped(handle.name(), TableType::Normal)? @@ -1229,7 +1229,7 @@ impl<'db> ReadTransaction<'db> { pub fn open_multimap_table( &self, definition: MultimapTableDefinition, - ) -> Result, TableError> { + ) -> Result, TableError> { let header = self .tree .get_table::(definition.name(), TableType::Multimap)? @@ -1247,7 +1247,7 @@ impl<'db> ReadTransaction<'db> { pub fn open_untyped_multimap_table( &self, handle: impl MultimapTableHandle, - ) -> Result { + ) -> Result, TableError> { let header = self .tree .get_table_untyped(handle.name(), TableType::Multimap)? From 6601fa8d1c8736e3bb0cc1bc1819dc2c4ff9846a Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Sat, 20 Jan 2024 08:37:33 -0800 Subject: [PATCH 3/3] Add close() method to ReadTransaction Calling this method is entirely optional. Transactions still close automatically, but this can be used to ensure that it can be closed immediately --- src/error.rs | 16 +++++++++++++++- src/transactions.rs | 24 +++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/error.rs b/src/error.rs index c475f5a3..c9735790 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,5 @@ use crate::tree_store::{FILE_FORMAT_VERSION, MAX_VALUE_LENGTH}; -use crate::TypeName; +use crate::{ReadTransaction, TypeName}; use std::fmt::{Display, Formatter}; use std::sync::PoisonError; use std::{io, panic}; @@ -334,12 +334,15 @@ impl std::error::Error for CompactionError {} pub enum TransactionError { /// Error from underlying storage Storage(StorageError), + /// The transaction is still referenced by a table or other object + ReadTransactionStillInUse(ReadTransaction<'static>), } impl TransactionError { pub(crate) fn into_storage_error(self) -> StorageError { match self { TransactionError::Storage(storage) => storage, + _ => unreachable!(), } } } @@ -348,6 +351,9 @@ impl From for Error { fn from(err: TransactionError) -> Error { match err { TransactionError::Storage(storage) => storage.into(), + TransactionError::ReadTransactionStillInUse(txn) => { + Error::ReadTransactionStillInUse(txn) + } } } } @@ -362,6 +368,9 @@ impl Display for TransactionError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { TransactionError::Storage(storage) => storage.fmt(f), + TransactionError::ReadTransactionStillInUse(_) => { + write!(f, "Transaction still in use") + } } } } @@ -453,6 +462,8 @@ pub enum Error { TableAlreadyOpen(String, &'static panic::Location<'static>), Io(io::Error), LockPoisoned(&'static panic::Location<'static>), + /// The transaction is still referenced by a table or other object + ReadTransactionStillInUse(ReadTransaction<'static>), } impl From> for Error { @@ -543,6 +554,9 @@ impl Display for Error { Error::InvalidSavepoint => { write!(f, "Savepoint is invalid or cannot be created.") } + Error::ReadTransactionStillInUse(_) => { + write!(f, "Transaction still in use") + } } } } diff --git a/src/transactions.rs b/src/transactions.rs index 568f2e0f..29c2aef7 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -21,7 +21,7 @@ use log::{info, warn}; use std::borrow::Borrow; use std::cmp::min; use std::collections::{HashMap, HashSet}; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; use std::ops::{RangeBounds, RangeFull}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -1276,6 +1276,28 @@ impl<'a> ReadTransaction<'a> { } } +impl ReadTransaction<'static> { + /// Close the transaction + /// + /// Transactions are automatically closed when they and all objects referencing them have been dropped. + /// This method can be used to ensure that there are no outstanding objects remaining. + /// + /// Returns `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction + pub fn close(self) -> Result<(), TransactionError> { + if Arc::strong_count(&self.transaction_guard) > 1 { + return Err(TransactionError::ReadTransactionStillInUse(self)); + } + // No-op, just drop ourself + Ok(()) + } +} + +impl Debug for ReadTransaction<'static> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("ReadTransaction") + } +} + #[cfg(test)] mod test { use crate::{Database, TableDefinition};