diff --git a/src/error.rs b/src/error.rs index d4a82f3..626240b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,18 +5,24 @@ pub enum Errno { TxAborted, /// Not found. NotFound, + /// Invalid arguments. InvalidArgs, - NoMemory, - IoError, - NotEnoughSpace, + /// Out of memory. + OutOfMemory, + /// Out of disk space. + OutOfDisk, + /// IO error. + IoFailed, + /// Permission denied. + PermissionDenied, /// OS-specific unknown error. OsSpecUnknown, /// Encryption operation failed. - EncryptFault, + EncryptFailed, /// Decryption operation failed. - DecryptFault, + DecryptFailed, /// Not aligned to `BLOCK_SIZE`. - NonBlockAlignedSizeError, + NotBlockSizeAligned, } /// error used in this crate diff --git a/src/layers/0-bio/block_buf.rs b/src/layers/0-bio/block_buf.rs index 8b3492c..395fd59 100644 --- a/src/layers/0-bio/block_buf.rs +++ b/src/layers/0-bio/block_buf.rs @@ -31,6 +31,12 @@ pub struct Buf(Pages); impl Buf { /// Allocate specific number of blocks as memory buffer. pub fn alloc(num_blocks: usize) -> Result { + if num_blocks == 0 { + return_errno_with_msg!( + InvalidArgs, + "num_blocks must be greater than 0 for allocation" + ) + } let pages = Pages::alloc(num_blocks)?; Ok(Self(pages)) } @@ -89,7 +95,7 @@ impl<'a> TryFrom<&'a [u8]> for BufRef<'a> { fn try_from(buf: &'a [u8]) -> Result { if buf.len() % BLOCK_SIZE != 0 { - return_errno!(Errno::NonBlockAlignedSizeError); + return_errno!(Errno::NotBlockSizeAligned); } let new_self = Self(buf); Ok(new_self) @@ -137,7 +143,7 @@ impl<'a> TryFrom<&'a mut [u8]> for BufMut<'a> { fn try_from(buf: &'a mut [u8]) -> Result { if buf.len() % BLOCK_SIZE != 0 { - return_errno!(Errno::NonBlockAlignedSizeError); + return_errno!(Errno::NotBlockSizeAligned); } let new_self = Self(buf); Ok(new_self) diff --git a/src/layers/0-bio/block_set.rs b/src/layers/0-bio/block_set.rs index 0829d51..a7240cb 100644 --- a/src/layers/0-bio/block_set.rs +++ b/src/layers/0-bio/block_set.rs @@ -125,6 +125,7 @@ impl_blockset_for!(Arc, "(**self)", |this: &Arc, range| { /// A disk that impl `BlockSet`. /// /// The `region` is the accessible subset. +#[derive(Clone)] pub struct MemDisk { disk: Arc>, region: Range, diff --git a/src/layers/1-crypto/crypto_blob.rs b/src/layers/1-crypto/crypto_blob.rs index a8db1b1..b37e92a 100644 --- a/src/layers/1-crypto/crypto_blob.rs +++ b/src/layers/1-crypto/crypto_blob.rs @@ -80,7 +80,7 @@ impl CryptoBlob { pub fn create(block_set: B, init_data: &[u8]) -> Result { let capacity = block_set.nblocks() * BLOCK_SIZE - Self::HEADER_NBYTES; if init_data.len() > capacity { - return_errno_with_msg!(NotEnoughSpace, "init_data is too large"); + return_errno_with_msg!(OutOfDisk, "init_data is too large"); } let nblocks = (Self::HEADER_NBYTES + init_data.len() + BLOCK_SIZE - 1) / BLOCK_SIZE; let mut block_buf = Buf::alloc(nblocks)?; @@ -129,7 +129,7 @@ impl CryptoBlob { /// known to an attacker. pub fn write(&mut self, buf: &[u8]) -> Result { if buf.len() > self.capacity() { - return_errno_with_msg!(NotEnoughSpace, "write data is too large"); + return_errno_with_msg!(OutOfDisk, "write data is too large"); } let nblocks = (Self::HEADER_NBYTES + buf.len() + BLOCK_SIZE - 1) / BLOCK_SIZE; let mut block_buf = Buf::alloc(nblocks)?; @@ -178,10 +178,10 @@ impl CryptoBlob { } }; if header.payload_len > self.capacity() { - return_errno_with_msg!(NotEnoughSpace, "payload_len is greater than the capacity"); + return_errno_with_msg!(OutOfDisk, "payload_len is greater than the capacity"); } if header.payload_len > buf.len() { - return_errno_with_msg!(NotEnoughSpace, "read_buf is too small"); + return_errno_with_msg!(OutOfDisk, "read_buf is too small"); } let nblock = (Self::HEADER_NBYTES + header.payload_len + BLOCK_SIZE - 1) / BLOCK_SIZE; let mut block_buf = Buf::alloc(nblock)?; diff --git a/src/layers/1-crypto/crypto_chain.rs b/src/layers/1-crypto/crypto_chain.rs index ea5b898..1448d02 100644 --- a/src/layers/1-crypto/crypto_chain.rs +++ b/src/layers/1-crypto/crypto_chain.rs @@ -108,10 +108,7 @@ impl CryptoChain { let payload_len = footer.len as usize; if payload_len > Self::AVAIL_BLOCK_SIZE || payload_len > buf.len() { - return_errno_with_msg!( - NotEnoughSpace, - "wrong payload_len or the read_buf is too small" - ); + return_errno_with_msg!(OutOfDisk, "wrong payload_len or the read_buf is too small"); } // Check the footer MAC, to ensure the orderness and consecutiveness of blocks. @@ -142,7 +139,7 @@ impl CryptoChain { /// The confidentiality of the block is guaranteed. pub fn append(&mut self, buf: &[u8]) -> Result<()> { if buf.len() > Self::AVAIL_BLOCK_SIZE { - return_errno_with_msg!(NotEnoughSpace, "append data is too large"); + return_errno_with_msg!(OutOfDisk, "append data is too large"); } let mut block_buf = Buf::alloc(1)?; diff --git a/src/layers/1-crypto/crypto_log.rs b/src/layers/1-crypto/crypto_log.rs index b099953..01fdd64 100644 --- a/src/layers/1-crypto/crypto_log.rs +++ b/src/layers/1-crypto/crypto_log.rs @@ -885,7 +885,7 @@ impl AppendDataBuf { pub fn append_data_nodes(&mut self, nodes: Vec>) -> Result<()> { if self.is_full() { - return_errno_with_msg!(NoMemory, "cache out of capacity"); + return_errno_with_msg!(OutOfMemory, "cache out of capacity"); } self.node_queue.extend(nodes.into_iter()); diff --git a/src/layers/2-edit/journal.rs b/src/layers/2-edit/journal.rs index 2634af7..631653c 100644 --- a/src/layers/2-edit/journal.rs +++ b/src/layers/2-edit/journal.rs @@ -61,6 +61,8 @@ pub struct EditJournal< /// The metadata of an edit journal. /// /// The metadata is mainly useful when recovering an edit journal after a reboot. +#[repr(C)] +#[derive(Clone, Copy, Pod, Debug)] pub struct EditJournalMeta { /// The number of blocks reserved for storing a snapshot `CryptoBlob`. pub snapshot_area_nblocks: usize, @@ -357,14 +359,14 @@ where CryptoBlob::::HEADER_NBYTES + state_max_nbytes + Snapshot::::meta_len(); let blob_blocks = (blob_bytes + BLOCK_SIZE - 1) / BLOCK_SIZE; if 2 * blob_blocks >= disk.nblocks() { - return_errno_with_msg!(NotEnoughSpace, "the block_set for journal is too small"); + return_errno_with_msg!(OutOfDisk, "the block_set for journal is too small"); }; let mut buf = Buf::alloc(blob_blocks)?; // Serialize snapshot (state + metadata). let snapshot = Snapshot::create(init_state.clone(), 0); let serialized = postcard::to_slice(snapshot.as_ref(), buf.as_mut_slice()) - .map_err(|_| Error::with_msg(NotEnoughSpace, "serialize snapshot failed"))?; + .map_err(|_| Error::with_msg(OutOfDisk, "serialize snapshot failed"))?; // Persist snapshot to `CryptoBlob`. let block_set0 = disk.subset(0..blob_blocks)?; @@ -398,7 +400,7 @@ where let snapshot0_res = match blob0.read(buf.as_mut_slice()) { Ok(snapshot_len) => { postcard::from_bytes::>(&buf.as_slice()[..snapshot_len]) - .map_err(|_| Error::with_msg(NotEnoughSpace, "deserialize snapshot0 failed")) + .map_err(|_| Error::with_msg(OutOfDisk, "deserialize snapshot0 failed")) .map(|snapshot| Arc::new(snapshot)) } Err(_) => Err(Error::with_msg(NotFound, "failed to read snapshot0")), @@ -406,7 +408,7 @@ where let snapshot1_res = match blob1.read(buf.as_mut_slice()) { Ok(snapshot_len) => { postcard::from_bytes::>(&buf.as_slice()[..snapshot_len]) - .map_err(|_| Error::with_msg(NotEnoughSpace, "deserialize snapshot1 failed")) + .map_err(|_| Error::with_msg(OutOfDisk, "deserialize snapshot1 failed")) .map(|snapshot| Arc::new(snapshot)) } Err(_) => Err(Error::with_msg(NotFound, "failed to read snapshot1")), @@ -456,7 +458,7 @@ where pub fn persist(&mut self, latest: Arc>) -> Result<()> { // Serialize the latest snapshot. let buf = postcard::to_slice(latest.as_ref(), self.buf.as_mut_slice()) - .map_err(|_| Error::with_msg(NotEnoughSpace, "serialize current state failed"))?; + .map_err(|_| Error::with_msg(OutOfDisk, "serialize current state failed"))?; // Persist the latest snapshot to `CryptoBlob`. let index = (self.latest_index + 1) % 2; // switch the `latest_index` @@ -630,7 +632,7 @@ impl, S: Sized> WriteBuf { e ); } - return_errno_with_msg!(NotEnoughSpace, "no space for new Record in WriteBuf"); + return_errno_with_msg!(OutOfDisk, "no space for new Record in WriteBuf"); } } } diff --git a/src/layers/3-log/chunk.rs b/src/layers/3-log/chunk.rs index 3a363bf..2b5becf 100644 --- a/src/layers/3-log/chunk.rs +++ b/src/layers/3-log/chunk.rs @@ -1,18 +1,18 @@ //! Chunk-based storage management. -//! +//! //! A chunk is a group of consecutive blocks. -//! As the size of a chunk is much greater than that of a block, +//! As the size of a chunk is much greater than that of a block, //! the number of chunks is naturally far smaller than that of blocks. //! This makes it possible to keep all metadata for chunks in memory. //! Thus, managing chunks is more efficient than managing blocks. -//! -//! The primary API provided by this module is chunk allocators, +//! +//! The primary API provided by this module is chunk allocators, //! `ChunkAlloc`, which tracks whether chunks are free or not. -//! +//! //! # Examples -//! +//! //! Chunk allocators are used within transactions. -//! +//! //! ``` //! fn alloc_chunks(chunk_alloc: &ChunkAlloc, num_chunks: usize) -> Option> { //! let mut tx = chunk_alloc.new_tx(); @@ -31,56 +31,76 @@ //! res //! } //! ``` -//! -//! This above example showcases the power of transaction atomicity: +//! +//! This above example showcases the power of transaction atomicity: //! if anything goes wrong (e.g., allocation failures) during the transaction, -//! then the transaction can be aborted and all changes made to `chuck_alloc` +//! then the transaction can be aborted and all changes made to `chuck_alloc` //! during the transaction will be rolled back automatically. +use crate::layers::edit::Edit; +use crate::os::Mutex; +use crate::prelude::*; +use crate::tx::{CurrentTx, Tx, TxData, TxProvider}; + +use alloc::collections::BTreeMap; +use serde::{Deserialize, Serialize}; /// The ID of a chunk. pub type ChunkId = usize; +/// Number of blocks of a chunk +pub const CHUNK_NBLOCKS: usize = 1024; /// The chunk size is a multiple of the block size. -pub const CHUNK_SIZE: usize = 1024 * BLOCK_SIZE; +pub const CHUNK_SIZE: usize = CHUNK_NBLOCKS * BLOCK_SIZE; /// A chunk allocator tracks which chunks are free. +#[derive(Clone)] pub struct ChunkAlloc { state: Arc>, tx_provider: Arc, } impl ChunkAlloc { - /// Creates a chunk manager that manages a specified number of + /// Creates a chunk allocator that manages a specified number of /// chunks (`capacity`). Initially, all chunks are free. pub fn new(capacity: usize, tx_provider: Arc) -> Self { - let new_self = { - let state = Arc::new(Mutex::new(ChunkAllocState::new(capacity))); - Self { - state, - tx_provider, - } + let state = ChunkAllocState::new(capacity); + Self::from_parts(state, tx_provider) + } + + /// Constructs a `ChunkAlloc` from its parts. + pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc) -> Self { + let new_self = Self { + state: Arc::new(Mutex::new(state)), + tx_provider, }; - new_self.tx_provider.register_data_initializer(|| { - ChunkAllocEdit::new() - }); + // TX data + new_self + .tx_provider + .register_data_initializer(Box::new(|| ChunkAllocEdit::new())); + + // Commit handler new_self.tx_provider.register_commit_handler({ let state = new_self.state.clone(); move |current: CurrentTx<'_>| { - let mut state = state.lock(); - current.data_with(|edit: &ChunkAllocEdit| { + let state = state.clone(); + current.data_with(move |edit: &ChunkAllocEdit| { + let mut state = state.lock(); edit.apply_to(&mut state); }); } }); + + // Abort handler new_self.tx_provider.register_abort_handler({ let state = new_self.state.clone(); move |current: CurrentTx<'_>| { - let mut state = state.lock(); - current.data_with(|edit: &ChunkAllocEdit| { - edit.iter_allocated_chunks(|chunk_id| { + let state = state.clone(); + current.data_with(move |edit: &ChunkAllocEdit| { + let mut state = state.lock(); + for chunk_id in edit.iter_allocated_chunks() { state.dealloc(chunk_id); - }); + } }); } }); @@ -88,24 +108,16 @@ impl ChunkAlloc { new_self } - /// Reconstructs a `ChunkAlloc` from its parts. - pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc) -> Self { - Self { - state: Arc::new(Mutex::new(state)), - tx_provider, - } - } - /// Creates a new transaction for the chunk allocator. pub fn new_tx(&self) -> Tx { self.tx_provider.new_tx() } - /// Allocate a chunk, returning its ID. + /// Allocates a chunk, returning its ID. pub fn alloc(&self) -> Option { let chunk_id = { let mut state = self.state.lock(); - state.alloc()? + state.alloc()? // Update global state immediately }; let mut current_tx = self.tx_provider.current(); @@ -116,10 +128,10 @@ impl ChunkAlloc { Some(chunk_id) } - /// Deallocate the chunk of a given ID. - /// + /// Deallocates the chunk of a given ID. + /// /// # Panic - /// + /// /// Deallocating a free chunk causes panic. pub fn dealloc(&self, chunk_id: ChunkId) { let mut current_tx = self.tx_provider.current(); @@ -133,23 +145,23 @@ impl ChunkAlloc { }); } - /// Deallocate the set of chunks of given IDs. - /// + /// Deallocates the set of chunks of given IDs. + /// /// # Panic - /// + /// /// Deallocating a free chunk causes panic. - pub fn dealloc_batch(&self, chunk_ids: I) + pub fn dealloc_batch(&self, chunk_ids: I) where - I: Iterator + I: Iterator, { let mut current_tx = self.tx_provider.current(); current_tx.data_mut_with(|edit: &mut ChunkAllocEdit| { let mut state = self.state.lock(); for chunk_id in chunk_ids { - let should_dealloc_now = edit.dealloc(*chunk_id); + let should_dealloc_now = edit.dealloc(chunk_id); if should_dealloc_now { - state.dealloc(*chunk_id); + state.dealloc(chunk_id); } } }); @@ -166,54 +178,67 @@ impl ChunkAlloc { } } +//////////////////////////////////////////////////////////////////////////////// +// Persistent State +//////////////////////////////////////////////////////////////////////////////// + +// TODO: Use crate 'bittle' +type BitMap = bitvec::prelude::BitVec; + /// The persistent state of a chunk allocator. -pub(super) struct ChunkAllocState { +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChunkAllocState { // A bitmap where each bit indicates whether a corresponding chunk has been // allocated. - alloc_map: BitVec, + alloc_map: BitMap, // The number of free chunks. free_count: usize, - // The minimum free chunk Id. Useful to narrow the scope of searching for + // The minimum free chunk Id. Useful to narrow the scope of searching for // free chunk IDs. min_free: usize, } +// TODO: Separate persistent and volatile state of `ChunkAlloc` impl ChunkAllocState { /// Creates a persistent state for managing chunks of the specified number. /// Initially, all chunks are free. pub fn new(capacity: usize) -> Self { Self { - alloc_map: bitvec![usize, Lsb0; 0; capacity], + alloc_map: BitMap::repeat(false, capacity), free_count: capacity, min_free: 0, } } - /// Allocate a chunk, returning its ID. + /// Allocates a chunk, returning its ID. pub fn alloc(&mut self) -> Option { - if self.min_free >= self.alloc_map.len() { - return None; + let min_free = self.min_free; + if min_free >= self.alloc_map.len() { + return None; } let free_chunk_id = self.alloc_map[min_free..] .first_zero() - .expect("there must exist a zero"); - self.alloc_map[free_chunk_id] = true; + .expect("there must exists a zero") + + min_free; + self.alloc_map.set(free_chunk_id, true); + self.free_count -= 1; - // Keep the invariance that all free chunk IDs are no less than min_free + // Keep the invariance that all free chunk IDs are no less than `min_free`` self.min_free = free_chunk_id + 1; Some(free_chunk_id) } - /// Deallocate the chunk of a given ID. - /// + /// Deallocates the chunk of a given ID. + /// /// # Panic - /// + /// /// Deallocating a free chunk causes panic. pub fn dealloc(&mut self, chunk_id: ChunkId) { - debug_assert!(self.alloc_map[chunk_id] == true); - self.alloc_map[chunk_id] = false; + assert_eq!(self.alloc_map[chunk_id], true); + self.alloc_map.set(chunk_id, false); + self.free_count += 1; // Keep the invariance that all free chunk IDs are no less than min_free if chunk_id < self.min_free { @@ -230,74 +255,163 @@ impl ChunkAllocState { pub fn free_count(&self) -> usize { self.free_count } -} -impl Edit for ChunkAllocEdit { - fn apply_to(&self, state: &mut ChunkAllocState) { - for (chunk_id, chunk_edit) in &self.chunk_table { - match chunk_edit { - ChunkEdit::Alloc => { - // Nothing needs to be done - } - ChunkEdit::Dealloc => { - state.dealloc(chunk_id); - } - } - } + /// Returns whether a specific chunk is allocated. + pub fn is_chunk_allocated(&self, chunk_id: ChunkId) -> bool { + self.alloc_map[chunk_id] == true } -} +} + +//////////////////////////////////////////////////////////////////////////////// +// Persistent Edit +//////////////////////////////////////////////////////////////////////////////// /// A persistent edit to the state of a chunk allocator. -pub(super) struct ChunkAllocEdit { - chunk_table: BTreeMap, +#[derive(Clone, Serialize, Deserialize)] +pub struct ChunkAllocEdit { + edit_table: BTreeMap, } -/// The smallest unit of a persistent edit to the -/// state of a chunk allocator, which is +/// The smallest unit of a persistent edit to the +/// state of a chunk allocator, which is /// a chunk being either allocated or deallocated. +#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] enum ChunkEdit { Alloc, Dealloc, } impl ChunkAllocEdit { - /// Creates a new empty edit. + /// Creates a new empty edit table. pub fn new() -> Self { Self { - chunk_table: BTreeMap::new(), + edit_table: BTreeMap::new(), } } /// Records a chunk allocation in the edit. pub fn alloc(&mut self, chunk_id: ChunkId) { - let old_edit = self.chunk_table.insert(chunk_id, ChunkEdit::Alloc); + let old_edit = self.edit_table.insert(chunk_id, ChunkEdit::Alloc); - // There must be a logical error if an edit has been recorded + // There must be a logical error if an edit has been recorded // for the chunk. If the chunk edit is `ChunkEdit::Alloc`, then // it is double allocations. If the chunk edit is `ChunkEdit::Dealloc`, - // then such deallocations can only take effect after the edit is + // then such deallocations can only take effect after the edit is // committed. Thus, it is impossible to allocate the chunk again now. - panic!(old_edit.is_some()); + assert!(old_edit.is_none()); } - /// Records a chunk deallocation in the edit. - /// - /// The return value indicates whether the chunk being deallocated - /// is previously recored in the edit as being allocated. + /// Records a chunk deallocation in the edit. + /// + /// The return value indicates whether the chunk being deallocated + /// is previously recorded in the edit as being allocated. /// If so, the chunk can be deallocated in the `ChunkAllocState`. pub fn dealloc(&mut self, chunk_id: ChunkId) -> bool { - match self.chunk_table.get(&chunk_id).to_owned() { + match self.edit_table.get(&chunk_id).to_owned() { None => { - self.chunk_table.insert(chunk_id, ChunkEdit::Dealloc); + self.edit_table.insert(chunk_id, ChunkEdit::Dealloc); false } Some(ChunkEdit::Alloc) => { - self.chunk_table.remove(&chunk_id); - true + self.edit_table.remove(&chunk_id); + true } Some(ChunkEdit::Dealloc) => { panic!("a chunk must not be deallocated twice"); } } } + + /// Returns an iterator over all allocated chunks. + pub fn iter_allocated_chunks(&self) -> impl Iterator + '_ { + self.edit_table.iter().filter_map(|(id, edit)| { + if *edit == ChunkEdit::Alloc { + Some(*id) + } else { + None + } + }) + } +} + +impl Edit for ChunkAllocEdit { + fn apply_to(&self, state: &mut ChunkAllocState) { + for (&chunk_id, chunk_edit) in &self.edit_table { + match chunk_edit { + ChunkEdit::Alloc => { + // Journal's state also needs to be updated + if !state.is_chunk_allocated(chunk_id) { + let _allocated_id = state.alloc().unwrap(); + // `_allocated_id` may not be equal to `chunk_id`due to concurrent TXs, + // but eventually the state will be consistent + } + + // Except journal, nothing needs to be done + } + ChunkEdit::Dealloc => { + state.dealloc(chunk_id); + } + } + } + } +} + +impl TxData for ChunkAllocEdit {} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_chunk_alloc() -> ChunkAlloc { + let cap = 1024_usize; + let tx_provider = TxProvider::new(); + let chunk_alloc = ChunkAlloc::new(cap, tx_provider); + assert_eq!(chunk_alloc.capacity(), cap); + assert_eq!(chunk_alloc.free_count(), cap); + chunk_alloc + } + + fn do_alloc_dealloc_tx(chunk_alloc: &ChunkAlloc, alloc_cnt: usize, dealloc_cnt: usize) -> Tx { + debug_assert!(alloc_cnt <= chunk_alloc.capacity() && dealloc_cnt <= alloc_cnt); + let mut tx = chunk_alloc.new_tx(); + tx.context(|| { + let mut allocated_chunk_ids = Vec::with_capacity(alloc_cnt); + for _ in 0..alloc_cnt { + let chunk_id = chunk_alloc.alloc().unwrap(); + allocated_chunk_ids.push(chunk_id); + } + + chunk_alloc.dealloc(allocated_chunk_ids[0]); + chunk_alloc.dealloc_batch( + allocated_chunk_ids[alloc_cnt - dealloc_cnt + 1..alloc_cnt] + .iter() + .cloned(), + ); + }); + tx + } + + #[test] + fn chunk_alloc_dealloc_tx_commit() -> Result<()> { + let chunk_alloc = new_chunk_alloc(); + let cap = chunk_alloc.capacity(); + let (alloc_cnt, dealloc_cnt) = (cap, cap); + + let mut tx = do_alloc_dealloc_tx(&chunk_alloc, alloc_cnt, dealloc_cnt); + tx.commit()?; + assert_eq!(chunk_alloc.free_count(), cap - alloc_cnt + dealloc_cnt); + Ok(()) + } + + #[test] + fn chunk_alloc_dealloc_tx_abort() -> Result<()> { + let chunk_alloc = new_chunk_alloc(); + let cap = chunk_alloc.capacity(); + let (alloc_cnt, dealloc_cnt) = (cap / 2, cap / 4); + + let mut tx = do_alloc_dealloc_tx(&chunk_alloc, alloc_cnt, dealloc_cnt); + tx.abort(); + assert_eq!(chunk_alloc.free_count(), cap); + Ok(()) + } } diff --git a/src/layers/3-log/mod.rs b/src/layers/3-log/mod.rs index 085be13..90887f5 100644 --- a/src/layers/3-log/mod.rs +++ b/src/layers/3-log/mod.rs @@ -1,9 +1,7 @@ //! The layer of transactional logging. -pub mod chunk; -pub mod raw_log; +mod chunk; +mod raw_log; mod tx_log; pub use self::tx_log::{TxLog, TxLogId, TxLogStore}; - -// TODO: Use D as the parameter for BlockSet diff --git a/src/layers/3-log/raw_log.rs b/src/layers/3-log/raw_log.rs index 926aac2..9bad012 100644 --- a/src/layers/3-log/raw_log.rs +++ b/src/layers/3-log/raw_log.rs @@ -1,630 +1,1152 @@ -/// A store of raw (untrusted) logs. -/// -/// `RawLogStore` allows creating, deleting, reading and writing -/// `RawLog`. Each raw log is uniquely identified by its ID (`RawLogId`). -/// Writing to a raw log is append only. -/// -/// `RawLogStore` stores raw logs on a disk of `D: BlockSet`. -/// Internally, `RawLogStore` manages the disk space with `ChunkAlloc` -/// so that the disk space can be allocated and deallocated in the units of -/// chunk. An allocated chunk belongs to exactly one raw log. And one raw log -/// may be backed by multiple chunks. -/// -/// # Examples -/// -/// Raw logs are manipulated and accessed within transactions. -/// -/// ``` -/// fn concat_logs( -/// log_store: &RawLogStore, -/// log_ids: &[RawLogId] -/// ) -> Result { -/// let mut tx = log_store.new_tx(); -/// let res = tx.context(|| { -/// let mut buffer = BlockBuf::new_boxed(); -/// let output_log = log_store.create_log(); -/// for log_id in log_ids { -/// let input_log = log_store.open_log(log_id, false)?; -/// let input_len = input_log.num_blocks(); -/// let mut pos = 0; -/// while pos < input_len { -/// let read_len = buffer.len().min(input_len - pos); -/// input_log.read(&mut buffer[..read_len])?; -/// output_log.write(&buffer[..read_len])?; -/// } -/// } -/// Ok(output_log.id()) -/// }); -/// if res.is_ok() { -/// tx.commit()?; -/// } else { -/// tx.abort(); -/// } -/// res -/// } -/// ``` -/// -/// If any error occurs (e.g., failures to open, read, or write a log) during -/// the transaction, then all prior changes to raw logs shall have no -/// effects. On the other hand, if the commit operation succeeds, then -/// all changes made in the transaction shall take effect as a whole. -/// -/// # Expected behaviors -/// -/// We provide detailed descriptions about the expected behaviors of raw log -/// APIs under transactions. -/// -/// 1. The local changes made (e.g., creations, deletions, writes) in a Tx are -/// immediately visible to the Tx, but not other Tx until the Tx is committed. -/// For example, a newly-created log within Tx A is immediately usable within Tx, -/// but becomes visible to other Tx only until A is committed. -/// As another example, when a log is deleted within a Tx, then the Tx can no -/// longer open the log. But other concurrent Tx can still open the log. -/// -/// 2. If a Tx is aborted, then all the local changes made in the TX will be -/// discarded. -/// -/// 3. At any given time, a log can have at most one writer Tx. -/// A Tx becomes the writer of a log when the log is opened with the write -/// permission in the Tx. And it stops being the writer Tx of the log only when -/// the Tx is terminated (not when the log is closed within Tx). -/// This single-writer rule avoids potential conflicts between concurrent -/// writing to the same log. -/// -/// 4. Log creation does not conflict with log deleation, read, or write as -/// every newly-created log is assigned a unique ID automatically. -/// -/// 4. Deleting a log does not affect any opened instance of the log in the Tx -/// or other Tx (similar to deleting a file in a UNIX-style system). -/// It is only until the deleting Tx is committed and the last -/// instance of the log is closed shall the log be deleted and its disk space -/// be freed. -/// -/// 5. The Tx commitment will not fail due to conflicts between concurrent -/// operations in different Tx. +//! A store of raw (untrusted) logs. +//! +//! `RawLogStore` allows creating, deleting, reading and writing +//! `RawLog`. Each raw log is uniquely identified by its ID (`RawLogId`). +//! Writing to a raw log is append only. +//! +//! `RawLogStore` stores raw logs on a disk of `D: BlockSet`. +//! Internally, `RawLogStore` manages the disk space with `ChunkAlloc` +//! so that the disk space can be allocated and deallocated in the units of +//! chunk. An allocated chunk belongs to exactly one raw log. And one raw log +//! may be backed by multiple chunks. +//! +//! # Examples +//! +//! Raw logs are manipulated and accessed within transactions. +//! +//! ``` +//! fn concat_logs( +//! log_store: &RawLogStore, +//! log_ids: &[RawLogId] +//! ) -> Result { +//! let mut tx = log_store.new_tx(); +//! let res: Result<_> = tx.context(|| { +//! let mut buf = Buf::alloc(1)?; +//! let output_log = log_store.create_log()?; +//! for log_id in log_ids { +//! let input_log = log_store.open_log(log_id, false)?; +//! let input_len = input_log.nblocks(); +//! let mut pos = 0 as BlockId; +//! while pos < input_len { +//! input_log.read(pos, buf.as_mut())?; +//! output_log.append(buf.as_ref())?; +//! } +//! } +//! Ok(output_log.id()) +//! }); +//! if res.is_ok() { +//! tx.commit()?; +//! } else { +//! tx.abort(); +//! } +//! res +//! } +//! ``` +//! +//! If any error occurs (e.g., failures to open, read, or write a log) during +//! the transaction, then all prior changes to raw logs shall have no +//! effects. On the other hand, if the commit operation succeeds, then +//! all changes made in the transaction shall take effect as a whole. +//! +//! # Expected behaviors +//! +//! We provide detailed descriptions about the expected behaviors of raw log +//! APIs under transactions. +//! +//! 1. The local changes made (e.g., creations, deletions, writes) in a TX are +//! immediately visible to the TX, but not other TX until the TX is committed. +//! For example, a newly-created log within TX A is immediately usable within TX, +//! but becomes visible to other TX only until A is committed. +//! As another example, when a log is deleted within a TX, then the TX can no +//! longer open the log. But other concurrent TX can still open the log. +//! +//! 2. If a TX is aborted, then all the local changes made in the TX will be +//! discarded. +//! +//! 3. At any given time, a log can have at most one writer TX. +//! A TX becomes the writer of a log when the log is opened with the write +//! permission in the TX. And it stops being the writer TX of the log only when +//! the TX is terminated (not when the log is closed within TX). +//! This single-writer rule avoids potential conflicts between concurrent +//! writing to the same log. +//! +//! 4. Log creation does not conflict with log deleation, read, or write as +//! every newly-created log is assigned a unique ID automatically. +//! +//! 4. Deleting a log does not affect any opened instance of the log in the TX +//! or other TX (similar to deleting a file in a UNIX-style system). +//! It is only until the deleting TX is committed and the last +//! instance of the log is closed shall the log be deleted and its disk space +//! be freed. +//! +//! 5. The TX commitment will not fail due to conflicts between concurrent +//! operations in different TX. +use super::chunk::{ChunkAlloc, ChunkId, CHUNK_NBLOCKS}; +use crate::layers::bio::{BlockLog, BlockSet, BufMut, BufRef}; +use crate::layers::edit::Edit; +use crate::os::{Mutex, MutexGuard}; +use crate::prelude::*; +use crate::tx::{CurrentTx, Tx, TxData, TxProvider}; +use crate::util::LazyDelete; + +use alloc::collections::{BTreeMap, BTreeSet}; +use alloc::sync::Weak; +use core::sync::atomic::{AtomicUsize, Ordering}; +use lending_iterator::LendingIterator; +use serde::{Deserialize, Serialize}; + +pub type RawLogId = u64; + +/// A store for raw logs. pub struct RawLogStore { state: Arc>, disk: D, - chunk_alloc: ChunkAlloc, + chunk_alloc: ChunkAlloc, // Mapping: ChunkId * CHUNK_NBLOCKS = disk position (BlockId) tx_provider: Arc, + weak_self: Weak, } -pub type RawLogId = u64; +impl RawLogStore { + /// Creates a new store of raw logs given a chunk allocator and an untrusted disk. + pub fn new(disk: D, tx_provider: Arc, chunk_alloc: ChunkAlloc) -> Arc { + Self::from_parts(RawLogStoreState::new(), disk, chunk_alloc, tx_provider) + } -//--------------------------------------------------------------------------- -// The code below is still in early draft!!!!! -//--------------------------------------------------------------------------- + /// Constructs a `RawLogStore` from its parts. + pub(super) fn from_parts( + state: RawLogStoreState, + disk: D, + chunk_alloc: ChunkAlloc, + tx_provider: Arc, + ) -> Arc { + let new_self = { + // Prepare lazy deletes first from persistent state + let lazy_deletes = { + let mut delete_table = BTreeMap::new(); + for (&log_id, log_entry) in state.log_table.iter() { + Self::add_lazy_delete(log_id, log_entry, &chunk_alloc, &mut delete_table) + } + delete_table + }; + + Arc::new_cyclic(|weak_self| Self { + state: Arc::new(Mutex::new(State::new(state, lazy_deletes))), + disk, + chunk_alloc, + tx_provider, + weak_self: weak_self.clone(), + }) + }; -// TODO: Allocate Ids only when the raw logs are commited -// TODO: check log.tx_id == current.tx_id -// TODO: rename persistent state to state -// TODO: rename Change to Edit -// TODO: how to delete -// TODO: Rename to + // TX data + new_self + .tx_provider + .register_data_initializer(Box::new(|| RawLogStoreEdit::new())); + + // Commit handler + new_self.tx_provider.register_commit_handler({ + let state = new_self.state.clone(); + let chunk_alloc = new_self.chunk_alloc.clone(); + + move |current: CurrentTx<'_>| { + Self::do_lazy_deletion(&state, ¤t); + + current.data_with(|edit: &RawLogStoreEdit| { + let mut state = state.lock(); + state.apply(&edit); + + // Add lazy delete for newly created logs + for log_id in edit.iter_created_logs() { + let log_entry_opt = state.persistent.find_log(log_id); + if log_entry_opt.is_none() || state.lazy_deletes.contains_key(&log_id) { + continue; + } + + Self::add_lazy_delete( + log_id, + log_entry_opt.as_ref().unwrap(), + &chunk_alloc, + &mut state.lazy_deletes, + ); + } + }); + } + }); + new_self + } -/// The volatile and persistent state of a `RawLogStore`. -struct State { - persistent: RawLogStoreState, - write_set: BTreeSet, - next_free_log_id: RawLogId, -} + fn add_lazy_delete( + log_id: RawLogId, + log_entry: &RawLogEntry, + chunk_alloc: &ChunkAlloc, + delete_table: &mut BTreeMap>>, + ) { + let log_entry = log_entry.clone(); + let chunk_alloc = chunk_alloc.clone(); + delete_table.insert( + log_id, + Arc::new(LazyDelete::new(log_entry, move |entry| { + chunk_alloc.dealloc_batch(entry.head.chunks.iter().cloned()) + })), + ); + } -/// The persistent state of a `RawLogStore`. -pub(super) struct RawLogStoreState { - log_table: BTreeMap>>, - next_free_log_id: RawLogId, -} + fn do_lazy_deletion(state: &Arc>, current_tx: &CurrentTx) { + let deleted_logs = current_tx + .data_with(|edit: &RawLogStoreEdit| edit.iter_deleted_logs().collect::>()); -struct RawLogEntry { - head: RawLogHead, - is_deleted: bool, -} + for log_id in deleted_logs { + let Some(lazy_delete) = state.lock().lazy_deletes.remove(&log_id) else { + // Other concurrent TXs have deleted the same log + continue; + }; + LazyDelete::delete(&lazy_delete); + } + } -struct RawLogHead { - chunks: Vec, - num_blocks: usize, -} + pub fn new_tx(&self) -> Tx { + self.tx_provider.new_tx() + } -struct PersistentState { - chunk_logs: BTreeMap, - max_log_id: Option, -} + /// Creates a new raw log with a new log id. + /// + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + pub fn create_log(&self) -> Result> { + let mut state = self.state.lock(); + let new_log_id = state.persistent.alloc_log_id(); + state + .add_to_write_set(new_log_id) + .expect("created log can't appear in write set"); + + let mut current_tx = self.tx_provider.current(); + current_tx.data_mut_with(|edit: &mut RawLogStoreEdit| { + edit.create_log(new_log_id); + }); + + Ok(RawLog { + log_id: new_log_id, + log_entry: None, + log_store: self.weak_self.upgrade().unwrap(), + tx_provider: self.tx_provider.clone(), + lazy_delete: None, + append_pos: AtomicUsize::new(0), + can_append: true, + }) + } + + /// Opens the raw log of a given ID. + /// + /// For any log at any time, there can be at most one TX that opens the log + /// in the appendable mode. + /// + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + pub fn open_log(&self, log_id: u64, can_append: bool) -> Result> { + let mut state = self.state.lock(); + // Must check lazy deletes first in case there is concurrent deletion + let lazy_delete = state.lazy_deletes.get(&log_id).ok_or(NotFound)?.clone(); + let mut current_tx = self.tx_provider.current(); + + let log_entry_opt = state.persistent.find_log(log_id); + // The log is already created by other TX + if log_entry_opt.is_some() { + let log_entry = log_entry_opt.as_ref().unwrap(); + if can_append { + // Prevent other TX from opening this log in the append mode. + state.add_to_write_set(log_id)?; + + // If the log is open in the append mode, edit must be prepared + current_tx.data_mut_with(|edit: &mut RawLogStoreEdit| { + edit.open_log(log_id, &log_entry); + }); + } + } + // The log must has been created by this TX + else { + let is_log_created = + current_tx.data_mut_with(|edit: &mut RawLogStoreEdit| edit.is_log_created(log_id)); + if !is_log_created { + return_errno_with_msg!(NotFound, "raw log not found"); + } + } + + let append_pos: BlockId = log_entry_opt + .as_ref() + .map(|entry| entry.head.num_blocks as _) + .unwrap_or(0); + Ok(RawLog { + log_id, + log_entry: log_entry_opt.map(|entry| Arc::new(Mutex::new(entry.clone()))), + log_store: self.weak_self.upgrade().unwrap(), + tx_provider: self.tx_provider.clone(), + lazy_delete: Some(lazy_delete), + append_pos: AtomicUsize::new(append_pos), + can_append, + }) + } + + /// Deletes the raw log of a given ID. + /// + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + pub fn delete_log(&self, log_id: RawLogId) -> Result<()> { + let mut current_tx = self.tx_provider.current(); + // Free tail chunks + let tail_chunks = + current_tx.data_mut_with(|edit: &mut RawLogStoreEdit| edit.delete_log(log_id)); + tail_chunks.map(|chunks| self.chunk_alloc.dealloc_batch(chunks.iter().cloned())); + + // Leave freeing head chunks to lazy delete -struct PersistentEdit { - change_table: BTreeMap, + self.state.lock().remove_from_write_set(log_id); + Ok(()) + } } -enum RawLogEdit { - Create(RawLogCreate), - Append(RawLogAppend), - Delete, +/// A raw(untrusted) log. +pub struct RawLog { + log_id: RawLogId, + log_entry: Option>>, + log_store: Arc>, + tx_provider: Arc, + lazy_delete: Option>>, + append_pos: AtomicUsize, + can_append: bool, } -struct RawLogCreate { - tail: RawLogTail, - // Whether the log is deleted after being created in a TX. - is_deleted: bool, +struct RawLogRef<'a, D> { + log_store: &'a RawLogStore, + log_head: Option>, + log_tail: Option>, } -struct RawLogAppend { - tail: RawLogTail, - // Whether the log is deleted after being appended in a TX. - is_deleted: bool, +struct RawLogHeadRef<'a> { + entry: MutexGuard<'a, RawLogEntry>, } -struct RawLogTail { - // The last chunk of the head. If it is partially filled - // (head_last_chunk_free_blocks > 0), then the tail should write to the - // free blocks in the last chunk of the head. - head_last_chunk_id: ChunkId, - head_last_chunk_free_blocks: u16, - // The chunks allocated and owned by the tail - chunks: Vec, - // The total number of blocks in the tail, including the blocks written to - // the last chunk of head and those written to the chunks owned by the tail. - num_blocks: usize, +struct RawLogTailRef<'a> { + log_id: RawLogId, + current: CurrentTx<'a>, } -impl RawLogHead { - pub fn append(&mut self, tail: &mut RawLogTail) { - todo!() +impl BlockLog for RawLog { + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + fn read(&self, pos: BlockId, buf: BufMut) -> Result<()> { + let log_ref = self.as_ref(); + log_ref.read(pos, buf) } -} -impl State { - pub fn new(persistent: PersistentState) -> Self { - let next_log_id = persistent.max_log_id.map_or(0, |max| max + 1); - Self { - persistent, - write_set: BTreeSet::new(), - next_log_id, + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + fn append(&self, buf: BufRef) -> Result { + if !self.can_append { + return_errno_with_msg!(PermissionDenied, "raw log not in append mode"); } - } - pub fn alloc_log_id(&mut self) -> RawLogId { - let new_log_id = self.next_log_id; - self.next_log_id = self.next_log_id - .checked_add(1) - .expect("64-bit IDs won't be exhausted even though IDs are not recycled"); - new_log_id + let nblocks = buf.nblocks(); + let mut log_ref = self.as_ref(); + log_ref.append(buf)?; + + let pos = self.append_pos.fetch_add(nblocks, Ordering::Release); + Ok(pos) } - pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> { - let already_exists = self.write_set.insert(log_id); - if already_exists { - return Err(Error::AlreadyExists); - } - Ok() + fn flush(&self) -> Result<()> { + self.log_store.disk.flush() } - pub fn remove_from_write_set(&mut self, log_id: RawLogId) { - let is_removed = self.write_set.remove(&log_id); - debug_assert!(is_removed == false); + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + fn nblocks(&self) -> usize { + let log_ref = self.as_ref(); + log_ref.nblocks() } +} - pub fn find_log(&self, log_id: RawLogId) -> Option>> { - let log_table = &self.persistent.log_table; - log_table.get(&log_id).map(|entry| entry.clone()) +impl RawLog { + /// Gets the unique ID of raw log. + pub fn id(&self) -> RawLogId { + self.log_id } - pub fn commit(&mut self, change: &mut PersistentEdit, chunk_alloc: &ChunkAlloc) { - let mut all_changes = change.change_table.drain_filter(|| true); - for (log_id, change) in all_changes { - match change { - RawLogEdit::Create(create) => { - let RawLogCreate { tail, is_deleted } = create; - if is_deleted { - chunk_alloc.dealloc_batch(tail.chunks.iter()); - continue; + /// Gets the reference (handle) of raw log. + /// + /// # Panics + /// + /// This method must be called within a TX. Otherwise, this method panics. + fn as_ref<'a>(&'a self) -> RawLogRef { + let log_head = self.log_entry.as_ref().map(|entry| RawLogHeadRef { + entry: entry.lock(), + }); + let log_tail = { + // Check if the log exists create or append edit + let has_valid_edit = self.tx_provider.current().data_mut_with( + |store_edit: &mut RawLogStoreEdit| -> bool { + let Some(edit) = store_edit.edit_table.get(&self.log_id) else { + return false; + }; + match edit { + RawLogEdit::Create(_) | RawLogEdit::Append(_) => true, + RawLogEdit::Delete => false, } + }, + ); + if has_valid_edit { + Some(RawLogTailRef { + log_id: self.log_id, + current: self.tx_provider.current(), + }) + } else { + None + } + }; - self.create_log(log_id); - self.append_log(log_id, &mut tail); - } - RawLogEdit::Append(append) => { - let RawLogAppend { tail, is_deleted } = append; + RawLogRef { + log_store: &self.log_store, + log_head, + log_tail, + } + } +} + +impl Drop for RawLog { + fn drop(&mut self) { + if self.can_append { + let mut state = self.log_store.state.lock(); + state.remove_from_write_set(self.log_id); + } + } +} - // Even if a log is to be deleted, we will still commit - // its newly-appended data. +impl<'a, D: BlockSet> RawLogRef<'a, D> { + pub fn read(&self, mut pos: BlockId, mut buf: BufMut) -> Result<()> { + let mut nblocks = buf.nblocks(); + let mut buf_slice = buf.as_mut_slice(); - self.append_log(log_id, &mut tail); - } - RawLogEdit::Delete => { - self.delete_log(log_id); - } - } + let head_len = self.head_len(); + let tail_len = self.tail_len(); + let total_len = head_len + tail_len; + + if pos + nblocks > total_len { + return_errno_with_msg!(InvalidArgs, "do not allow short read"); + } + + let disk = &self.log_store.disk; + // Read from the head if possible and necessary + let head_opt = &self.log_head; + if let Some(head) = head_opt && pos < head_len { + let num_read = nblocks.min(head_len - pos); + + let read_buf = BufMut::try_from(&mut buf_slice[..num_read * BLOCK_SIZE])?; + head.read(pos, read_buf, &disk)?; + + pos += num_read; + nblocks -= num_read; + buf_slice = &mut buf_slice[num_read * BLOCK_SIZE..]; } + + // Read from the tail if possible and necessary + let tail_opt = &self.log_tail; + if let Some(tail) = tail_opt && pos >= head_len { + let num_read = nblocks.min(total_len - pos); + let read_buf = BufMut::try_from(&mut buf_slice[..num_read * BLOCK_SIZE])?; + + tail.read(pos - head_len, read_buf, &disk)?; + } + Ok(()) } - fn create_log(&mut self, new_log_id: RawLogId) { - let log_table = &mut self.persistent.log_table; - let new_log_entry = Arc::new(Mutex::new(RawLogEntry::new())); - let already_exists = log_table.insert(new_log_id, new_log_entry); - debug_assert!(already_exists == false); + pub fn append(&mut self, buf: BufRef) -> Result<()> { + let append_nblocks = buf.nblocks(); + let log_tail = self + .log_tail + .as_mut() + .expect("raw log must be opened in append mode"); + + // Allocate new chunks if necessary + let new_chunks = { + let chunks_needed = log_tail.calc_needed_chunks(append_nblocks); + let mut chunk_ids = Vec::with_capacity(chunks_needed); + for _ in 0..chunks_needed { + let new_chunk_id = self + .log_store + .chunk_alloc + .alloc() + .ok_or(Error::with_msg(OutOfMemory, "chunk allocation failed"))?; + chunk_ids.push(new_chunk_id); + } + chunk_ids + }; + log_tail.tail_mut_with(|tail: &mut RawLogTail| { + tail.chunks.extend(new_chunks); + }); + + log_tail.append(buf, &self.log_store.disk)?; + + // Update tail metadata + log_tail.tail_mut_with(|tail: &mut RawLogTail| { + tail.num_blocks += append_nblocks as u32; + }); + Ok(()) } - fn append_log(&mut self, log_id: RawLogId, tail: &RawLogTail) { - let log_table = &mut self.persistent.log_table; - let mut log_entry = log_table.get(&log_id).unwrap().lock(); - log_entry.head.append(tail); + pub fn nblocks(&self) -> usize { + self.head_len() + self.tail_len() } - fn delete_log(&mut self, log_id: RawLogId) { - let log_table = &mut self.persistent.log_table; - let Some(log_entry) = log_table.remove(&log_id) else { - return; - }; + fn head_len(&self) -> usize { + self.log_head.as_ref().map_or(0, |head| head.len()) + } - chunk_alloc.dealloc_batch(log_entry.head.chunks.iter()); + fn tail_len(&self) -> usize { + self.log_tail.as_ref().map_or(0, |tail| tail.len()) } } -/* -struct PersistentEdit { - change_table: BTreeMap, -} - */ -impl PersistentEdit { - pub fn create_log(&mut self, new_log_id: RawLogId) { - let new_log_chanage = RawLogEdit::Create(RawLogCreate::new()); - let existing_change = self.change_table.insert(new_log_id, new_log_change); - debug_assert!(existing_change.is_none()); + +impl<'a> RawLogHeadRef<'a> { + pub fn len(&self) -> usize { + self.entry.head.num_blocks as _ } - pub fn open_log(&mut self, log_id: RawLogId, log_entry: &LogEntry) -> Result<()> { - match self.change_table.get(&log_id) { - None => { - // insert an Append - } - Some(change) => { - // if change == create, no nothing - // if change == append, no nothing - // if change == delete, panic - } + pub fn read(&self, offset: BlockId, mut buf: BufMut, disk: &D) -> Result<()> { + let nblocks = buf.nblocks(); + debug_assert!(offset + nblocks <= self.entry.head.num_blocks as _); + + let prepared_blocks = self.prepare_blocks(offset, nblocks); + debug_assert_eq!(prepared_blocks.len(), nblocks); + + // TODO: Batch read + let mut iter_mut = buf.iter_mut(); + let mut i = 0; + while let Some(block_buf) = iter_mut.next() { + disk.read(prepared_blocks[i], block_buf)?; + i += 1; } - let new_log_chanage = RawLogEdit::Create(RawLogCreate::new()); - self.changes.insert(new_log_id, new_log_change); + + Ok(()) } - pub fn delete_log(&mut self, log_id: RawLogId) -> Option<_> { - match self.changes.get(&log_id) { - None => { - self.changes.insert(log_id, LogMetaChange::Delete); - } - Some(Create(_)) => { - let create = self.changes.insert(log_id, LogMetaChange::Delete); - let LogMetaChange::Create(parts) = create else { - panic!(""); - }; - self.free_parts(parts); - } - Some(Append(_)) => { - let append = self.changes.insert(log_id, LogMetaChange::Delete); - let LogMetaChange::Append(parts) = append else { - panic!(""); - }; - self.free_parts(parts); - } - Some(Delete) => { - return None; + /// Collect and prepare a set of blocks in head for read + pub fn prepare_blocks(&self, offset: BlockId, nblocks: usize) -> Vec { + let chunks = &self.entry.head.chunks; + let mut res_blocks = Vec::with_capacity(nblocks); + + let mut curr_chunk_idx = offset / CHUNK_NBLOCKS; + let mut curr_chunk_inner_offset = offset % CHUNK_NBLOCKS; + while res_blocks.len() != nblocks { + res_blocks.push(chunks[curr_chunk_idx] * CHUNK_NBLOCKS + curr_chunk_inner_offset); + curr_chunk_inner_offset += 1; + if curr_chunk_inner_offset == CHUNK_NBLOCKS { + curr_chunk_inner_offset = 0; + curr_chunk_idx += 1; } } + res_blocks } } +impl<'a> RawLogTailRef<'a> { + /// Apply given function to the immutable tail + pub fn tail_with(&self, f: F) -> R + where + F: FnOnce(&RawLogTail) -> R, + { + self.current.data_with(|store_edit: &RawLogStoreEdit| -> R { + let edit = store_edit.edit_table.get(&self.log_id).unwrap(); + match edit { + RawLogEdit::Create(create) => f(&create.tail), + RawLogEdit::Append(append) => f(&append.tail), + RawLogEdit::Delete => unreachable!(), + } + }) + } + /// Apply given function to the mutable tail + pub fn tail_mut_with(&mut self, f: F) -> R + where + F: FnOnce(&mut RawLogTail) -> R, + { + self.current + .data_mut_with(|store_edit: &mut RawLogStoreEdit| -> R { + let edit = store_edit.edit_table.get_mut(&self.log_id).unwrap(); + match edit { + RawLogEdit::Create(create) => f(&mut create.tail), + RawLogEdit::Append(append) => f(&mut append.tail), + RawLogEdit::Delete => unreachable!(), + } + }) + } -impl RawLogStore { - pub fn new( - disk: S, - tx_manager: Arc, - ) -> Self { - todo!() + pub fn len(&self) -> usize { + self.tail_with(|tail: &RawLogTail| tail.num_blocks as _) } -} -impl RawLogStoreInner { - pub fn create_log(&self) -> Result> { - let mut state = self.state.lock(); - let mut current_tx = self.tx_provider.current(); + pub fn read(&self, offset: BlockId, mut buf: BufMut, disk: &D) -> Result<()> { + let nblocks = buf.nblocks(); + let tail_nblocks = self.len(); + debug_assert!(offset + nblocks <= tail_nblocks); - let tx_id = current_tx.id(); - let new_log_id = state.alloc_log_id(); - current_tx.data_mut_with(|change: &mut PersistentEdit| { - change.create_log(new_log_id); - }); + let prepared_blocks = self.prepare_blocks(offset, nblocks); + debug_assert_eq!(prepared_blocks.len(), nblocks); - Ok(RawLog { - log_id: new_log_id, - tx_id, - log_store: self.weak_self.upgrade(), - log_entry: None, - can_write: false, // can_append - }) + // TODO: Batch read + let mut iter_mut = buf.iter_mut(); + let mut i = 0; + while let Some(block_buf) = iter_mut.next() { + disk.read(prepared_blocks[i], block_buf)?; + i += 1; + } + + Ok(()) } - pub fn open_log(&self, log_id: RawLogId, can_write: bool) -> Result> { - let mut state = self.state.lock(); - let mut current_tx = self.tx_provider.current(); + pub fn append(&self, buf: BufRef, disk: &D) -> Result<()> { + let nblocks = buf.nblocks(); - let log_entry = state.find_log(log_id); - // The log is already created by other Tx - if log_entry.is_some() { - // Prevent other TX from opening this log in the write mode. - if can_write { - state.add_to_write_set(log_id)?; - } - } else { - // The log must has been created by this Tx - let not_created = current_tx.data_mut_with(|change: &mut PersistentEdit| { - change.is_log_created(log_id) - }); - if !not_created { - return Err(Error::NotFound); - } - } + let prepared_blocks = self.prepare_blocks(self.len() as _, nblocks); + debug_assert_eq!(prepared_blocks.len(), nblocks); - // If the log is open in the write mode, change must be prepared - if can_write { - current_tx.data_mut_with(|change: &mut PersistentEdit| { - change.open_log(log_id, &*log_entry.lock()); - }); + // TODO: Batch write + for (i, block_buf) in buf.iter().enumerate() { + disk.write(prepared_blocks[i], block_buf)?; } - let tx_id = current_tx.id(); - Ok(RawLog { - log_id, - tx_id, - log_store: self.weak_self.upgrade(), - log_entry, - can_write, + Ok(()) + } + + // Calculate how many new chunks we need for an append request + pub fn calc_needed_chunks(&self, append_nblocks: usize) -> usize { + self.tail_with(|tail: &RawLogTail| { + let avail_blocks = tail.head_last_chunk_free_blocks as usize + + tail.chunks.len() * CHUNK_NBLOCKS + - tail.num_blocks as usize; + if append_nblocks > avail_blocks { + align_up(append_nblocks - avail_blocks, CHUNK_NBLOCKS) / CHUNK_NBLOCKS + } else { + 0 + } }) } - pub fn delete_log(&self, log_id: RawLogId) -> Result<()> { - let mut current_tx = self.tx_provider.current(); - current_tx.data_mut_with(|change: &mut PersistentEdit| { - change.delete_log(log_id); + /// Collect and prepare a set of blocks in tail for read/append + fn prepare_blocks(&self, offset: BlockId, nblocks: usize) -> Vec { + self.tail_with(|tail: &RawLogTail| { + let mut res_blocks = Vec::with_capacity(nblocks); + let mut step = 0; + if offset < tail.head_last_chunk_free_blocks as _ { + for i in 0..tail.head_last_chunk_free_blocks { + // Collect available blocks from the last chunk of the head first if necessary + if res_blocks.len() == nblocks { + debug_assert_eq!(step, offset + nblocks); + return res_blocks; + } + if step >= offset && step - offset < tail.head_last_chunk_free_blocks as _ { + let avail_chunk = tail.head_last_chunk_id * CHUNK_NBLOCKS + + (CHUNK_NBLOCKS - tail.head_last_chunk_free_blocks as usize + + i as usize); + res_blocks.push(avail_chunk); + } + step += 1; + } + } + + // Collect available blocks from the tail first if necessary + let mut curr_chunk_idx = 0; + let mut curr_chunk_inner_offset = 0; + let chunks = &tail.chunks; + while res_blocks.len() != nblocks { + if step >= offset { + res_blocks + .push(chunks[curr_chunk_idx] * CHUNK_NBLOCKS + curr_chunk_inner_offset); + } + step += 1; + curr_chunk_inner_offset += 1; + if curr_chunk_inner_offset == CHUNK_NBLOCKS { + curr_chunk_inner_offset = 0; + curr_chunk_idx += 1; + } + } + + debug_assert_eq!(step, offset + nblocks); + res_blocks }) } } -pub struct RawLog { - log_id: RawLogId, - tx_id: TxId, - log_store: Arc>, - log_entry: Option>>, - can_write: bool, +unsafe impl Send for RawLogStore {} +unsafe impl Sync for RawLogStore {} + +//////////////////////////////////////////////////////////////////////////////// +// Persistent State +//////////////////////////////////////////////////////////////////////////////// + +/// The volatile and persistent state of a `RawLogStore`. +struct State { + persistent: RawLogStoreState, + write_set: BTreeSet, + lazy_deletes: BTreeMap>>, } -impl Drop for RawLog { - fn drop(&mut self) { - if self.can_write() { - let mut state = self.log_store.state.lock(); - state.remove_from_write_set(self.log_id); - } - } +/// The persistent state of a `RawLogStore`. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RawLogStoreState { + log_table: BTreeMap, + next_free_log_id: u64, } -impl BlockLog for RawLog { - fn read(&self, mut pos: BlockId, mut buf: &mut impl BlockBuf) -> Result<()> { - debug_assert!(buf.len() > 0); +/// A log entry implies the persistent state of the raw log. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(super) struct RawLogEntry { + head: RawLogHead, +} - let head_opt = self.head(); - let tail_opt = self.tail(); - let head_len = head_opt.map_or(0, |head| head.len()); - let tail_len = tail_opt.map_or(0, |tail| tail.len()); - let total_len = head_len + tail_len; +/// A log head contains chunk metadata of a log's already-persist data. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(super) struct RawLogHead { + pub chunks: Vec, + pub num_blocks: u32, +} - // Do not allow "short read" - if pos + buf.len() > total_len { - return Err(EINVAL); +impl State { + pub fn new( + persistent: RawLogStoreState, + lazy_deletes: BTreeMap>>, + ) -> Self { + Self { + persistent: persistent.clone(), + write_set: BTreeSet::new(), + lazy_deletes, } + } - // Read from the head if possible and necessary - if let Some(head) = head_opt && pos < head_len { - let read_len = buf.len().min(head_len - pos); - head.read(pos, &mut buf[..read_len])?; + pub fn apply(&mut self, edit: &RawLogStoreEdit) { + edit.apply_to(&mut self.persistent); + } - pos += read_len; - buf = &mut buf[read_len..]; - } - // Read from the tail if possible and necessary - if let Some(tail) = tail_opt && pos >= head_len { - let read_len = buf.len().min(total_len - pos); - tail.read(pos, &mut buf[..read_len])?; + pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> { + let not_exists = self.write_set.insert(log_id); + if !not_exists { + // Obey single-writer rule + return_errno_with_msg!(PermissionDenied, "the raw log has more than one writer"); } Ok(()) } - fn append(&self, buf: &impl BlockBuf) -> Result { - debug_assert!(buf.len() > 0); + pub fn remove_from_write_set(&mut self, log_id: RawLogId) { + let _is_removed = self.write_set.remove(&log_id); + // `_is_removed` may equal to `false` if the log has already been deleted + } +} - if !self.can_write { - return Err(EPERM); +impl RawLogStoreState { + pub fn new() -> Self { + Self { + log_table: BTreeMap::new(), + next_free_log_id: 0, } + } - self.tail().unwrap().append(buf) + pub fn alloc_log_id(&mut self) -> u64 { + let new_log_id = self.next_free_log_id; + self.next_free_log_id = self + .next_free_log_id + .checked_add(1) + .expect("64-bit IDs won't be exhausted even though IDs are not recycled"); + new_log_id } - fn flush(&self) -> Result<()> { - self.log_store.disk.flush() + pub(super) fn find_log(&self, log_id: u64) -> Option { + self.log_table.get(&log_id).cloned() + } + + pub fn create_log(&mut self, new_log_id: u64) { + let new_log_entry = RawLogEntry { + head: RawLogHead::new(), + }; + let already_exists = self.log_table.insert(new_log_id, new_log_entry).is_some(); + debug_assert_eq!(already_exists, false); + } + + pub(super) fn append_log(&mut self, log_id: u64, tail: &RawLogTail) { + let log_entry = self.log_table.get_mut(&log_id).unwrap(); + log_entry.head.append(tail); } - fn num_blocks(&self) -> usize { - let head_opt = self.head(); - let tail_opt = self.tail(); - let head_len = head_opt.map_or(0, |head| head.len()); - let tail_len = tail_opt.map_or(0, |tail| tail.len()); - head_len + tail_len; + pub fn delete_log(&mut self, log_id: u64) { + let _ = self.log_table.remove(&log_id); + // Leave chunk deallocation to lazy delete } +} - fn head(&self) -> Option> { - +impl RawLogHead { + pub fn new() -> Self { + Self { + chunks: Vec::new(), + num_blocks: 0, + } } - fn tail(&self) -> Option> { - todo!() + pub fn append(&mut self, tail: &RawLogTail) { + // Update head + self.chunks.extend(tail.chunks.iter()); + self.num_blocks += tail.num_blocks; + // No need to update tail } } -struct RawLogHeadRef<'a, S> { - log_store: &'a RawLogStoreInner, - log_entry: MutexGuard<'a, RawLogEntry>, +//////////////////////////////////////////////////////////////////////////////// +// Persistent Edit +//////////////////////////////////////////////////////////////////////////////// + +/// A persistent edit to the state of `RawLogStore`. +#[derive(Clone, Serialize, Deserialize)] +pub struct RawLogStoreEdit { + edit_table: BTreeMap, +} + +/// The basic unit of a persistent edit to the state of `RawLogStore`. +#[derive(Clone, Serialize, Deserialize)] +pub(super) enum RawLogEdit { + Create(RawLogCreate), + Append(RawLogAppend), + Delete, } -struct RawLogTailRef<'a, S> { - log_store: &'a RawLogStoreInner, - log_tail: &'a mut RawLogTail, +/// An edit that implies a log being created. +#[derive(Clone, Serialize, Deserialize)] +pub(super) struct RawLogCreate { + tail: RawLogTail, } -struct RawLogRef<'a, S> { - log_store: &'a RawLogStoreInner, - log_entry: MutexGuard<'a, RawLogEntry>, - log_tail: &'a mut RawLogTail, +/// An edit that implies an existing log being appended. +#[derive(Clone, Serialize, Deserialize)] +pub(super) struct RawLogAppend { + tail: RawLogTail, } -impl<'a, S> RawLogRef<'a, S> { - fn read(&self, mut log_pos: BlockId, mut buf: &mut impl BlockBuf) -> Result<()> { +/// A log tail contains chunk metadata of a log's TX-ongoing data. +#[derive(Clone, Serialize, Deserialize)] +pub(super) struct RawLogTail { + // The last chunk of the head. If it is partially filled + // (head_last_chunk_free_blocks > 0), then the tail should write to the + // free blocks in the last chunk of the head. + head_last_chunk_id: ChunkId, + head_last_chunk_free_blocks: u16, + // The chunks allocated and owned by the tail + chunks: Vec, + // The total number of blocks in the tail, including the blocks written to + // the last chunk of head and those written to the chunks owned by the tail. + num_blocks: u32, +} +impl RawLogStoreEdit { + /// Creates a new empty edit table. + pub fn new() -> Self { + Self { + edit_table: BTreeMap::new(), + } } - fn append(&mut self, buf: &impl BlockBuf) -> Result<()> { + /// Records a log creation in the edit. + pub fn create_log(&mut self, new_log_id: RawLogId) { + let create_edit = RawLogEdit::Create(RawLogCreate::new()); + let edit_exists = self.edit_table.insert(new_log_id, create_edit); + debug_assert!(edit_exists.is_none()); } - pub fn len(&self) -> usize { - self.log_entry.num_blocks + self.log_tail.len() - } - - /// Get a number of consecutive blocks starting at a specified position. - /// - /// The start of the returned range is always equal to `pos`. - /// The length of the returned range is not greater than `max_len`. - /// - /// If `pos` is smaller than the length of the raw log, - /// then the returned range is non-empty. Otherwise, the returned range - /// is empty. - fn get_consecutive(&self, pos: BlockId, max_len: usize) -> Range { - todo!() - } - - /// Extend the length of `ChunkBackedBlockVec` by allocating some - /// consecutive blocks. - /// - /// The newly-allocated consecutive blocks are returned as a block range, - /// whose length is not greater than `max_len`. If the allocation fails, - /// then a zero-length range shall be returned. - fn extend_consecutive(&mut self, max_len: usize) -> Option> { - todo!() + /// Records a log being opened in the edit. + pub(super) fn open_log(&mut self, log_id: RawLogId, log_entry: &RawLogEntry) { + match self.edit_table.get(&log_id) { + None => { + // Insert an append edit + let tail = RawLogTail::from_head(&log_entry.head); + let append_edit = RawLogEdit::Append(RawLogAppend { tail }); + let edit_exists = self.edit_table.insert(log_id, append_edit); + debug_assert!(edit_exists.is_none()); + } + Some(edit) => { + // If edit == create, unreachable: there can't be a persistent log entry + // when the log is just created in an ongoing TX + if let RawLogEdit::Create(_) = edit { + unreachable!(); + } + // If edit == append, do nothing + // If edit == delete, panic + if let RawLogEdit::Delete = edit { + panic!("try to open a deleted log!"); + } + } + } } -} + /// Records a log deletion in the edit, returns the tail chunks of the deleted log. + pub fn delete_log(&mut self, log_id: RawLogId) -> Option> { + match self.edit_table.insert(log_id, RawLogEdit::Delete) { + None => None, + Some(RawLogEdit::Create(create)) => { + // No need to panic in create + Some(create.tail.chunks.clone()) + } + Some(RawLogEdit::Append(append)) => { + // No need to panic in append (WAL case) + Some(append.tail.chunks.clone()) + } + Some(RawLogEdit::Delete) => panic!("try to delete a deleted log!"), + } + } -impl BlockLog for RawLog { - fn read_len(&self) -> usize { + pub fn is_log_created(&self, log_id: RawLogId) -> bool { + match self.edit_table.get(&log_id) { + Some(RawLogEdit::Create(_)) | Some(RawLogEdit::Append(_)) => true, + Some(RawLogEdit::Delete) | None => false, + } + } + pub fn iter_created_logs(&self) -> impl Iterator + '_ { + self.edit_table + .iter() + .filter(|(_, edit)| { + if let RawLogEdit::Create(_) = edit { + true + } else { + false + } + }) + .map(|(id, _)| *id) } -} -struct PersistentState { - chunk_logs: BTreeMap, - max_log_id: Option, + pub fn iter_deleted_logs(&self) -> impl Iterator + '_ { + self.edit_table + .iter() + .filter(|(_, edit)| { + if let RawLogEdit::Delete = edit { + true + } else { + false + } + }) + .map(|(id, _)| *id) + } } -struct PersistentEdit { - changed_logs: BTreeMap, - max_log_id: Option, -} +impl Edit for RawLogStoreEdit { + fn apply_to(&self, state: &mut RawLogStoreState) { + for (&log_id, log_edit) in self.edit_table.iter() { + match log_edit { + RawLogEdit::Create(create) => { + let RawLogCreate { tail } = create; + state.create_log(log_id); + state.append_log(log_id, tail); -struct RawLogState { - blocks: Arc, - len: usize, - is_deleted: bool, + // Journal's state also needs to be updated + if state.next_free_log_id <= log_id { + let _ = state.alloc_log_id(); + } + } + RawLogEdit::Append(append) => { + let RawLogAppend { tail } = append; + state.append_log(log_id, tail); + } + RawLogEdit::Delete => { + state.delete_log(log_id); + } + } + } + } } -// What's the expected behaviors of create, open, delete, read, write of -// raw logs in Tx? -// -// 1. The local changes made (e.g., creations, deletions, writes) in a Tx are -// immediately visible to the Tx, but not other Tx until the Tx is committed. -// For example, a newly-created log within Tx A is immediately usable within Tx, -// but becomes visible to other Tx only until A is committed. -// As another example, when a log is deleted within a Tx, then the Tx can no -// longer open the log. But other concurrent Tx can still open the log. -// -// 2. If a Tx is aborted, then all the local changes made in the TX will be -// discarded. -// -// 3. At any given time, a log can have at most one writer Tx. -// A Tx becomes the writer of a log when the log is opened with the write -// permission in the Tx. And it stops being the writer Tx of the log only when -// the Tx is terminated (not when the log is closed within Tx). -// This single-writer rule avoids potential conflicts between concurrent -// writing to the same log. -// -// 4. Log creation does not conflict with log deleation, read, or write as -// every newly-created log is assigned a unique ID automatically. -// -// 4. Deleting a log does not affect any opened instance of the log in the Tx -// or other Tx (similar to deleting a file in a UNIX-style system). -// It is only until the deleting Tx is committed and the last -// instance of the log is closed shall the log be deleted and its disk space -// be freed. -// -// 5. The Tx commitment will not fail due to conflicts between concurrent -// operations in different Tx. - -// How to create? -// -// 1. - -// * State and Change are quite similar. -// * Change must be self-contained. - -struct RawLog { - meta: Arc>, +impl RawLogCreate { + pub fn new() -> Self { + Self { + tail: RawLogTail::new(), + } + } } -impl Drop for RawLog { - fn drop(&mut self) { - let mut meta = self.meta.lock(); - meta.delete_count +impl RawLogTail { + pub fn new() -> Self { + Self { + head_last_chunk_id: 0, + head_last_chunk_free_blocks: 0, + chunks: Vec::new(), + num_blocks: 0, + } + } + + pub fn from_head(head: &RawLogHead) -> Self { + Self { + head_last_chunk_id: *head.chunks.last().unwrap_or(&0), + head_last_chunk_free_blocks: (head.chunks.len() * CHUNK_NBLOCKS + - head.num_blocks as usize) as _, + chunks: Vec::new(), + num_blocks: 0, + } } } +impl TxData for RawLogStoreEdit {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::layers::{ + bio::{Buf, MemDisk}, + log::chunk::{CHUNK_NBLOCKS, CHUNK_SIZE}, + }; + + use std::thread::{self, JoinHandle}; + + fn create_raw_log_store() -> Result>> { + let nchunks = 8; + let nblocks = nchunks * CHUNK_NBLOCKS; + let tx_provider = TxProvider::new(); + let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone()); + let mem_disk = MemDisk::create(nblocks)?; + Ok(RawLogStore::new(mem_disk, tx_provider, chunk_alloc)) + } + fn find_persistent_log_entry( + log_store: &Arc>, + log_id: RawLogId, + ) -> Option { + let state = log_store.state.lock(); + state.persistent.find_log(log_id) + } + #[test] + fn raw_log_store_fns() -> Result<()> { + let raw_log_store = create_raw_log_store()?; + + // TX 1: create a new log and append contents (committed) + let mut tx = raw_log_store.new_tx(); + let res: Result = tx.context(|| { + let new_log = raw_log_store.create_log()?; + let mut buf = Buf::alloc(4)?; + buf.as_mut_slice().fill(2u8); + new_log.append(buf.as_ref())?; + assert_eq!(new_log.nblocks(), 4); + Ok(new_log.id()) + }); + let log_id = res?; + tx.commit()?; -impl PersistentState { - pub fn commit(&mut self, change: &mut PersistentEdit) { + let entry = find_persistent_log_entry(&raw_log_store, log_id).unwrap(); + assert_eq!(entry.head.num_blocks, 4); - } + // TX 2: open the log, append contents then read (committed) + let mut tx = raw_log_store.new_tx(); + let res: Result<_> = tx.context(|| { + let log = raw_log_store.open_log(log_id, true)?; - pub fn uncommit(&mut self, change: &mut PersistentEdit) { + let mut buf = Buf::alloc(CHUNK_NBLOCKS)?; + buf.as_mut_slice().fill(5u8); + log.append(buf.as_ref())?; - } -} + Ok(()) + }); + res?; -struct RawLog { - id: RawLogId, + let res: Result<_> = tx.context(|| { + let log = raw_log_store.open_log(log_id, true)?; -} + let mut buf = Buf::alloc(CHUNK_NBLOCKS)?; + log.read(1, buf.as_mut())?; + assert_eq!(&buf.as_slice()[..3 * BLOCK_SIZE], &[2u8; 3 * BLOCK_SIZE]); + assert_eq!( + &buf.as_slice()[3 * BLOCK_SIZE..CHUNK_SIZE], + &[5u8; 1021 * BLOCK_SIZE] + ); + Ok(()) + }); + res?; + tx.commit()?; + + let entry = find_persistent_log_entry(&raw_log_store, log_id).unwrap(); + assert_eq!(entry.head.num_blocks, 1028); + + // TX 3: delete the log (committed) + let mut tx = raw_log_store.new_tx(); + let res: Result<_> = tx.context(|| raw_log_store.delete_log(log_id)); + res?; + tx.commit()?; + + let entry_opt = find_persistent_log_entry(&raw_log_store, log_id); + assert!(entry_opt.is_none()); + + // TX 4: create a new log (aborted) + let mut tx = raw_log_store.new_tx(); + let res: Result<_> = tx.context(|| { + let new_log = raw_log_store.create_log()?; + Ok(new_log.id()) + }); + let new_log_id = res?; + tx.abort(); + let entry_opt = find_persistent_log_entry(&raw_log_store, new_log_id); + assert!(entry_opt.is_none()); -// How to delete? -// 1. Mark the log deleted in Change. so that the log -// cannot be opened again within the transaction. -// 2. When commit, mark the log deleted in State. This way, -// other transactions that are still using the log can continue -// use the log. But future transactions are not allowed to open -// the deleted log. -// 3. When the last active instance of the deleted log (which -// could be in Change or State) is dropped -// (wh), -// then + Ok(()) + } -// A log joins a Tx when it is opened. And leaves the Tx -// when the Tx ends. -// If the log joins the Tx with writable mode, then -// no other Tx can open it. \ No newline at end of file + #[test] + fn raw_log_deletion() -> Result<()> { + let raw_log_store = create_raw_log_store()?; + + // Create a new log and append contents + let mut tx = raw_log_store.new_tx(); + let content = 5_u8; + let res: Result<_> = tx.context(|| { + let new_log = raw_log_store.create_log()?; + let mut buf = Buf::alloc(1)?; + buf.as_mut_slice().fill(content); + new_log.append(buf.as_ref())?; + Ok(new_log.id()) + }); + let log_id = res?; + tx.commit()?; + + // Concurrently open, read then delete the log + let handlers = (0..16) + .map(|_| { + let raw_log_store = raw_log_store.clone(); + thread::spawn(move || -> Result<()> { + let mut tx = raw_log_store.new_tx(); + println!( + "TX[{:?}] executed on thread[{:?}]", + tx.id(), + crate::os::CurrentThread::id() + ); + let _ = tx.context(|| { + let log = raw_log_store.open_log(log_id, false)?; + let mut buf = Buf::alloc(1)?; + log.read(0 as BlockId, buf.as_mut())?; + assert_eq!(buf.as_slice(), &[content; BLOCK_SIZE]); + raw_log_store.delete_log(log_id) + }); + tx.commit() + }) + }) + .collect::>>>(); + for handler in handlers { + handler.join().unwrap()?; + } + + // The log has already been deleted + let mut tx = raw_log_store.new_tx(); + let _ = tx.context(|| { + let res = raw_log_store.open_log(log_id, false).map(|_| ()); + res.expect_err("result must be NotFound"); + }); + tx.commit() + } +} diff --git a/src/layers/3-log/tx_log.rs b/src/layers/3-log/tx_log.rs index a027441..640dfd9 100644 --- a/src/layers/3-log/tx_log.rs +++ b/src/layers/3-log/tx_log.rs @@ -1,374 +1,335 @@ -use self::journaling::{Journal, JournalCompactPolicy}; -use super::chunk::{ChunkAlloc, ChunkAllocEdit}; -use super::raw_log::{RawLog, RawLogId, RawLogStore}; -use crate::layers::bio::{BlockBuf, BlockId, BlockSet}; -use crate::layers::crypto::{CryptoLog, RootMhtMeta}; -use crate::layers::edit::{Edit, EditJournalMeta}; -use crate::tx::{CurrentTx, Tx, TxId, TxProvider}; -use crate::util::LazyDelete; - -use spin::Mutex; -use std::collections::{BTreeMap, BTreeSet}; -use std::sync::Arc; +//! A store of transactional logs. +use self::journaling::{AllEdit, AllState, Journal, JournalCompactPolicy}; +use super::chunk::{ChunkAlloc, ChunkAllocEdit, ChunkAllocState}; +use super::raw_log::{RawLog, RawLogId, RawLogStore, RawLogStoreEdit, RawLogStoreState}; +use crate::layers::bio::{BlockId, BlockSet, Buf, BufMut, BufRef}; +use crate::layers::crypto::{CryptoLog, NodeCache, RootMhtMeta}; +use crate::layers::edit::{CompactPolicy, Edit, EditJournal, EditJournalMeta}; +use crate::layers::log::chunk::CHUNK_NBLOCKS; +use crate::os::{AeadKey as Key, Mutex, RwLock, Skcipher, SkcipherIv, SkcipherKey}; +use crate::prelude::*; +use crate::tx::{CurrentTx, Tx, TxData, TxId, TxProvider}; +use crate::util::{LazyDelete, RandomInit}; + +use alloc::collections::{BTreeMap, BTreeSet}; // TODO: Find alternatives to adapt 'rust-for-linux' +use core::any::Any; +use core::sync::atomic::{AtomicBool, Ordering}; +use lru::LruCache; +use pod::Pod; +use serde::{Deserialize, Serialize}; + +pub type TxLogId = RawLogId; +type BucketName = String; /// A store of transactional logs. +/// +/// Disk layout: +/// ```text +/// ------------------------------------------------------ +/// | Superblock | RawLogStore region | Journal region | +/// ------------------------------------------------------ +/// ``` +#[derive(Clone)] pub struct TxLogStore { state: Arc>, key: Key, raw_log_store: Arc>, - journal: Arc>, + journal: Arc>>, tx_provider: Arc, } -pub type TxLogId = RawLogId; - -struct Superblock { +/// Superblock of `TxLogStore`. +#[repr(C)] +#[derive(Clone, Copy, Pod, Debug)] +pub struct Superblock { journal_area_meta: EditJournalMeta, chunk_area_nblocks: usize, } -impl Superblock { - /// Returns the total number of blocks occupied by the `TxLogStore`. - pub fn total_nblocks(&self) -> usize { - self.journal_area_meta.total_nblocks() + self.chunk_area_nblocks - } -} - -/// The volatile and persistent state of a `TxLogStore`. -struct State { - persistent: TxLogStoreState, - log_table: BTreeMap>>, -} - -/// The persistent state of a `TxLogStore`. -struct TxLogStoreState { - log_table: BTreeMap, - bucket_table: BTreeMap, -} - -struct TxLogEntry { - bucket: BucketName, - key: Key, - root_mht: RootMhtMeta, -} - -type BucketName = Arc; - -struct Bucket { - log_ids: BTreeSet, -} - -struct TxLogStoreEdit { - edit_table: BTreeMap, -} - -enum TxLogEdit { - Create(TxLogCreate), - Append(TxLogAppend), - Delete, -} - -struct TxLogCreate { - bucket: BucketName, - key: Key, - root_mht: Option, -} - -struct TxLogAppend { - root_mht: RootMhtMeta, -} - -// Used for per-tx data, track open logs in memory -struct OpenLogTable { - log_table: BTreeMap>>, -} +impl TxLogStore { + /// Formats the disk to create a new instance of `TxLogStore`. + /// + /// Each instance will be assigned a unique, automatically-generated root + /// key. + pub fn format(disk: D) -> Result { + let total_nblocks = disk.nblocks(); + let (log_store_nblocks, journal_nblocks) = + Self::calc_store_and_journal_nblocks(total_nblocks); + let log_store_area = disk.subset(1..1 + log_store_nblocks)?; + let journal_area = + disk.subset(1 + log_store_nblocks..1 + log_store_nblocks + journal_nblocks)?; -/* -struct State { - persistent: TxLogStoreState, - log_table: BTreeMap>>, -} - */ -impl State { - pub fn new(persistent: TxLogStoreState, raw_log_store: &Arc>) -> Self { - todo!("init LazyDelete") - } + let tx_provider = TxProvider::new(); - pub fn apply(&mut self, edit: &TxLogStoreEdit) { - edit.apply_to(&mut self.persistent); // apply edit to self.persistent + let nchunks = log_store_nblocks / CHUNK_NBLOCKS; + let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone()); + let raw_log_store = RawLogStore::new(log_store_area, tx_provider.clone(), chunk_alloc); + let tx_log_store_state = TxLogStoreState::new(); - // Do lazy deletion of logs - let deleted_log_ids = edit.edit_table.iter_deleted_logs(); - for deleted_log_id in deleted_log_ids { - let Some(lazy_delete) = self.log_table.remove(deleted_log_id) else { - // Other concurrent TXs have deleted the same log - continue; + let journal = { + let all_state = AllState { + chunk_alloc: ChunkAllocState::new(nchunks), + raw_log_store: RawLogStoreState::new(), + tx_log_store: tx_log_store_state.clone(), }; - LazyDelete::delete(&lazy_delete); - } - } -} - -impl Edit for TxLogStoreEdit { - fn apply_to(&self, state: &mut TxLogStoreState) { - for (log_id, log_edit) in &self.edit_table { - match log_edit { - TxLogEdit::Create(create_log) => { - let TxLogCreate { - bucket, - key, - root_mht, - .. - } = create_log; - state.create_log(log_id, bucket, key, root_mht.unwrap()); - } - TxLogEdit::Append(append_log) => { - let TxLogAppend { root_mht, .. } = append_log; - state.append_log(root_mht); - } - TxLogEdit::Delete => { - state.delete_log(log_id); - } - } - } - } -} - -/* -struct TxLogStoreState { - log_table: BTreeMap, - bucket_table: BTreeMap, -} - */ -impl TxLogStoreState { - pub fn create_log( - &mut self, - new_log_id: TxLogId, - bucket: BucketName, - key: Key, - root_mht: RootMhtMeta, - ) { - todo!() - } - - pub fn append_log(&mut self, root_mht: RootMhtMeta) { - todo!() - } - - pub fn list_logs(&self, bucket_name: &str) -> Result> { - let bucket = self.bucket_table.get(bucket_name).ok_or(Error::NotFound)?; - Ok(bucket.log_ids.clone()); - } - - pub fn find_log(&self, log_id: TxLogId) -> Result<&TxLogEntry> { - self.log_table.get(&log_id).ok_or(Error::NotFound) - } - - pub fn contains_log(&self, log_id: TxLogId) -> bool { - self.log_table.contains(&log_id) - } - - pub fn delete_log(&mut self, log_id: TxLogId) { - // Do not check the result because concurrent TXs may decide to delete - // the same logs. - let _ = self.log_table.remove(&log_id); - } -} - -/* -struct TxLogStoreEdit { - edit_table: BTreeMap, -} - */ -impl TxLogStoreEdit { - pub fn is_log_deleted(&self, log_id: TxLogId) -> bool { - todo!() - } - - pub fn is_log_created(&self, log_id: TxLogId) -> bool { - todo!() - } - - pub fn delete_log(&mut self, log_id: TxLogId) -> Result<()> { - match self.edit_table.get_mut(&log_id) { - None => { - self.edit_table.insert(log_id, TxLogEdit::Delete); - } - Some(TxLogEdit::Create(_)) => { - self.edit_table.remove(&log_id); - } - Some(TxLogEdit::Append(_)) => { - panic!("TxLogEdit::Append is added at very late stage, after which logs won't get deleted"); - } - Some(TxLogEdit::Delete) => { - return Err(Error::NotFound); - } - } - Ok(()) - } + Arc::new(Mutex::new(Journal::format( + journal_area, + all_state, + 16384, // TBD + JournalCompactPolicy {}, + )?)) + }; - pub fn iter_deleted_logs(&self) -> impl Iterator { - self.edit_table.iter - } + let superblock = Superblock { + journal_area_meta: journal.lock().meta(), + chunk_area_nblocks: log_store_nblocks, + }; + let key = Key::random(); + superblock.persist(&disk, &key)?; - pub fn update_log_metas(&mut self, iter: I) - where - I: Iterator, - { - // For newly-created logs, update RootMhtMeta - // For existing logs that are appended, add TxLogAppend - todo!() + Ok(Self::from_parts( + tx_log_store_state, + key, + raw_log_store, + journal, + tx_provider, + )) } -} -impl TxLogStore { - /// Format the disk to create a new instance of `TxLogStore`. - /// - /// Each instance will be assigned a unique, automatically-generated root - /// key. - pub fn format(disk: D) -> Result { - todo!() + fn calc_store_and_journal_nblocks(total_nblocks: usize) -> (usize, usize) { + let log_store_nblocks = { + let nblocks = (total_nblocks - 1) * 9 / 10; + align_down(nblocks, CHUNK_NBLOCKS) + }; + let journal_nblocks = total_nblocks - 1 - log_store_nblocks; + debug_assert!(1 + log_store_nblocks + journal_nblocks <= total_nblocks); + (log_store_nblocks, journal_nblocks) } - /// Recover an existing `TxLogStore` from a disk using the given key. + /// Recovers an existing `TxLogStore` from a disk using the given key. pub fn recover(disk: D, key: Key) -> Result { - let sb = Superblock::open(disk.subset(0..1), key)?; - - if disk.nblocks() < sb.required_nblocks() { - return Err(Error::NotEnoughSpace); + let superblock = Superblock::open(&disk.subset(0..1)?, &key)?; + if disk.nblocks() < superblock.total_nblocks() { + return_errno_with_msg!(OutOfDisk, "given disk lacks space for recovering"); } let tx_provider = TxProvider::new(); let journal = { - let journal_area_size = sb.journal_area_meta.total_nblocks(); - let journal_area = disk.subset(1..1 + journal_area_size); - let journal = Journal::recover( - journal_area, - &sb.journal_area_meta, - JournalCompactPolicy::new(), + let journal_area_meta = &superblock.journal_area_meta; + let journal_area = disk.subset( + 1 + superblock.chunk_area_nblocks + ..1 + superblock.chunk_area_nblocks + journal_area_meta.total_nblocks(), )?; - let journal = Arc::new(Mutex::new(journal)); - - tx_provider.register_commit_handler({ - let journal = journal.clone(); - |current| { - let mut journal = journal.lock(); - current.data_with(|edit: &ChunkAllocEdit| { - journal.add(edit); - }); - current.data_with(|edit: &RawLogStoreEdit| { - journal.add(edit); - }); - current.data_with(|edit: &TxLogStoreEdit| { - journal.add(edit); - }); - journal.commit(); - } - }); - - journal + Journal::recover(journal_area, &journal_area_meta, JournalCompactPolicy {})? }; + let all_state = journal.state(); - let state = journal.state(); - let chunk_alloc = ChunkAlloc::from_parts(state.chunk_alloc.clone(), tx_provider.clone()); - let chunk_area = - disk.subset(1 + journal_area_size..1 + journal_area_size + sb.chunk_area_nblocks); - let raw_log_store: RawLogStore = RawLogStore::from_parts( - state.raw_log_store.clone(), + let chunk_alloc = + ChunkAlloc::from_parts(all_state.chunk_alloc.clone(), tx_provider.clone()); + let chunk_area = disk.subset(1..1 + superblock.chunk_area_nblocks)?; + let raw_log_store = RawLogStore::from_parts( + all_state.raw_log_store.clone(), chunk_area, chunk_alloc, tx_provider.clone(), ); let tx_log_store = TxLogStore::from_parts( - state.tx_log_store.clone(), + all_state.tx_log_store.clone(), key, raw_log_store, - journal, + Arc::new(Mutex::new(journal)), tx_provider, ); + Ok(tx_log_store) } fn from_parts( state: TxLogStoreState, key: Key, - raw_log_store: RawLogStore, - journal: Journal, + raw_log_store: Arc>, + journal: Arc>>, tx_provider: Arc, ) -> Self { - let new_self = Self { - state: Arc::new(Mutex::new(state)), - key, - raw_log_store, - journal, - tx_provider, + let new_self = { + // Prepare lazy deletes and log caches first from persistent state + let (lazy_deletes, log_caches) = { + let (mut delete_table, mut cache_table) = (BTreeMap::new(), BTreeMap::new()); + for log_id in state.list_all_logs() { + Self::add_lazy_delete(log_id, &mut delete_table, &raw_log_store); + cache_table.insert(log_id, Arc::new(CryptoLogCache::new(log_id, &tx_provider))); + } + (delete_table, cache_table) + }; + + Self { + state: Arc::new(Mutex::new(State::new(state, lazy_deletes, log_caches))), + key, + raw_log_store, + journal: journal.clone(), + tx_provider: tx_provider.clone(), + } }; - tx_provider.register_data_initializer(|| TxLogStoreEdit::new()); - tx_provider.register_data_initializer(|| OpenLogTable::new()); + // TX data + tx_provider.register_data_initializer(Box::new(|| TxLogStoreEdit::new())); + tx_provider.register_data_initializer(Box::new(|| OpenLogTable::::new())); + tx_provider.register_data_initializer(Box::new(|| OpenLogCache::new())); + + // Precommit handler tx_provider.register_precommit_handler({ - |current| { - // Do I/O in the pre-commit phase. If any I/O error occured, + move |mut current: CurrentTx<'_>| { + // Do I/O in the pre-commit phase. If any I/O error occurred, // the TX would be aborted. - let dirty_log_ids_and_metas = - current.data_with(|open_log_table: &OpenLogTable| { - let mut ids_and_metas = Vec::new(); - for (log_id, inner_log) in &open_log_table.log_table { - if !inner_log.is_dirty { - continue; - } - - let crypto_log = &inner_log.crypto_log; - crypto_log.flush()?; - let new_log_meta = *crypto_log.root_meta().unwrap(); - ids_and_metas.push((log_id, new_log_meta)); - } - Ok(ids_and_metas) - })?; - current.data_mut_with(|store_edit: &mut TxLogStoreEdit| { - store_edit.update_log_metas(dirty_log_ids_and_metas.iter()) + Self::update_dirty_log_metas(&mut current) + } + }); + + // Commit handler for journal + let journal = journal.clone(); + tx_provider.register_commit_handler({ + move |current: CurrentTx<'_>| { + let mut journal = journal.lock(); + current.data_with(|chunk_edit: &ChunkAllocEdit| { + journal.add(AllEdit::from_chunk_edit(chunk_edit)); + }); + current.data_with(|raw_log_edit: &RawLogStoreEdit| { + journal.add(AllEdit::from_raw_log_edit(raw_log_edit)); }); - Ok(()) + current.data_with(|tx_log_edit: &TxLogStoreEdit| { + journal.add(AllEdit::from_tx_log_edit(tx_log_edit)); + }); + journal.commit(); + // TODO: Decide when to call `flush()` to ensure journal's persistence } }); + + // Commit handler for log store tx_provider.register_commit_handler({ let state = new_self.state.clone(); - |current| { + let raw_log_store = new_self.raw_log_store.clone(); + move |current: CurrentTx<'_>| { + Self::do_lazy_deletion(&state, ¤t); + current.data_with(|store_edit: &TxLogStoreEdit| { let mut state = state.lock(); state.apply(&store_edit); + + // Add lazy delete for newly created logs + for &log_id in store_edit.iter_created_logs() { + if state.lazy_deletes.contains_key(&log_id) { + continue; + } + Self::add_lazy_delete(log_id, &mut state.lazy_deletes, &raw_log_store); + } }); + + Self::apply_log_caches(&state, ¤t); } }); new_self } - /// List the IDs of all logs in a bucket. + fn update_dirty_log_metas(current_tx: &mut CurrentTx<'_>) -> Result<()> { + let dirty_logs: Vec<(TxLogId, Arc>)> = + current_tx.data_with(|open_log_table: &OpenLogTable| { + open_log_table + .open_table + .iter() + .filter_map(|(id, inner_log)| { + if inner_log.is_dirty.load(Ordering::Relaxed) { + Some((*id, inner_log.clone())) + } else { + None + } + }) + .collect() + }); + + for (log_id, inner_log) in dirty_logs { + let crypto_log = &inner_log.crypto_log; + crypto_log.flush()?; + + current_tx.data_mut_with(|store_edit: &mut TxLogStoreEdit| { + store_edit.update_log_meta((log_id, crypto_log.root_meta().unwrap())) + }); + } + Ok(()) + } + + fn add_lazy_delete( + log_id: TxLogId, + delete_table: &mut BTreeMap>>, + raw_log_store: &Arc>, + ) { + let raw_log_store = raw_log_store.clone(); + delete_table.insert( + log_id, + Arc::new(LazyDelete::new(log_id, move |log_id| { + raw_log_store.delete_log(*log_id).unwrap(); + })), + ); + } + + fn do_lazy_deletion(state: &Arc>, current_tx: &CurrentTx<'_>) { + let deleted_logs = current_tx.data_with(|edit: &TxLogStoreEdit| { + edit.iter_deleted_logs().cloned().collect::>() + }); + + let mut state = state.lock(); + for deleted_log_id in deleted_logs { + let Some(lazy_delete) = state.lazy_deletes.remove(&deleted_log_id) else { + // Other concurrent TXs have deleted the same log + continue; + }; + LazyDelete::delete(&lazy_delete); + + // Also remove the cache by the way + state.log_caches.remove(&deleted_log_id); + } + } + + // TODO: Need performance improvement + fn apply_log_caches(state: &Arc>, current_tx: &CurrentTx<'_>) { + // Apply per-TX log cache + current_tx.data_with(|open_cache_table: &OpenLogCache| { + let mut state = state.lock(); + let log_caches = &mut state.log_caches; + for (log_id, open_cache) in open_cache_table.open_table.iter() { + let log_cache = log_caches.get_mut(log_id).unwrap(); + let mut cache_inner = log_cache.inner.write(); + open_cache.lru_cache.iter().for_each(|(&pos, node)| { + cache_inner.lru_cache.put(pos, node.clone()); + }); + } + }); + } + + /// Lists the IDs of all logs in a bucket. /// /// # Panics /// /// This method must be called within a TX. Otherwise, this method panics. pub fn list_logs(&self, bucket_name: &str) -> Result> { - let mut current_tx = self.tx_provider.current(); let state = self.state.lock(); - - let mut log_id_set = state.list_logs(bucket_name)?; + let mut log_id_set = state.persistent.list_logs(bucket_name)?; + let current_tx = self.tx_provider.current(); current_tx.data_with(|store_edit: &TxLogStoreEdit| { - for (log_id, log_edit) in &store_edit.edit_table { + for (&log_id, log_edit) in &store_edit.edit_table { match log_edit { TxLogEdit::Create(_) => { log_id_set.insert(log_id); } TxLogEdit::Append(_) => {} TxLogEdit::Delete => { - log_id_set.remove(log_id); + log_id_set.remove(&log_id); } } } }); - let log_id_vec = log_id_set.iter().collect(); + let log_id_vec = log_id_set.into_iter().collect::>(); Ok(log_id_vec) } @@ -379,23 +340,61 @@ impl TxLogStore { /// # Panics /// /// This method must be called within a TX. Otherwise, this method panics. - pub fn create_log(&self, bucket: &str) -> Result { - todo!() + pub fn create_log(&self, bucket: &str) -> Result>> { + let raw_log = self.raw_log_store.create_log()?; + let log_id = raw_log.id(); + + let log_cache = Arc::new(CryptoLogCache::new(log_id, &self.tx_provider)); + self.state + .lock() + .log_caches + .insert(log_id, log_cache.clone()); + let crypto_log = CryptoLog::new(raw_log, self.key, log_cache); + + let mut current_tx = self.tx_provider.current(); + let bucket = bucket.to_string(); + let inner_log = Arc::new(TxLogInner { + log_id, + tx_id: current_tx.id(), + bucket: bucket.clone(), + crypto_log, + lazy_delete: None, + is_dirty: AtomicBool::new(false), + }); + + current_tx.data_mut_with(|store_edit: &mut TxLogStoreEdit| { + store_edit.create_log(log_id, bucket, self.key); + }); + + current_tx.data_mut_with(|open_log_table: &mut OpenLogTable| { + let _ = open_log_table.open_table.insert(log_id, inner_log.clone()); + }); + + current_tx.data_mut_with(|open_cache_table: &mut OpenLogCache| { + let _ = open_cache_table + .open_table + .insert(log_id, CacheInner::new()); + }); + + Ok(Arc::new(TxLog { + inner_log, + can_append: true, + })) } /// Opens the log of a given ID. /// - /// For any log at any time, there can be at most one Tx that opens the log + /// For any log at any time, there can be at most one TX that opens the log /// in the appendable mode. /// /// # Panics /// /// This method must be called within a TX. Otherwise, this method panics. - pub fn open_log(&self, log_id: TxLogId, can_append: bool) -> Result { + pub fn open_log(&self, log_id: TxLogId, can_append: bool) -> Result>> { let mut current_tx = self.tx_provider.current(); let inner_log = self.open_inner_log(log_id, can_append, &mut current_tx)?; let tx_log = TxLog::new(inner_log, can_append); - Ok(tx_log) + Ok(Arc::new(tx_log)) } fn open_inner_log( @@ -405,54 +404,67 @@ impl TxLogStore { current_tx: &mut CurrentTx<'_>, ) -> Result>> { // Fast path: the log has been opened in this TX - let has_opened = current_tx.data_with(|table: &OpenLogTable| { - table.log_table.get(&log_id).map(|log| log.clone()) + let opened_log_opt = current_tx.data_with(|open_log_table: &OpenLogTable| { + open_log_table.open_table.get(&log_id).cloned() }); - if let Some(inner_log) = has_opened { + if let Some(inner_log) = opened_log_opt { return Ok(inner_log); } // Slow path: the first time a log is to be opened in a TX let state = self.state.lock(); + // Must check lazy deletes first in case concurrent deletion + let lazy_delete = state + .lazy_deletes + .get(&log_id) + .ok_or(Error::with_msg(NotFound, "log has been deleted"))? + .clone(); let log_entry = { // The log must exist in state... - let log_entry = state.find_log(log_id)?; - // ...and not be marked deleted by data.edit + let log_entry: &TxLogEntry = state.persistent.find_log(log_id)?; + // ...and not be marked deleted by edit let is_deleted = current_tx .data_with(|store_edit: &TxLogStoreEdit| store_edit.is_log_deleted(log_id)); if is_deleted { - return Err(Error::NotFound); + return_errno_with_msg!(NotFound, "log has been marked deleted"); } log_entry }; + let bucket = log_entry.bucket.clone(); let crypto_log = { let raw_log = self.raw_log_store.open_log(log_id, can_append)?; let key = log_entry.key; - let root_meta = log_entry.root_meta; - CryptoLog::open(raw_log, key, root_meta)?; - }; - let tx_id = current_tx.id(); - let lazy_delete = { - let raw_log_store = self.raw_log_store.clone(); - let lazy_delete = LazyDelete::new(log_id, move |log_id| { - raw_log_store.delete_log(log_id).unwrap(); - }); - Arc::new(lazy_delete) + let root_meta = log_entry.root_mht; + let cache = state.log_caches.get(&log_id).unwrap().clone(); + CryptoLog::open(raw_log, key, root_meta, cache)? }; + let root_mht = crypto_log.root_meta().unwrap(); let inner_log = Arc::new(TxLogInner { log_id, - tx_id, + tx_id: current_tx.id(), bucket, crypto_log, - lazy_delete, - is_dirty: false, + lazy_delete: Some(lazy_delete), + is_dirty: AtomicBool::new(false), }); - current_tx.data_mut_with(|open_table: &mut OpenLogTable| { - open_table.log_table.insert(log_id, inner_log.clone()) + current_tx.data_mut_with(|open_log_table: &mut OpenLogTable| { + open_log_table.open_table.insert(log_id, inner_log.clone()); }); + + current_tx.data_mut_with(|open_cache_table: &mut OpenLogCache| { + open_cache_table + .open_table + .insert(log_id, CacheInner::new()); + }); + + if can_append { + current_tx.data_mut_with(|store_edit: &mut TxLogStoreEdit| { + store_edit.append_log(log_id, root_mht); + }); + } Ok(inner_log) } @@ -463,8 +475,11 @@ impl TxLogStore { /// This method must be called within a TX. Otherwise, this method panics. pub fn open_log_in(&self, bucket: &str) -> Result>> { let log_ids = self.list_logs(bucket)?; - let max_log_id = log_ids.iter().max().ok_or(Error::NotFound)?; - self.open_log(max_log_id) + let max_log_id = log_ids + .iter() + .max() + .ok_or(Error::with_msg(NotFound, "tx log not found"))?; + self.open_log(*max_log_id, false) } /// Checks whether the log of a given log ID exists or not. @@ -473,13 +488,13 @@ impl TxLogStore { /// /// This method must be called within a TX. Otherwise, this method panics. pub fn contains_log(&self, log_id: TxLogId) -> bool { - let mut current_tx = self.tx_provider.current(); let state = self.state.lock(); + let current_tx = self.tx_provider.current(); self.do_contain_log(log_id, &state, ¤t_tx) } fn do_contain_log(&self, log_id: TxLogId, state: &State, current_tx: &CurrentTx<'_>) -> bool { - if state.contains_log(log_id) { + if state.persistent.contains_log(log_id) { let not_deleted = current_tx .data_with(|store_edit: &TxLogStoreEdit| !store_edit.is_log_deleted(log_id)); not_deleted @@ -497,22 +512,24 @@ impl TxLogStore { /// This method must be called within a TX. Otherwise, this method panics. pub fn delete_log(&self, log_id: TxLogId) -> Result<()> { let mut current_tx = self.tx_provider.current(); - let state = self.state.lock(); - if !self.do_contain_log(log_id, &state, ¤t_tx) { - return Err(Error::NotFound); + current_tx.data_mut_with(|open_log_table: &mut OpenLogTable| { + let _ = open_log_table.open_table.remove(&log_id); + }); + + current_tx.data_mut_with(|open_cache_table: &mut OpenLogCache| { + let _ = open_cache_table.open_table.remove(&log_id); + }); + + if !self.do_contain_log(log_id, &self.state.lock(), ¤t_tx) { + return_errno_with_msg!(NotFound, "target deleted log not found"); } - current_tx - .data_mut_with(|store_edit: &mut TxLogStoreEdit| store_edit.delete(log_id).unwrap()); - current_tx.data_mut_with(|open_log_table: &mut OpenLogTable| { - let inner_log_opt = open_log_table.log_table.remove(&log_id); - let Some(inner_log) = inner_log_opt else { - // The deleted log has not been opened - return - }; - // LazyDelete::delete(&inner_log.lazy_delete); // No need, do this in commit phase + current_tx.data_mut_with(|store_edit: &mut TxLogStoreEdit| { + store_edit.delete_log(log_id); }); + + // Do lazy delete in precommit phase Ok(()) } @@ -521,9 +538,56 @@ impl TxLogStore { &self.key } + /// Creates a new transaction. pub fn new_tx(&self) -> Tx { self.tx_provider.new_tx() } + + /// Returns the current transaction. + pub fn current_tx(&self) -> CurrentTx<'_> { + self.tx_provider.current() + } +} + +impl Superblock { + const SUPERBLOCK_SIZE: usize = core::mem::size_of::(); + + /// Returns the total number of blocks occupied by the `TxLogStore`. + pub fn total_nblocks(&self) -> usize { + self.journal_area_meta.total_nblocks() + self.chunk_area_nblocks + } + + pub fn open(disk: &D, root_key: &Key) -> Result { + let mut cipher = Buf::alloc(1)?; + disk.read(0, cipher.as_mut())?; + let mut plain = Buf::alloc(1)?; + Skcipher::new().decrypt( + cipher.as_slice(), + &Self::derive_skcipher_key(root_key), + &SkcipherIv::new_zeroed(), + plain.as_mut_slice(), + )?; + Ok(Superblock::from_bytes( + &plain.as_slice()[..Self::SUPERBLOCK_SIZE], + )) + } + + fn persist(&self, disk: &D, root_key: &Key) -> Result<()> { + let mut plain = Buf::alloc(1)?; + plain.as_mut_slice()[..Self::SUPERBLOCK_SIZE].copy_from_slice(self.as_bytes()); + let mut cipher = Buf::alloc(1)?; + Skcipher::new().encrypt( + plain.as_slice(), + &Self::derive_skcipher_key(&root_key), + &SkcipherIv::new_zeroed(), + cipher.as_mut_slice(), + )?; + disk.write(0, cipher.as_ref()) + } + + fn derive_skcipher_key(root_key: &Key) -> SkcipherKey { + SkcipherKey::from_bytes(&root_key.as_bytes()) + } } /// A transactional log. @@ -538,11 +602,11 @@ struct TxLogInner { tx_id: TxId, bucket: BucketName, crypto_log: CryptoLog>, - lazy_delete: Arc>, - is_dirty: bool, + lazy_delete: Option>>, + is_dirty: AtomicBool, } -impl TxLog { +impl TxLog { fn new(inner_log: Arc>, can_append: bool) -> Self { Self { inner_log, @@ -552,17 +616,17 @@ impl TxLog { /// Returns the log ID. pub fn id(&self) -> TxLogId { - self.log_id + self.inner_log.log_id } /// Returns the TX ID. pub fn tx_id(&self) -> TxId { - self.tx_id + self.inner_log.tx_id } /// Returns the bucket that this log belongs to. pub fn bucket(&self) -> &str { - &self.bucket + &self.inner_log.bucket } /// Returns whether the log is opened in the appendable mode. @@ -575,7 +639,7 @@ impl TxLog { /// # Panics /// /// This method must be called within a TX. Otherwise, this method panics. - pub fn read(&self, pos: BlockId, buf: &mut impl BlockBuf) -> Result<()> { + pub fn read(&self, pos: BlockId, buf: BufMut) -> Result<()> { self.inner_log.crypto_log.read(pos, buf) } @@ -584,13 +648,13 @@ impl TxLog { /// # Panics /// /// This method must be called within a TX. Otherwise, this method panics. - pub fn append(&self, buf: &impl BlockBuf) -> Result<()> { + pub fn append(&self, buf: BufRef) -> Result<()> { if !self.can_append { - return Err(Error::NotPermitted); + return_errno_with_msg!(PermissionDenied, "tx log not in append mode"); } - self.inner_log.is_dirty = true; - self.inner_log.crypto_log.append(pos, buf) + self.inner_log.is_dirty.store(true, Ordering::Release); + self.inner_log.crypto_log.append(buf) } /// Returns the length of the log in unit of block. @@ -599,43 +663,592 @@ impl TxLog { /// /// This method must be called within a TX. Otherwise, this method panics. pub fn nblocks(&self) -> usize { - self.crypto_log.num_blocks() + self.inner_log.crypto_log.nblocks() + } +} + +pub struct CryptoLogCache { + inner: RwLock, + log_id: TxLogId, + tx_provider: Arc, +} + +pub(super) struct CacheInner { + pub lru_cache: LruCache>, +} +// TODO: Give the cache a bound + +impl CryptoLogCache { + fn new(log_id: TxLogId, tx_provider: &Arc) -> Self { + Self { + inner: RwLock::new(CacheInner::new()), + log_id, + tx_provider: tx_provider.clone(), + } + } +} + +impl NodeCache for CryptoLogCache { + fn get(&self, pos: BlockId) -> Option> { + let mut current = self.tx_provider.current(); + current.data_mut_with(|open_cache_table: &mut OpenLogCache| { + let open_cache = open_cache_table.open_table.get_mut(&self.log_id)?; + open_cache.lru_cache.get(&pos).cloned() + })?; + + let mut inner = self.inner.write(); + inner.lru_cache.get(&pos).cloned() + } + + fn put( + &self, + pos: BlockId, + value: Arc, + ) -> Option> { + let mut current = self.tx_provider.current(); + current.data_mut_with(|open_cache_table: &mut OpenLogCache| { + let open_cache = open_cache_table.open_table.get_mut(&self.log_id)?; + open_cache.lru_cache.put(pos, value) + }) + } +} + +impl CacheInner { + pub fn new() -> Self { + Self { + lru_cache: LruCache::unbounded(), + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Persistent State +//////////////////////////////////////////////////////////////////////////////// + +/// The volatile and persistent state of a `TxLogStore`. +struct State { + persistent: TxLogStoreState, + lazy_deletes: BTreeMap>>, + log_caches: BTreeMap>, +} + +/// The persistent state of a `TxLogStore`. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TxLogStoreState { + log_table: BTreeMap, + bucket_table: BTreeMap, +} + +/// A log entry implies the persistent state of the tx log. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TxLogEntry { + pub bucket: BucketName, + pub key: Key, + pub root_mht: RootMhtMeta, +} + +/// A bucket contains a set of logs which have the same name. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct Bucket { + log_ids: BTreeSet, +} + +impl State { + pub fn new( + persistent: TxLogStoreState, + lazy_deletes: BTreeMap>>, + log_caches: BTreeMap>, + ) -> Self { + Self { + persistent, + lazy_deletes, + log_caches, + } + } + + pub fn apply(&mut self, edit: &TxLogStoreEdit) { + edit.apply_to(&mut self.persistent); + } +} + +impl TxLogStoreState { + pub fn new() -> Self { + Self { + log_table: BTreeMap::new(), + bucket_table: BTreeMap::new(), + } + } + + pub fn create_log( + &mut self, + new_log_id: TxLogId, + bucket: BucketName, + key: Key, + root_mht: RootMhtMeta, + ) { + let already_exists = self.log_table.insert( + new_log_id, + TxLogEntry { + bucket: bucket.clone(), + key, + root_mht, + }, + ); + debug_assert!(already_exists.is_none()); + + match self.bucket_table.get_mut(&bucket) { + Some(bucket) => { + bucket.log_ids.insert(new_log_id); + } + None => { + self.bucket_table.insert( + bucket, + Bucket { + log_ids: BTreeSet::from([new_log_id]), + }, + ); + } + } + } + + pub fn append_log(&mut self, log_id: TxLogId, root_mht: RootMhtMeta) { + let entry = self.log_table.get_mut(&log_id).unwrap(); + entry.root_mht = root_mht; + } + + pub fn list_logs(&self, bucket_name: &str) -> Result> { + let bucket = self + .bucket_table + .get(&bucket_name.to_string()) + .ok_or(Error::with_msg(NotFound, "bucket not found"))?; + Ok(bucket.log_ids.clone()) + } + + pub fn list_all_logs(&self) -> impl Iterator + '_ { + self.log_table.iter().map(|(id, _)| *id) + } + + pub fn find_log(&self, log_id: TxLogId) -> Result<&TxLogEntry> { + self.log_table + .get(&log_id) + .ok_or(Error::with_msg(NotFound, "log entry not found")) + } + + pub fn contains_log(&self, log_id: TxLogId) -> bool { + self.log_table.contains_key(&log_id) + } + + pub fn delete_log(&mut self, log_id: TxLogId) { + // Do not check the result because concurrent TXs + // may decide to delete the same logs + let entry_opt = self.log_table.remove(&log_id); + entry_opt.map(|entry| { + self.bucket_table + .get_mut(&entry.bucket) + .map(|bucket| bucket.log_ids.remove(&log_id)); + }); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Persistent Edit +//////////////////////////////////////////////////////////////////////////////// + +/// A persistent edit to the state of `TxLogStore`. +#[derive(Clone, Serialize, Deserialize)] +pub struct TxLogStoreEdit { + edit_table: BTreeMap, +} + +/// Used for per-TX data, track open logs in memory +pub(super) struct OpenLogTable { + open_table: BTreeMap>>, +} + +/// Used for per-TX data, track open log caches in memory +pub(super) struct OpenLogCache { + open_table: BTreeMap, +} + +/// The basic unit of a persistent edit to the state of `TxLogStore`. +#[derive(Clone, Serialize, Deserialize)] +pub(super) enum TxLogEdit { + Create(TxLogCreate), + Append(TxLogAppend), + Delete, +} + +/// An edit that implies a log being created. +#[derive(Clone, Serialize, Deserialize)] +pub(super) struct TxLogCreate { + bucket: BucketName, + key: Key, + root_mht: Option, +} + +/// An edit that implies an existing log being appended. +#[derive(Clone, Serialize, Deserialize)] +pub(super) struct TxLogAppend { + root_mht: RootMhtMeta, +} + +impl TxLogStoreEdit { + pub fn new() -> Self { + Self { + edit_table: BTreeMap::new(), + } + } + + pub fn is_log_deleted(&self, log_id: TxLogId) -> bool { + match self.edit_table.get(&log_id) { + Some(TxLogEdit::Delete) => true, + _ => false, + } + } + + pub fn is_log_created(&self, log_id: TxLogId) -> bool { + match self.edit_table.get(&log_id) { + Some(TxLogEdit::Create(_)) | Some(TxLogEdit::Append(_)) => true, + None | Some(TxLogEdit::Delete) => false, + } + } + + pub fn delete_log(&mut self, log_id: TxLogId) { + match self.edit_table.get_mut(&log_id) { + None => { + let _ = self.edit_table.insert(log_id, TxLogEdit::Delete); + } + Some(TxLogEdit::Create(_)) => { + let _ = self.edit_table.insert(log_id, TxLogEdit::Delete); + } + Some(TxLogEdit::Append(_)) => { + panic!( + "append edit is added at very late stage, after which logs won't get deleted" + ); + } + Some(TxLogEdit::Delete) => { + panic!("can't delete a deleted log"); + } + } + } + + pub fn iter_deleted_logs(&self) -> impl Iterator { + self.edit_table + .iter() + .filter(|(_, edit)| { + if let TxLogEdit::Delete = edit { + true + } else { + false + } + }) + .map(|(id, _)| id) + } + + pub fn iter_created_logs(&self) -> impl Iterator { + self.edit_table + .iter() + .filter(|(_, edit)| { + if let TxLogEdit::Create(_) = edit { + true + } else { + false + } + }) + .map(|(id, _)| id) + } + + pub fn update_log_meta(&mut self, meta: (TxLogId, RootMhtMeta)) { + // For newly-created logs and existing logs + // that are appended, update `RootMhtMeta` + match self.edit_table.get_mut(&meta.0) { + None | Some(TxLogEdit::Delete) => { + unreachable!(); + } + Some(TxLogEdit::Create(create)) => { + let _ = create.root_mht.insert(meta.1); + } + Some(TxLogEdit::Append(append)) => { + append.root_mht = meta.1; + } + } + } + + pub fn create_log(&mut self, log_id: TxLogId, bucket: BucketName, key: Key) { + let already_created = self.edit_table.insert( + log_id, + TxLogEdit::Create(TxLogCreate { + bucket, + key, + root_mht: None, + }), + ); + debug_assert!(already_created.is_none()); + } + + pub fn append_log(&mut self, log_id: TxLogId, root_mht: RootMhtMeta) { + let already_existed = self + .edit_table + .insert(log_id, TxLogEdit::Append(TxLogAppend { root_mht })); + debug_assert!(already_existed.is_none()); + } +} + +impl Edit for TxLogStoreEdit { + fn apply_to(&self, state: &mut TxLogStoreState) { + for (&log_id, log_edit) in &self.edit_table { + match log_edit { + TxLogEdit::Create(create_edit) => { + let TxLogCreate { + bucket, + key, + root_mht, + .. + } = create_edit; + state.create_log(log_id, bucket.clone(), key.clone(), root_mht.unwrap()); + } + TxLogEdit::Append(append_edit) => { + let TxLogAppend { root_mht, .. } = append_edit; + state.append_log(log_id, *root_mht); + } + TxLogEdit::Delete => { + state.delete_log(log_id); + } + } + } } } -type Key = [u8; 16]; +impl TxData for TxLogStoreEdit {} + +impl OpenLogTable { + pub fn new() -> Self { + Self { + open_table: BTreeMap::new(), + } + } +} + +impl OpenLogCache { + pub fn new() -> Self { + Self { + open_table: BTreeMap::new(), + } + } +} + +impl TxData for OpenLogTable {} +impl TxData for OpenLogCache {} + +//////////////////////////////////////////////////////////////////////////////// +// Journaling +//////////////////////////////////////////////////////////////////////////////// mod journaling { - use super::super::chunk::ChunkAllocState; - use super::super::raw_log::RawLogStoreState; - use super::TxLogStoreState; - use crate::layers::edit::{CompactPolicy, Edit, EditJournal}; + use super::*; + use crate::layers::edit::EditGroup; - pub type Journal = EditJournal; + pub type Journal = EditJournal; + pub type JournalCompactPolicy = NeverCompactPolicy; + #[derive(Clone, Serialize, Deserialize)] pub struct AllState { pub chunk_alloc: ChunkAllocState, pub raw_log_store: RawLogStoreState, pub tx_log_store: TxLogStoreState, } - impl> Edit for E { - fn apply_to(&mut self, state: &mut AllState) { - self.apply_to(&mut state.chunk_alloc) + #[derive(Serialize, Deserialize)] + pub struct AllEdit { + pub chunk_edit: ChunkAllocEdit, + pub raw_log_edit: RawLogStoreEdit, + pub tx_log_edit: TxLogStoreEdit, + } + + impl Edit for AllEdit { + fn apply_to(&self, state: &mut AllState) { + self.chunk_edit.apply_to(&mut state.chunk_alloc); + self.raw_log_edit.apply_to(&mut state.raw_log_store); + self.tx_log_edit.apply_to(&mut state.tx_log_store); } } - impl> Edit for E { - fn apply_to(&mut self, state: &mut AllState) { - self.apply_to(&mut state.raw_log_store) + impl AllEdit { + pub fn from_chunk_edit(chunk_edit: &ChunkAllocEdit) -> Self { + Self { + chunk_edit: chunk_edit.clone(), + raw_log_edit: RawLogStoreEdit::new(), + tx_log_edit: TxLogStoreEdit::new(), + } + } + + pub fn from_raw_log_edit(raw_log_edit: &RawLogStoreEdit) -> Self { + Self { + chunk_edit: ChunkAllocEdit::new(), + raw_log_edit: raw_log_edit.clone(), + tx_log_edit: TxLogStoreEdit::new(), + } + } + + pub fn from_tx_log_edit(tx_log_edit: &TxLogStoreEdit) -> Self { + Self { + chunk_edit: ChunkAllocEdit::new(), + raw_log_edit: RawLogStoreEdit::new(), + tx_log_edit: tx_log_edit.clone(), + } } } - impl> Edit for E { - fn apply_to(&mut self, state: &mut AllState) { - self.apply_to(&mut state.tx_log_store) + pub struct NeverCompactPolicy; + + impl CompactPolicy for JournalCompactPolicy { + fn on_commit_edits(&mut self, _edits: &EditGroup) {} + + fn should_compact(&self) -> bool { + false } + + fn done_compact(&mut self) {} } +} - pub type JournalCompactPolicy = NeverCompactPolicy; +#[cfg(test)] +mod tests { + use super::*; + use crate::layers::bio::{Buf, MemDisk}; + + use std::thread::{self, JoinHandle}; + + #[test] + fn tx_log_store_fns() -> Result<()> { + let nblocks = 4 * CHUNK_NBLOCKS; + let mem_disk = MemDisk::create(nblocks)?; + let disk = mem_disk.clone(); + let tx_log_store = TxLogStore::format(mem_disk)?; + let bucket = "TEST"; + let content = 5_u8; + + // TX 1: create a new log and append contents (committed) + let mut tx = tx_log_store.new_tx(); + let res: Result = tx.context(|| { + let new_log = tx_log_store.create_log(bucket)?; + let log_id = new_log.id(); + assert_eq!(log_id, 0); + assert_eq!(new_log.tx_id(), tx_log_store.current_tx().id()); + assert_eq!(new_log.can_append(), true); + let mut buf = Buf::alloc(1)?; + buf.as_mut_slice().fill(content); + new_log.append(buf.as_ref())?; + + assert_eq!(new_log.nblocks(), 1); + assert_eq!(new_log.bucket(), bucket); + Ok(log_id) + }); + let log_id = res?; + tx.commit()?; + + // TX 2: open the log then read (committed) + let mut tx = tx_log_store.new_tx(); + let res: Result<_> = tx.context(|| { + let log_list = tx_log_store.list_logs(bucket)?; + assert_eq!(log_list, vec![log_id]); + assert_eq!(tx_log_store.contains_log(log_id), true); + assert_eq!(tx_log_store.contains_log(1), false); + + let log = tx_log_store.open_log(0, false)?; + assert_eq!(log.id(), log_id); + assert_eq!(log.tx_id(), tx_log_store.current_tx().id()); + let mut buf = Buf::alloc(1)?; + log.read(0, buf.as_mut())?; + assert_eq!(buf.as_slice()[0], content); + + let log = tx_log_store.open_log_in(bucket)?; + assert_eq!(log.id(), log_id); + log.read(0 as BlockId, buf.as_mut())?; + assert_eq!(buf.as_slice()[0], content); + Ok(()) + }); + res?; + tx.commit()?; + + // Recover the tx log store + let key = tx_log_store.key().clone(); + let _ = tx_log_store.journal.lock().flush(); + drop(tx_log_store); + let recovered_store = TxLogStore::recover(disk, key)?; + + // TX 3: create a new log from recovered_store (aborted) + let tx_log_store = recovered_store.clone(); + let handler = thread::spawn(move || -> Result { + let mut tx = tx_log_store.new_tx(); + let res: Result<_> = tx.context(|| { + let new_log = tx_log_store.create_log(bucket)?; + assert_eq!(tx_log_store.list_logs(bucket)?.len(), 2); + Ok(new_log.id()) + }); + tx.abort(); + res + }); + let new_log_id = handler.join().unwrap()?; + + recovered_store + .state + .lock() + .persistent + .find_log(new_log_id) + .expect_err("log not found"); + + Ok(()) + } + + #[test] + fn tx_log_deletion() -> Result<()> { + let tx_log_store = TxLogStore::format(MemDisk::create(4 * CHUNK_NBLOCKS)?)?; + + let mut tx = tx_log_store.new_tx(); + let content = 5_u8; + let res: Result<_> = tx.context(|| { + let new_log = tx_log_store.create_log("TEST")?; + let mut buf = Buf::alloc(1)?; + buf.as_mut_slice().fill(content); + new_log.append(buf.as_ref())?; + Ok(new_log.id()) + }); + let log_id = res?; + tx.commit()?; + + let handlers = (0..16) + .map(|_| { + let tx_log_store = tx_log_store.clone(); + thread::spawn(move || -> Result<()> { + let mut tx = tx_log_store.new_tx(); + println!( + "TX[{:?}] executed on thread[{:?}]", + tx.id(), + crate::os::CurrentThread::id() + ); + let _ = tx.context(|| { + let log = tx_log_store.open_log(log_id, false)?; + assert_eq!(log.id(), log_id); + assert_eq!(log.tx_id(), tx_log_store.current_tx().id()); + let mut buf = Buf::alloc(1)?; + log.read(0 as BlockId, buf.as_mut())?; + assert_eq!(buf.as_slice(), &[content; BLOCK_SIZE]); + tx_log_store.delete_log(log_id) + }); + tx.commit() + }) + }) + .collect::>>>(); + for handler in handlers { + handler.join().unwrap()?; + } + + let mut tx = tx_log_store.new_tx(); + let _ = tx.context(|| { + let res = tx_log_store.open_log(log_id, false).map(|_| ()); + res.expect_err("result must be NotFound"); + }); + tx.commit() + } } diff --git a/src/layers/mod.rs b/src/layers/mod.rs index 6db3111..671a084 100644 --- a/src/layers/mod.rs +++ b/src/layers/mod.rs @@ -4,8 +4,8 @@ pub mod bio; pub mod crypto; #[path = "2-edit/mod.rs"] pub mod edit; -// #[path = "3-log/mod.rs"] // Uncomment this when it's ready -// pub mod log; +#[path = "3-log/mod.rs"] +pub mod log; // #[path = "4-lsm/mod.rs"] // Uncomment this when it's ready // pub mod lsm; // #[path = "5-disk/mod.rs"] // Uncomment this when it's ready diff --git a/src/os/std/mod.rs b/src/os/std/mod.rs index 8a6111b..dc9c34f 100644 --- a/src/os/std/mod.rs +++ b/src/os/std/mod.rs @@ -87,7 +87,10 @@ unsafe impl Send for Pages {} impl Pages { /// Allocate specific number of pages. pub fn alloc(len: usize) -> Result { - let ptr = PageAllocator::alloc(len).ok_or(Error::new(Errno::NoMemory))?; + let ptr = PageAllocator::alloc(len).ok_or(Error::with_msg( + Errno::OutOfMemory, + "page allocation failed", + ))?; Ok(Self { ptr, len, @@ -206,7 +209,7 @@ impl crate::util::Aead for Aead { let mut mac = AeadMac::default(); let result = encrypt_aead(Cipher::aes_128_gcm(), key, Some(iv), aad, input, &mut mac) - .map_err(|_| Error::new(Errno::EncryptFault))?; + .map_err(|_| Error::new(Errno::EncryptFailed))?; output.copy_from_slice(result.as_slice()); Ok(mac) } @@ -221,7 +224,7 @@ impl crate::util::Aead for Aead { output: &mut [u8], ) -> Result<()> { let result = decrypt_aead(Cipher::aes_128_gcm(), key, Some(iv), aad, input, mac) - .map_err(|_| Error::new(Errno::DecryptFault))?; + .map_err(|_| Error::new(Errno::DecryptFailed))?; output.copy_from_slice(result.as_slice()); Ok(()) } @@ -255,7 +258,7 @@ impl crate::util::Skcipher for Skcipher { output: &mut [u8], ) -> Result<()> { let result = encrypt(Cipher::aes_128_ctr(), key, Some(iv), input) - .map_err(|_| Error::new(Errno::EncryptFault))?; + .map_err(|_| Error::new(Errno::EncryptFailed))?; output.copy_from_slice(result.as_slice()); Ok(()) } @@ -268,7 +271,7 @@ impl crate::util::Skcipher for Skcipher { output: &mut [u8], ) -> Result<()> { let result = decrypt(Cipher::aes_128_ctr(), key, Some(iv), input) - .map_err(|_| Error::new(Errno::DecryptFault))?; + .map_err(|_| Error::new(Errno::DecryptFailed))?; output.copy_from_slice(result.as_slice()); Ok(()) } diff --git a/src/tx/current.rs b/src/tx/current.rs index 4544604..06b5b45 100644 --- a/src/tx/current.rs +++ b/src/tx/current.rs @@ -33,7 +33,11 @@ impl<'a> CurrentTx<'a> { } /// Get immutable access to some type of the per-transaction data within a closure. - pub fn data_with(&mut self, f: F) -> R + /// + /// # Panics + /// + /// The `data_with` method must _not_ be called recursively. + pub fn data_with(&self, f: F) -> R where F: FnOnce(&T) -> R, { @@ -60,7 +64,7 @@ impl<'a> CurrentTx<'a> { /// /// # Panics /// -/// The `set_with` method must _not_ be called recursively. +/// The `set_and_exec_with` method must _not_ be called recursively. pub(super) fn set_and_exec_with(tx: &mut Tx, f: F) -> R where F: FnOnce() -> R, @@ -93,10 +97,10 @@ where /// /// # Panics /// -/// The `get_with` (or `get_mut_with`) method must be called within the closure -/// of `set_with`. +/// The `get_current_mut_with` method must be called within the closure +/// of `set_and_exec_with`. /// -/// In addition, the `get_with` (or `get_mut_with`) method must _not_ be called +/// In addition, the `get_current_mut_with` method must _not_ be called /// recursively. fn get_current_mut_with(f: F) -> R where @@ -107,8 +111,8 @@ where let current_ptr = cell.replace(core::ptr::null_mut()); assert!( current_ptr != ptr::null_mut(), - "get_mut_with must not - 1) be called without calling set_mut_with first, or + "get_current_mut_with must not + 1) be called without calling set_and_exec_with first, or 2) be called recursively" ); diff --git a/src/tx/mod.rs b/src/tx/mod.rs index 5405f28..cdf020c 100644 --- a/src/tx/mod.rs +++ b/src/tx/mod.rs @@ -359,7 +359,7 @@ mod tests { /// This method must be called within the context of a transaction. pub fn contains(&self, item: &T) -> bool { let is_new_item = { - let mut current_tx = self.tx_provider.current(); + let current_tx = self.tx_provider.current(); current_tx.data_with(|update: &DbUpdate| update.new_items.contains(item)) }; if is_new_item { @@ -397,7 +397,7 @@ mod tests { T: Copy, { let all_items = self.all_items.lock(); - let mut current_tx = self.tx_provider.current(); + let current_tx = self.tx_provider.current(); current_tx.data_with(|update: &DbUpdate| { all_items.union(&update.new_items).cloned().collect() }) @@ -410,7 +410,7 @@ mod tests { /// This method must be called within the context of a transaction. pub fn len(&self) -> usize { let all_items = self.all_items.lock(); - let mut current_tx = self.tx_provider.current(); + let current_tx = self.tx_provider.current(); let new_items_len = current_tx.data_with(|update: &DbUpdate| update.new_items.len()); all_items.len() + new_items_len } diff --git a/src/util/lazy_delete.rs b/src/util/lazy_delete.rs index 8c6bf57..fb38b31 100644 --- a/src/util/lazy_delete.rs +++ b/src/util/lazy_delete.rs @@ -96,3 +96,6 @@ impl Drop for LazyDelete { } } } + +unsafe impl Send for LazyDelete {} +unsafe impl Sync for LazyDelete {}