diff --git a/core/src/layers/3-log/chunk.rs b/core/src/layers/3-log/chunk.rs
index 15b1069..db73b1f 100644
--- a/core/src/layers/3-log/chunk.rs
+++ b/core/src/layers/3-log/chunk.rs
@@ -68,7 +68,8 @@ impl ChunkAlloc {
     }
 
     /// Constructs a `ChunkAlloc` from its parts.
-    pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
+    pub(super) fn from_parts(mut state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
+        state.in_journal = false;
         let new_self = Self {
             state: Arc::new(Mutex::new(state)),
             tx_provider,
@@ -147,6 +148,7 @@ impl ChunkAlloc {
                     }
                 }
             }
+            ids.sort_unstable();
             ids
         };
 
@@ -215,7 +217,7 @@ impl Debug for ChunkAlloc {
         let state = self.state.lock();
         f.debug_struct("ChunkAlloc")
             .field("bitmap_free_count", &state.free_count)
-            .field("bitmap_min_free", &state.min_free)
+            .field("bitmap_next_free", &state.next_free)
             .finish()
     }
 }
@@ -232,9 +234,11 @@ pub struct ChunkAllocState {
     alloc_map: BitMap,
     // The number of free chunks.
     free_count: usize,
-    // The minimum free chunk Id. Useful to narrow the scope of searching for
-    // free chunk IDs.
-    min_free: usize,
+    // The next free chunk Id. Used to narrow the scope of
+    // searching for free chunk IDs.
+    next_free: usize,
+    /// Whether the state is in the journal or not.
+    in_journal: bool,
 }
 
 // TODO: Separate persistent and volatile state of `ChunkAlloc`
@@ -245,26 +249,43 @@ impl ChunkAllocState {
         Self {
             alloc_map: BitMap::repeat(false, capacity),
             free_count: capacity,
-            min_free: 0,
+            next_free: 0,
+            in_journal: false,
+        }
+    }
+
+    /// Creates a persistent state in the journal. The state in the journal and
+    /// the state that `RawLogStore` manages act differently on allocation and
+    /// edits' appliance.
+    pub fn new_in_journal(capacity: usize) -> Self {
+        Self {
+            alloc_map: BitMap::repeat(false, capacity),
+            free_count: capacity,
+            next_free: 0,
+            in_journal: true,
         }
     }
 
     /// Allocates a chunk, returning its ID.
     pub fn alloc(&mut self) -> Option<ChunkId> {
-        let min_free = self.min_free;
-        if min_free >= self.alloc_map.len() {
-            return None;
+        let mut next_free = self.next_free;
+        if next_free == self.alloc_map.len() {
+            next_free = 0;
         }
 
-        let free_chunk_id = self
-            .alloc_map
-            .first_zero(min_free)
-            .expect("there must exists a zero");
+        let free_chunk_id = {
+            if let Some(chunk_id) = self.alloc_map.first_zero(next_free) {
+                chunk_id
+            } else {
+                self.alloc_map
+                    .first_zero(0)
+                    .expect("there must exists a zero")
+            }
+        };
+
         self.alloc_map.set(free_chunk_id, true);
         self.free_count -= 1;
-
-        // Keep the invariance that all free chunk IDs are no less than `min_free`
-        self.min_free = free_chunk_id + 1;
+        self.next_free = free_chunk_id + 1;
 
         Some(free_chunk_id)
     }
@@ -275,14 +296,9 @@ impl ChunkAllocState {
     ///
    /// Deallocating a free chunk causes panic.
     pub fn dealloc(&mut self, chunk_id: ChunkId) {
-        // debug_assert_eq!(self.alloc_map[chunk_id], true); // may fail in journal's commit
+        debug_assert_eq!(self.alloc_map[chunk_id], true);
         self.alloc_map.set(chunk_id, false);
         self.free_count += 1;
-
-        // Keep the invariance that all free chunk IDs are no less than min_free
-        if chunk_id < self.min_free {
-            self.min_free = chunk_id;
-        }
     }
 
     /// Returns the total number of chunks.
@@ -306,7 +322,7 @@ impl ChunkAllocState {
 ////////////////////////////////////////////////////////////////////////////////
 
 /// A persistent edit to the state of a chunk allocator.
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ChunkAllocEdit {
     edit_table: HashMap<ChunkId, ChunkEdit>,
 }
@@ -314,7 +330,7 @@ pub struct ChunkAllocEdit {
 /// The smallest unit of a persistent edit to the
 /// state of a chunk allocator, which is
 /// a chunk being either allocated or deallocated.
-#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
 enum ChunkEdit {
     Alloc,
     Dealloc,
@@ -379,23 +395,23 @@ impl ChunkAllocEdit {
 
 impl Edit<ChunkAllocState> for ChunkAllocEdit {
     fn apply_to(&self, state: &mut ChunkAllocState) {
+        let mut to_be_deallocated = Vec::new();
         for (&chunk_id, chunk_edit) in &self.edit_table {
             match chunk_edit {
                 ChunkEdit::Alloc => {
-                    // Journal's state also needs to be updated
-                    if !state.is_chunk_allocated(chunk_id) {
+                    if state.in_journal {
                         let _allocated_id = state.alloc().unwrap();
-                        // `_allocated_id` may not be equal to `chunk_id` due to concurrent TXs,
-                        // but eventually the state will be consistent
                     }
-                    // Except journal, nothing needs to be done
                 }
                 ChunkEdit::Dealloc => {
-                    state.dealloc(chunk_id);
+                    to_be_deallocated.push(chunk_id);
                 }
             }
         }
+        for chunk_id in to_be_deallocated {
+            state.dealloc(chunk_id);
+        }
     }
 }
diff --git a/core/src/layers/3-log/raw_log.rs b/core/src/layers/3-log/raw_log.rs
index 50f5a27..edd8730 100644
--- a/core/src/layers/3-log/raw_log.rs
+++ b/core/src/layers/3-log/raw_log.rs
@@ -235,7 +235,7 @@ impl<D: BlockSet> RawLogStore<D> {
     /// This method must be called within a TX. Otherwise, this method panics.
     pub fn create_log(&self) -> Result<RawLog<D>> {
         let mut state = self.state.lock();
-        let new_log_id = state.persistent.alloc_log_id();
+        let new_log_id = state.alloc_log_id();
         state
             .add_to_write_set(new_log_id)
             .expect("created log can't appear in write set");
@@ -335,10 +335,7 @@ impl<D: BlockSet> Debug for RawLogStore<D> {
         let state = self.state.lock();
         f.debug_struct("RawLogStore")
             .field("persistent_log_table", &state.persistent.log_table)
-            .field(
-                "persistent_next_free_log_id",
-                &state.persistent.next_free_log_id,
-            )
+            .field("next_free_log_id", &state.next_free_log_id)
             .field("write_set", &state.write_set)
             .field("chunk_alloc", &self.chunk_alloc)
             .finish()
@@ -612,11 +609,12 @@ impl<'a> RawLogHeadRef<'a> {
         debug_assert!(offset + nblocks <= self.entry.head.num_blocks as _);
 
         let prepared_blocks = self.prepare_blocks(offset, nblocks);
-        debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
+        debug_assert_eq!(prepared_blocks.len(), nblocks);
 
         // Batch read
+        // Note that `prepared_blocks` are not always sorted
         let mut offset = 0;
-        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
+        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
             let len = consecutive_blocks.len();
             let first_bid = *consecutive_blocks.first().unwrap();
             let buf_slice =
@@ -687,11 +685,12 @@ impl<'a> RawLogTailRef<'a> {
         debug_assert!(offset + nblocks <= tail_nblocks);
 
         let prepared_blocks = self.prepare_blocks(offset, nblocks);
-        debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
+        debug_assert_eq!(prepared_blocks.len(), nblocks);
 
         // Batch read
+        // Note that `prepared_blocks` are not always sorted
         let mut offset = 0;
-        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
+        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
             let len = consecutive_blocks.len();
             let first_bid = *consecutive_blocks.first().unwrap();
             let buf_slice =
@@ -707,11 +706,12 @@ impl<'a> RawLogTailRef<'a> {
         let nblocks = buf.nblocks();
 
         let prepared_blocks = self.prepare_blocks(self.len() as _, nblocks);
-        debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
+        debug_assert_eq!(prepared_blocks.len(), nblocks);
 
         // Batch write
+        // Note that `prepared_blocks` are not always sorted
         let mut offset = 0;
-        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
+        for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
             let len = consecutive_blocks.len();
             let first_bid = *consecutive_blocks.first().unwrap();
             let buf_slice = &buf.as_slice()[offset * BLOCK_SIZE..(offset + len) * BLOCK_SIZE];
@@ -781,6 +781,7 @@ impl<'a> RawLogTailRef<'a> {
 /// The volatile and persistent state of a `RawLogStore`.
 struct State {
     persistent: RawLogStoreState,
+    next_free_log_id: u64,
     write_set: HashSet<RawLogId>,
     lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
 }
@@ -789,7 +790,6 @@ struct State {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct RawLogStoreState {
     log_table: HashMap<RawLogId, RawLogEntry>,
-    next_free_log_id: u64,
 }
 
 /// A log entry implies the persistent state of the raw log.
@@ -810,8 +810,14 @@ impl State {
     pub fn new(
         persistent: RawLogStoreState,
         lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
     ) -> Self {
+        let next_free_log_id = if let Some(max_log_id) = lazy_deletes.keys().max() {
+            max_log_id + 1
+        } else {
+            0
+        };
         Self {
             persistent: persistent.clone(),
+            next_free_log_id,
             write_set: HashSet::new(),
             lazy_deletes,
         }
     }
@@ -821,6 +827,15 @@ impl State {
         edit.apply_to(&mut self.persistent);
     }
 
+    pub fn alloc_log_id(&mut self) -> u64 {
+        let new_log_id = self.next_free_log_id;
+        self.next_free_log_id = self
+            .next_free_log_id
+            .checked_add(1)
+            .expect("64-bit IDs won't be exhausted even though IDs are not recycled");
+        new_log_id
+    }
+
     pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> {
         let not_exists = self.write_set.insert(log_id);
         if !not_exists {
@@ -840,19 +855,9 @@ impl RawLogStoreState {
     pub fn new() -> Self {
         Self {
             log_table: HashMap::new(),
-            next_free_log_id: 0,
         }
     }
 
-    pub fn alloc_log_id(&mut self) -> u64 {
-        let new_log_id = self.next_free_log_id;
-        self.next_free_log_id = self
-            .next_free_log_id
-            .checked_add(1)
-            .expect("64-bit IDs won't be exhausted even though IDs are not recycled");
-        new_log_id
-    }
-
     pub fn create_log(&mut self, new_log_id: u64) {
         let new_log_entry = RawLogEntry {
             head: RawLogHead::new(),
         };
@@ -1039,11 +1044,6 @@ impl Edit<RawLogStoreState> for RawLogStoreEdit {
                     let RawLogCreate { tail } = create;
                     state.create_log(log_id);
                     state.append_log(log_id, tail);
-
-                    // Journal's state also needs to be updated
-                    if state.next_free_log_id <= log_id {
-                        let _ = state.alloc_log_id();
-                    }
                 }
                 RawLogEdit::Append(append) => {
                     let RawLogAppend { tail } = append;
diff --git a/core/src/layers/3-log/tx_log.rs b/core/src/layers/3-log/tx_log.rs
index acf29d9..ae3ce97 100644
--- a/core/src/layers/3-log/tx_log.rs
+++ b/core/src/layers/3-log/tx_log.rs
@@ -117,22 +117,19 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
         let total_nblocks = disk.nblocks();
         let (log_store_nblocks, journal_nblocks) =
             Self::calc_store_and_journal_nblocks(total_nblocks);
+        let nchunks = log_store_nblocks / CHUNK_NBLOCKS;
+
         let log_store_area = disk.subset(1..1 + log_store_nblocks)?;
         let journal_area =
             disk.subset(1 + log_store_nblocks..1 + log_store_nblocks + journal_nblocks)?;
 
         let tx_provider = TxProvider::new();
-        let nchunks = log_store_nblocks / CHUNK_NBLOCKS;
-        let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone());
-        let raw_log_store = RawLogStore::new(log_store_area, tx_provider.clone(), chunk_alloc);
-        let tx_log_store_state = TxLogStoreState::new();
-
         let journal = {
             let all_state = AllState {
-                chunk_alloc: ChunkAllocState::new(nchunks),
+                chunk_alloc: ChunkAllocState::new_in_journal(nchunks),
                 raw_log_store: RawLogStoreState::new(),
-                tx_log_store: tx_log_store_state.clone(),
+                tx_log_store: TxLogStoreState::new(),
             };
             Arc::new(Mutex::new(Journal::format(
                 journal_area,
                 all_state,
                 JournalCompactPolicy {},
             )?))
         };
@@ -141,6 +138,11 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
+        Self::register_commit_handler_for_journal(&journal, &tx_provider);
+
+        let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone());
+        let raw_log_store = RawLogStore::new(log_store_area, tx_provider.clone(), chunk_alloc);
+        let tx_log_store_state = TxLogStoreState::new();
 
         let superblock = Superblock {
             journal_area_meta: journal.lock().meta(),
@@ -171,6 +173,37 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
         (log_store_nblocks, journal_nblocks) // TBD
     }
 
+    fn register_commit_handler_for_journal(
+        journal: &Arc<Mutex<Journal<D>>>,
+        tx_provider: &Arc<TxProvider>,
+    ) {
+        let journal = journal.clone();
+        tx_provider.register_commit_handler({
+            move |current: CurrentTx<'_>| {
+                let mut journal = journal.lock();
+                current.data_with(|tx_log_edit: &TxLogStoreEdit| {
+                    if tx_log_edit.is_empty() {
+                        return;
+                    }
+                    journal.add(AllEdit::from_tx_log_edit(tx_log_edit));
+                });
+                current.data_with(|raw_log_edit: &RawLogStoreEdit| {
+                    if raw_log_edit.is_empty() {
+                        return;
+                    }
+                    journal.add(AllEdit::from_raw_log_edit(raw_log_edit));
+                });
+                current.data_with(|chunk_edit: &ChunkAllocEdit| {
+                    if chunk_edit.is_empty() {
+                        return;
+                    }
+                    journal.add(AllEdit::from_chunk_edit(chunk_edit));
+                });
+                journal.commit();
+            }
+        });
+    }
+
     /// Recovers an existing `TxLogStore` from a disk using the given key.
     pub fn recover(disk: D, root_key: Key) -> Result<Self> {
         let superblock = Superblock::open(&disk.subset(0..1)?, &root_key)?;
@@ -186,23 +219,28 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
                 1 + superblock.chunk_area_nblocks
                     ..1 + superblock.chunk_area_nblocks + journal_area_meta.total_nblocks(),
             )?;
-            Journal::recover(journal_area, &journal_area_meta, JournalCompactPolicy {})?
+            Arc::new(Mutex::new(Journal::recover(
+                journal_area,
+                &journal_area_meta,
+                JournalCompactPolicy {},
+            )?))
         };
-        let all_state = journal.state();
+        Self::register_commit_handler_for_journal(&journal, &tx_provider);
 
-        let chunk_alloc =
-            ChunkAlloc::from_parts(all_state.chunk_alloc.clone(), tx_provider.clone());
-        let chunk_area = disk.subset(1..1 + superblock.chunk_area_nblocks)?;
-        let raw_log_store = RawLogStore::from_parts(
-            all_state.raw_log_store.clone(),
-            chunk_area,
+        let AllState {
             chunk_alloc,
-            tx_provider.clone(),
-        );
+            raw_log_store,
+            tx_log_store,
+        } = journal.lock().state().clone();
+
+        let chunk_alloc = ChunkAlloc::from_parts(chunk_alloc, tx_provider.clone());
+        let chunk_area = disk.subset(1..1 + superblock.chunk_area_nblocks)?;
+        let raw_log_store =
+            RawLogStore::from_parts(raw_log_store, chunk_area, chunk_alloc, tx_provider.clone());
 
         let tx_log_store = TxLogStore::from_parts(
-            all_state.tx_log_store.clone(),
+            tx_log_store,
             raw_log_store,
-            Arc::new(Mutex::new(journal)),
+            journal,
             superblock,
             root_key,
             disk,
@@ -258,33 +296,6 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
             }
         });
 
-        // Commit handler for journal
-        let journal = journal.clone();
-        tx_provider.register_commit_handler({
-            move |current: CurrentTx<'_>| {
-                let mut journal = journal.lock();
-                current.data_with(|chunk_edit: &ChunkAllocEdit| {
-                    if chunk_edit.is_empty() {
-                        return;
-                    }
-                    journal.add(AllEdit::from_chunk_edit(chunk_edit));
-                });
-                current.data_with(|raw_log_edit: &RawLogStoreEdit| {
-                    if raw_log_edit.is_empty() {
-                        return;
-                    }
-                    journal.add(AllEdit::from_raw_log_edit(raw_log_edit));
-                });
-                current.data_with(|tx_log_edit: &TxLogStoreEdit| {
-                    if tx_log_edit.is_empty() {
-                        return;
-                    }
-                    journal.add(AllEdit::from_tx_log_edit(tx_log_edit));
-                });
-                journal.commit();
-            }
-        });
-
         // Commit handler for log store
         tx_provider.register_commit_handler({
             let state = new_self.state.clone();
@@ -679,10 +690,9 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
 
     /// Syncs all the data managed by `TxLogStore` for persistence.
     pub fn sync(&self) -> Result<()> {
         self.raw_log_store.sync().unwrap();
-        self.journal.lock().flush()?;
+        self.journal.lock().flush().unwrap();
 
-        self.raw_disk.flush()?;
-        Ok(())
+        self.raw_disk.flush()
     }
 }
 
@@ -1290,14 +1300,14 @@ mod journaling {
     pub type Journal<D> = EditJournal<AllEdit, AllState, D, JournalCompactPolicy>;
     pub type JournalCompactPolicy = NeverCompactPolicy;
 
-    #[derive(Clone, Serialize, Deserialize)]
+    #[derive(Clone, Debug, Serialize, Deserialize)]
     pub struct AllState {
         pub chunk_alloc: ChunkAllocState,
         pub raw_log_store: RawLogStoreState,
         pub tx_log_store: TxLogStoreState,
     }
 
-    #[derive(Serialize, Deserialize)]
+    #[derive(Debug, Serialize, Deserialize)]
     pub struct AllEdit {
         pub chunk_edit: ChunkAllocEdit,
         pub raw_log_edit: RawLogStoreEdit,
@@ -1306,9 +1316,15 @@ mod journaling {
 
     impl Edit<AllState> for AllEdit {
         fn apply_to(&self, state: &mut AllState) {
-            self.chunk_edit.apply_to(&mut state.chunk_alloc);
-            self.raw_log_edit.apply_to(&mut state.raw_log_store);
-            self.tx_log_edit.apply_to(&mut state.tx_log_store);
+            if !self.tx_log_edit.is_empty() {
+                self.tx_log_edit.apply_to(&mut state.tx_log_store);
+            }
+            if !self.raw_log_edit.is_empty() {
+                self.raw_log_edit.apply_to(&mut state.raw_log_store);
+            }
+            if !self.chunk_edit.is_empty() {
+                self.chunk_edit.apply_to(&mut state.chunk_alloc);
+            }
         }
     }
diff --git a/core/src/layers/4-lsm/tx_lsm_tree.rs b/core/src/layers/4-lsm/tx_lsm_tree.rs
index a693c00..6cfb26d 100644
--- a/core/src/layers/4-lsm/tx_lsm_tree.rs
+++ b/core/src/layers/4-lsm/tx_lsm_tree.rs
@@ -407,8 +407,6 @@ impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> TreeInner<K, V, D>
         self.memtable_manager.sync(master_sync_id);
 
-        self.tx_log_store.sync().unwrap();
-
         // TODO: Error handling: try twice or ignore
         self.master_sync_id.increment()?;
         Ok(())
     }
@@ -767,6 +765,12 @@ impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> Debug for TreeInner
     }
 }
 
+impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> Debug for TxLsmTree<K, V, D> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self.0)
+    }
+}
+
 impl LsmLevel {
     const LEVEL0_RATIO: u16 = 4;
     const LEVELI_RATIO: u16 = 10;
@@ -963,7 +967,7 @@ mod tests {
 
     #[test]
     fn tx_lsm_tree_fns() -> Result<()> {
-        let nblocks = 64 * 1024;
+        let nblocks = 102400;
         let mem_disk = MemDisk::create(nblocks)?;
         let tx_log_store = Arc::new(TxLogStore::format(mem_disk, Key::random())?);
         let tx_lsm_tree: TxLsmTree =
diff --git a/core/src/layers/5-disk/block_alloc.rs b/core/src/layers/5-disk/block_alloc.rs
index 8674973..795ef23 100644
--- a/core/src/layers/5-disk/block_alloc.rs
+++ b/core/src/layers/5-disk/block_alloc.rs
@@ -8,7 +8,7 @@ use crate::util::BitMap;
 use core::mem::size_of;
 use core::num::NonZeroUsize;
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use pod::Pod;
 use serde::{Deserialize, Serialize};
 
@@ -29,6 +29,7 @@ pub(super) struct AllocTable {
     bitmap: Mutex<BitMap>,
     next_avail: AtomicUsize,
     nblocks: NonZeroUsize,
+    is_dirty: AtomicBool,
     cvar: Condvar,
     num_free: CvarMutex<usize>,
 }
@@ -60,6 +61,7 @@ impl AllocTable {
             bitmap: Mutex::new(BitMap::repeat(true, nblocks.get())),
             next_avail: AtomicUsize::new(0),
             nblocks,
+            is_dirty: AtomicBool::new(false),
             cvar: Condvar::new(),
             num_free: CvarMutex::new(nblocks.get()),
         }
     }
@@ -97,6 +99,9 @@ impl AllocTable {
         debug_assert_eq!(hbas.len(), cnt);
         *num_free -= cnt;
 
+        let _ = self
+            .is_dirty
+            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
         Ok(hbas)
     }
 
@@ -160,6 +165,7 @@ impl AllocTable {
             bitmap: Mutex::new(bitmap),
             next_avail: AtomicUsize::new(next_avail),
             nblocks,
+            is_dirty: AtomicBool::new(false),
             cvar: Condvar::new(),
             num_free: CvarMutex::new(num_free),
         });
@@ -203,6 +209,7 @@ impl AllocTable {
             bitmap: Mutex::new(bitmap),
             next_avail: AtomicUsize::new(next_avail),
             nblocks,
+            is_dirty: AtomicBool::new(false),
             cvar: Condvar::new(),
             num_free: CvarMutex::new(num_free),
         })
     }
@@ -218,6 +225,10 @@ impl AllocTable {
     /// Persist the block validity table to `BVT` log. GC all existed `BAL` logs.
     pub fn do_compaction<D: BlockSet + 'static>(&self, store: &Arc<TxLogStore<D>>) -> Result<()> {
+        if !self.is_dirty.load(Ordering::Relaxed) {
+            return Ok(());
+        }
+
         // Serialize the block validity table
         let bitmap = self.bitmap.lock();
         const BITMAP_MAX_SIZE: usize = 1792 * BLOCK_SIZE; // TBD
@@ -252,7 +263,10 @@ impl AllocTable {
             tx.abort();
             return_errno_with_msg!(TxAborted, "persist block validity table TX aborted");
         }
-        tx.commit()
+        tx.commit()?;
+
+        self.is_dirty.store(false, Ordering::Relaxed);
+        Ok(())
     }
 
     /// Mark a specific slot deallocated.
diff --git a/core/src/layers/5-disk/sworndisk.rs b/core/src/layers/5-disk/sworndisk.rs
index 3322f66..128d911 100644
--- a/core/src/layers/5-disk/sworndisk.rs
+++ b/core/src/layers/5-disk/sworndisk.rs
@@ -91,10 +91,11 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
     /// Sync all cached data in the device to the storage medium for durability.
     pub fn sync(&self) -> Result<()> {
         let _wguard = self.inner.write_sync_region.write();
-        self.inner.sync()?;
+        // TODO: Error handling the sync operation
+        self.inner.sync().unwrap();
 
         #[cfg(not(feature = "linux"))]
-        debug!("[SwornDisk] Sync completed");
+        trace!("[SwornDisk] Sync completed. {self:?}");
 
         Ok(())
     }
@@ -150,7 +151,7 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
         };
 
         #[cfg(not(feature = "linux"))]
-        debug!("[SwornDisk] Created successfully!");
+        info!("[SwornDisk] Created successfully! {:?}", &new_self);
 
         // XXX: Would `disk::drop()` bring unexpected behavior?
         Ok(new_self)
     }
@@ -203,7 +204,7 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
         };
 
         #[cfg(not(feature = "linux"))]
-        debug!("[SwornDisk] Opened successfully!");
+        info!("[SwornDisk] Opened successfully! {:?}", &opened_self);
 
         Ok(opened_self)
     }
@@ -450,13 +451,13 @@ impl<D: BlockSet + 'static> DiskInner<D> {
 
         self.logical_block_table.sync()?;
 
-        self.user_data_disk.flush()?;
-
         // XXX: May impact performance when there comes frequent syncs
         self.block_validity_table
             .do_compaction(&self.tx_log_store)?;
 
-        Ok(())
+        self.tx_log_store.sync()?;
+
+        self.user_data_disk.flush()
     }
 
     /// Handle one block I/O request. Mark the request completed when finished,
@@ -524,6 +525,15 @@ impl<D: BlockSet + 'static> Drop for SwornDisk<D> {
     }
 }
 
+impl<D: BlockSet + 'static> Debug for SwornDisk<D> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SwornDisk")
+            .field("user_data_nblocks", &self.inner.user_data_disk.nblocks())
+            .field("logical_block_table", &self.inner.logical_block_table)
+            .finish()
+    }
+}
+
 /// A wrapper for `[BufMut]` used in `readv()`.
 struct BufMutVec<'a> {
     bufs: &'a mut [BufMut<'a>],