diff --git a/src/db.rs b/src/db.rs index 9b0d399f..4867662b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,13 +1,13 @@ use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey, - InternalTableDefinition, PageHint, PageNumber, RawBtree, TableTree, TableType, - TransactionalMemory, PAGE_SIZE, + InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree, + TableType, TransactionalMemory, PAGE_SIZE, }; use crate::types::{RedbKey, RedbValue}; use crate::{ - CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, Savepoint, - SavepointError, StorageError, + CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError, + StorageError, }; use crate::{ReadTransaction, Result, WriteTransaction}; use std::fmt::{Display, Formatter}; @@ -419,20 +419,21 @@ impl Database { let table_tree = TableTree::new(system_root, mem, freed_list); let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new())); if let Some(savepoint_table_def) = table_tree - .get_table::(SAVEPOINT_TABLE.name(), TableType::Normal) + .get_table::( + SAVEPOINT_TABLE.name(), + TableType::Normal, + ) .map_err(|e| { e.into_storage_error_or_corrupted("Persistent savepoint table corrupted") })? { - let savepoint_table: ReadOnlyTable = + let savepoint_table: ReadOnlyTable = ReadOnlyTable::new(savepoint_table_def.get_root(), PageHint::None, mem)?; - for result in savepoint_table.range::(..)? { + for result in savepoint_table.range::(..)? { let (_, savepoint_data) = result?; - let savepoint = Savepoint::from_bytes( - savepoint_data.value(), - fake_transaction_tracker.clone(), - false, - ); + let savepoint = savepoint_data + .value() + .to_savepoint(fake_transaction_tracker.clone()); if let Some((root, _)) = savepoint.get_user_root() { Self::mark_tables_recursive(root, mem, true)?; } diff --git a/src/transaction_tracker.rs b/src/transaction_tracker.rs index b122203b..aeba7d27 100644 --- a/src/transaction_tracker.rs +++ b/src/transaction_tracker.rs @@ -1,6 +1,8 @@ -use crate::Savepoint; +use crate::{RedbKey, RedbValue, Savepoint, TypeName}; +use std::cmp::Ordering; use std::collections::btree_map::BTreeMap; use std::collections::btree_set::BTreeSet; +use std::mem::size_of; #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] pub(crate) struct TransactionId(pub u64); @@ -19,15 +21,49 @@ impl TransactionId { } } -#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] pub(crate) struct SavepointId(pub u64); impl SavepointId { - fn next(&self) -> SavepointId { + pub(crate) fn next(&self) -> SavepointId { SavepointId(self.0 + 1) } } +impl RedbValue for SavepointId { + type SelfType<'a> = SavepointId; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + SavepointId(u64::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::internal("redb::SavepointId") + } +} + +impl RedbKey for SavepointId { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).0.cmp(&Self::from_bytes(data2).0) + } +} + pub(crate) struct TransactionTracker { next_savepoint_id: SavepointId, // reference count of read transactions per transaction id diff --git a/src/transactions.rs b/src/transactions.rs index 4cf3fd67..58d760e4 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -3,7 +3,7 @@ use crate::sealed::Sealed; use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ Btree, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint, PageNumber, - TableTree, TableType, TransactionalMemory, + SerializedSavepoint, TableTree, TableType, TransactionalMemory, }; use crate::types::{RedbKey, RedbValue}; use crate::{ @@ -22,9 +22,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::{panic, thread}; -const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), u64> = +const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = SystemTableDefinition::new("next_savepoint_id"); -pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = +pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = SystemTableDefinition::new("persistent_savepoints"); pub struct SystemTableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> { @@ -208,7 +208,7 @@ pub struct WriteTransaction<'db> { dirty: AtomicBool, durability: Durability, // Persistent savepoints created during this transaction - created_persistent_savepoints: Mutex>, + created_persistent_savepoints: Mutex>, deleted_persistent_savepoints: Mutex>, live_write_transaction: MutexGuard<'db, Option>, } @@ -326,16 +326,19 @@ impl<'db> WriteTransaction<'db> { let mut next_table = self.open_system_table(NEXT_SAVEPOINT_TABLE)?; let mut savepoint_table = self.open_system_table(SAVEPOINT_TABLE)?; - next_table.insert((), savepoint.get_id().0 + 1)?; + next_table.insert((), savepoint.get_id().next())?; - savepoint_table.insert(savepoint.get_id().0, savepoint.to_bytes().as_slice())?; + savepoint_table.insert( + savepoint.get_id(), + SerializedSavepoint::from_savepoint(&savepoint), + )?; savepoint.set_persistent(); self.created_persistent_savepoints .lock() .unwrap() - .insert(savepoint.get_id().0); + .insert(savepoint.get_id()); Ok(savepoint.get_id().0) } @@ -344,7 +347,7 @@ impl<'db> WriteTransaction<'db> { let next_table = self.open_system_table(NEXT_SAVEPOINT_TABLE)?; let value = next_table.get(())?; if let Some(next_id) = value { - Ok(Some(SavepointId(next_id.value()))) + Ok(Some(next_id.value())) } else { Ok(None) } @@ -353,10 +356,10 @@ impl<'db> WriteTransaction<'db> { /// Get a persistent savepoint given its id pub fn get_persistent_savepoint(&self, id: u64) -> Result { let table = self.open_system_table(SAVEPOINT_TABLE)?; - let value = table.get(id)?; + let value = table.get(SavepointId(id))?; value - .map(|x| Savepoint::from_bytes(x.value(), self.transaction_tracker.clone(), false)) + .map(|x| x.value().to_savepoint(self.transaction_tracker.clone())) .ok_or(SavepointError::InvalidSavepoint) } @@ -374,10 +377,11 @@ impl<'db> WriteTransaction<'db> { return Err(SavepointError::InvalidSavepoint); } let mut table = self.open_system_table(SAVEPOINT_TABLE)?; - let savepoint = table.remove(id)?; - if let Some(bytes) = savepoint { - let savepoint = - Savepoint::from_bytes(bytes.value(), self.transaction_tracker.clone(), false); + let savepoint = table.remove(SavepointId(id))?; + if let Some(serialized) = savepoint { + let savepoint = serialized + .value() + .to_savepoint(self.transaction_tracker.clone()); self.deleted_persistent_savepoints .lock() .unwrap() @@ -392,8 +396,8 @@ impl<'db> WriteTransaction<'db> { pub fn list_persistent_savepoints(&self) -> Result> { let table = self.open_system_table(SAVEPOINT_TABLE)?; let mut savepoints = vec![]; - for savepoint in table.range::(..)? { - savepoints.push(savepoint?.0.value()); + for savepoint in table.range::(..)? { + savepoints.push(savepoint?.0.value().0); } Ok(savepoints.into_iter()) } @@ -763,7 +767,7 @@ impl<'db> WriteTransaction<'db> { #[cfg(feature = "logging")] info!("Aborting transaction id={:?}", self.transaction_id); for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() { - match self.delete_persistent_savepoint(*savepoint) { + match self.delete_persistent_savepoint(savepoint.0) { Ok(_) => {} Err(err) => match err { SavepointError::InvalidSavepoint => { diff --git a/src/tree_store/mod.rs b/src/tree_store/mod.rs index dc1d336f..11bead14 100644 --- a/src/tree_store/mod.rs +++ b/src/tree_store/mod.rs @@ -14,8 +14,8 @@ pub(crate) use btree_iters::{ }; pub use page_store::Savepoint; pub(crate) use page_store::{ - Page, PageHint, PageNumber, TransactionalMemory, FILE_FORMAT_VERSION, MAX_VALUE_LENGTH, - PAGE_SIZE, + Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory, FILE_FORMAT_VERSION, + MAX_VALUE_LENGTH, PAGE_SIZE, }; pub(crate) use table_tree::{ FreedPageList, FreedTableKey, InternalTableDefinition, TableTree, TableType, diff --git a/src/tree_store/page_store/mod.rs b/src/tree_store/page_store/mod.rs index 163fe426..cf7a1fe4 100644 --- a/src/tree_store/page_store/mod.rs +++ b/src/tree_store/page_store/mod.rs @@ -15,6 +15,7 @@ pub(crate) use base::{Page, PageHint, PageNumber, MAX_VALUE_LENGTH}; pub(crate) use header::PAGE_SIZE; pub(crate) use page_manager::{xxh3_checksum, TransactionalMemory, FILE_FORMAT_VERSION}; pub use savepoint::Savepoint; +pub(crate) use savepoint::SerializedSavepoint; pub(super) use base::{PageImpl, PageMut}; pub(super) use xxh3::hash128_with_seed; diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index b80586da..be135549 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -34,7 +34,7 @@ const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024; const NUM_REGIONS: u32 = 1000; // TODO: set to 1, when version 1.0 is released -pub(crate) const FILE_FORMAT_VERSION: u8 = 116; +pub(crate) const FILE_FORMAT_VERSION: u8 = 117; fn ceil_log2(x: usize) -> u8 { if x.is_power_of_two() { diff --git a/src/tree_store/page_store/savepoint.rs b/src/tree_store/page_store/savepoint.rs index 28169eae..df4bee67 100644 --- a/src/tree_store/page_store/savepoint.rs +++ b/src/tree_store/page_store/savepoint.rs @@ -1,5 +1,7 @@ use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{Checksum, PageNumber, TransactionalMemory}; +use crate::{RedbValue, TypeName}; +use std::fmt::Debug; use std::mem::size_of; use std::sync::{Arc, Mutex}; @@ -90,12 +92,93 @@ impl Savepoint { pub(crate) fn set_persistent(&mut self) { self.ephemeral = false; } +} + +impl Drop for Savepoint { + fn drop(&mut self) { + if self.ephemeral { + self.transaction_tracker + .lock() + .unwrap() + .deallocate_savepoint(self.get_id(), self.get_transaction_id()); + } + } +} + +#[derive(Debug)] +pub(crate) enum SerializedSavepoint<'a> { + Ref(&'a [u8]), + Owned(Vec), +} + +impl<'a> SerializedSavepoint<'a> { + pub(crate) fn from_savepoint(savepoint: &Savepoint) -> Self { + let mut result = vec![savepoint.version]; + result.extend(savepoint.id.0.to_le_bytes()); + result.extend(savepoint.transaction_id.0.to_le_bytes()); + + if let Some((root, checksum)) = savepoint.user_root { + result.push(1); + result.extend(root.to_le_bytes()); + result.extend(checksum.to_le_bytes()); + } else { + result.push(0); + result.extend([0; PageNumber::serialized_size()]); + result.extend((0 as Checksum).to_le_bytes()); + } + + if let Some((root, checksum)) = savepoint.system_root { + result.push(1); + result.extend(root.to_le_bytes()); + result.extend(checksum.to_le_bytes()); + } else { + result.push(0); + result.extend([0; PageNumber::serialized_size()]); + result.extend((0 as Checksum).to_le_bytes()); + } + + if let Some((root, checksum)) = savepoint.freed_root { + result.push(1); + result.extend(root.to_le_bytes()); + result.extend(checksum.to_le_bytes()); + } else { + result.push(0); + result.extend([0; PageNumber::serialized_size()]); + result.extend((0 as Checksum).to_le_bytes()); + } + + result.extend( + u32::try_from(savepoint.regional_allocators.len()) + .unwrap() + .to_le_bytes(), + ); + for region in savepoint.regional_allocators.iter() { + assert_eq!(savepoint.regional_allocators[0].len(), region.len()); + } + result.extend( + u32::try_from(savepoint.regional_allocators[0].len()) + .unwrap() + .to_le_bytes(), + ); + + for region in savepoint.regional_allocators.iter() { + result.extend(region); + } + Self::Owned(result) + } + + fn data(&self) -> &[u8] { + match self { + SerializedSavepoint::Ref(x) => x, + SerializedSavepoint::Owned(x) => x.as_slice(), + } + } - pub(crate) fn from_bytes( - data: &[u8], + pub(crate) fn to_savepoint( + &self, transaction_tracker: Arc>, - ephemeral: bool, - ) -> Self { + ) -> Savepoint { + let data = self.data(); let mut offset = 0; let version = data[offset]; offset += size_of::(); @@ -205,7 +288,7 @@ impl Savepoint { assert_eq!(offset, data.len()); - Self { + Savepoint { version, id: SavepointId(id), transaction_id: TransactionId(transaction_id), @@ -214,73 +297,35 @@ impl Savepoint { freed_root, regional_allocators, transaction_tracker, - ephemeral, + ephemeral: false, } } +} - pub(crate) fn to_bytes(&self) -> Vec { - let mut result = vec![self.version]; - result.extend(self.id.0.to_le_bytes()); - result.extend(self.transaction_id.0.to_le_bytes()); - - if let Some((root, checksum)) = self.user_root { - result.push(1); - result.extend(root.to_le_bytes()); - result.extend(checksum.to_le_bytes()); - } else { - result.push(0); - result.extend([0; PageNumber::serialized_size()]); - result.extend((0 as Checksum).to_le_bytes()); - } - - if let Some((root, checksum)) = self.system_root { - result.push(1); - result.extend(root.to_le_bytes()); - result.extend(checksum.to_le_bytes()); - } else { - result.push(0); - result.extend([0; PageNumber::serialized_size()]); - result.extend((0 as Checksum).to_le_bytes()); - } +impl<'data> RedbValue for SerializedSavepoint<'data> { + type SelfType<'a> = SerializedSavepoint<'a> where Self: 'a; + type AsBytes<'a> = &'a [u8] where Self: 'a; - if let Some((root, checksum)) = self.freed_root { - result.push(1); - result.extend(root.to_le_bytes()); - result.extend(checksum.to_le_bytes()); - } else { - result.push(0); - result.extend([0; PageNumber::serialized_size()]); - result.extend((0 as Checksum).to_le_bytes()); - } + fn fixed_width() -> Option { + None + } - result.extend( - u32::try_from(self.regional_allocators.len()) - .unwrap() - .to_le_bytes(), - ); - for region in self.regional_allocators.iter() { - assert_eq!(self.regional_allocators[0].len(), region.len()); - } - result.extend( - u32::try_from(self.regional_allocators[0].len()) - .unwrap() - .to_le_bytes(), - ); + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + SerializedSavepoint::Ref(data) + } - for region in self.regional_allocators.iter() { - result.extend(region); - } - result + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.data() } -} -impl Drop for Savepoint { - fn drop(&mut self) { - if self.ephemeral { - self.transaction_tracker - .lock() - .unwrap() - .deallocate_savepoint(self.get_id(), self.get_transaction_id()); - } + fn type_name() -> TypeName { + TypeName::internal("redb::SerializedSavepoint") } }