From 9b246e135aa489bc2c561f0b133d591410edfe14 Mon Sep 17 00:00:00 2001
From: Fischer <1809327837@qq.com>
Date: Thu, 19 Sep 2024 21:54:16 +0800
Subject: [PATCH] feat: implement batch migration

---
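
Notes (editor's sketch, not part of the commit): the migration path now reads the
whole victim chunk in one request and writes the surviving blocks out in runs of
consecutive target HBAs, so each run costs a single disk write instead of one
write per block. A minimal illustration of that batching idea, assuming HBAs are
plain usize block addresses (standalone sketch, not the SwornDisk API):

    // Split a sorted list of free target HBAs into maximal runs of consecutive
    // addresses; each run can then be filled with one contiguous disk write.
    fn contiguous_runs(free_hbas: &[usize]) -> Vec<Vec<usize>> {
        let mut runs: Vec<Vec<usize>> = Vec::new();
        for &hba in free_hbas {
            // Does this HBA directly follow the last one placed in the current run?
            let extends_last_run = runs
                .last()
                .and_then(|run| run.last())
                .map_or(false, |&prev| hba == prev + 1);
            if extends_last_run {
                runs.last_mut().unwrap().push(hba);
            } else {
                runs.push(vec![hba]);
            }
        }
        runs
    }

    fn main() {
        let free = [10, 11, 12, 40, 41, 77];
        // Three runs -> three writes, instead of six single-block writes.
        assert_eq!(
            contiguous_runs(&free),
            vec![vec![10, 11, 12], vec![40, 41], vec![77]]
        );
    }

The patch itself expresses the same grouping with the slice `group_by` predicate
`|hba1, hba2| hba2.saturating_sub(*hba1) == 1`.
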
 core/src/layers/5-disk/gc.rs            | 153 ++++++++++++++++++++----
 core/src/layers/5-disk/reverse_index.rs |  16 ++-
 2 files changed, 143 insertions(+), 26 deletions(-)

diff --git a/core/src/layers/5-disk/gc.rs b/core/src/layers/5-disk/gc.rs
index 7f98519..06c4abd 100644
--- a/core/src/layers/5-disk/gc.rs
+++ b/core/src/layers/5-disk/gc.rs
@@ -4,7 +4,10 @@ use super::{
     reverse_index::ReverseIndexTable,
     sworndisk::{Hba, Lba, RecordKey, RecordValue},
 };
-use crate::{layers::lsm::TxLsmTree, BlockSet};
+use crate::{
+    layers::{disk::chunk_alloc::CHUNK_SIZE, lsm::TxLsmTree},
+    BlockSet, Error,
+};
 use crate::{
     layers::{
         disk::{bio::BlockBuf, block_alloc},
@@ -214,9 +217,12 @@ impl GcWorker {
             return Ok(());
         }
         // Safety: if victim is none, the function will return early
-        let remapped_hbas = self.clean_and_migrate_data(victim.unwrap())?;
-        self.reverse_index_table
-            .remap_index_batch(remapped_hbas, &self.logical_block_table)?;
+        let (remapped_hbas, discard_hbas) = self.clean_and_migrate_data(victim.unwrap())?;
+        self.reverse_index_table.remap_index_batch(
+            remapped_hbas,
+            discard_hbas,
+            &self.logical_block_table,
+        )?;
         Ok(())
     }

@@ -235,9 +241,12 @@ impl GcWorker {
                 break;
             };
             chunk_ids.push(victim.chunk_id);
-            let remapped_hbas = self.clean_and_migrate_data(victim)?;
-            self.reverse_index_table
-                .remap_index_batch(remapped_hbas, &self.logical_block_table)?;
+            let (remapped_hbas, discard_hbas) = self.clean_and_migrate_data(victim)?;
+            self.reverse_index_table.remap_index_batch(
+                remapped_hbas,
+                discard_hbas,
+                &self.logical_block_table,
+            )?;
         }
         #[cfg(not(feature = "linux"))]
         debug!(
@@ -248,10 +257,30 @@ impl GcWorker {
         Ok(())
     }

-    pub fn clean_and_migrate_data(&self, victim: Victim) -> Result<Vec<(Hba, Hba)>> {
+    // Find the valid blocks to migrate, the invalid blocks to discard, and the free blocks that will receive the migrated data
+    pub fn find_target_hbas(
+        &self,
+        victim: Victim,
+    ) -> Result<(Vec<Hba>, Vec<(Lba, Hba)>, Vec<Hba>)> {
         let victim_chunk = &self.block_validity_table.get_chunk_info_table_ref()[victim.chunk_id];
-        // TODO: use tx to migrate data from victim to other chunk ?
-        let victim_hbas = victim.blocks;
+
+        let (valid_hbas, discard_hbas) = victim.blocks.into_iter().try_fold(
+            (Vec::new(), Vec::new()),
+            |(mut valid, mut discard), hba| {
+                // If the victim HBA differs from the HBA stored in the logical block table,
+                // the block is already invalid but has not been deallocated by compaction yet;
+                // discard it and record it to avoid a double free.
+                let lba = self.reverse_index_table.get_lba(&hba);
+                let old_hba = self.logical_block_table.get(&RecordKey { lba })?;
+                if hba == old_hba.hba {
+                    valid.push(hba);
+                } else {
+                    discard.push((lba, hba));
+                }
+                Ok::<_, Error>((valid, discard))
+            },
+        )?;
+
         let mut target_hbas = Vec::new();
         let mut found_enough_blocks = false;
         for chunk in self.block_validity_table.get_chunk_info_table_ref() {
@@ -260,7 +289,7 @@
             }
             let free_hbas = chunk.find_all_free_blocks();
             for hba in free_hbas {
-                if target_hbas.len() >= victim_hbas.len() {
+                if target_hbas.len() >= valid_hbas.len() {
                     found_enough_blocks = true;
                     break;
                 }
@@ -270,25 +299,55 @@
                 break;
             }
         }
-        // TODO: use batch to migrate data
+        debug_assert_eq!(valid_hbas.len(), target_hbas.len());
+        Ok((valid_hbas, discard_hbas, target_hbas))
+    }
+
+    pub fn clean_and_migrate_data(
+        &self,
+        victim: Victim,
+    ) -> Result<(Vec<(Hba, Hba)>, Vec<(Lba, Hba)>)> {
+        let victim_chunk = &self.block_validity_table.get_chunk_info_table_ref()[victim.chunk_id];
+
+        // TODO: use tx to migrate data from victim to other chunk ?
+        let (victim_hbas, discard_hbas, free_hbas) = self.find_target_hbas(victim)?;
+        let mut victim_data = Buf::alloc(victim_chunk.nblocks())?;
+        let offset = victim_chunk.chunk_id() * CHUNK_SIZE;
+        self.user_data_disk.read(offset, victim_data.as_mut())?;
+
+        let target_hba_batches = free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
+        let mut victim_hba_iter = victim_hbas.iter();
+        for target_hba_batch in target_hba_batches {
+            let batch_len = target_hba_batch.len();
+            let mut write_buf = Buf::alloc(batch_len)?;
+
+            // Fill this batch from the victim data, indexing relative to the chunk start
+            for i in 0..batch_len {
+                let Some(victim_hba) = victim_hba_iter.next() else {
+                    break;
+                };
+                let start = (victim_hba - offset) * BLOCK_SIZE;
+                let end = start + BLOCK_SIZE;
+
+                let des_start = i * BLOCK_SIZE;
+                write_buf.as_mut_slice()[des_start..des_start + BLOCK_SIZE]
+                    .copy_from_slice(&victim_data.as_slice()[start..end]);
+            }

-        debug_assert_eq!(victim_hbas.len(), target_hbas.len());
-        for (victim_hba, target_hba) in victim_hbas.iter().zip(target_hbas.clone()) {
-            // TODO: use ReverseIndexTable and LogicalBlockTable to discard the invalid block
-            let mut victim_block = Buf::alloc(1)?;
-            self.user_data_disk
-                .read(*victim_hba, victim_block.as_mut())?;
             self.user_data_disk
-                .write(target_hba, victim_block.as_ref())?;
+                .write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
         }
-        target_hbas
+        free_hbas
             .iter()
             .for_each(|hba| self.block_validity_table.set_allocated(*hba));
         victim_chunk.clear_chunk();
-        Ok(victim_hbas.into_iter().zip(target_hbas).collect())
+        Ok((
+            victim_hbas.into_iter().zip(free_hbas).collect(),
+            discard_hbas,
+        ))
     }

     // TODO: Support more rules
@@ -497,7 +556,7 @@ mod tests {

     #[test]
     fn simple_data_migration() {
-        // init_logger();
+        init_logger();
         let nblocks = 64 * CHUNK_SIZE;
         let mem_disk = MemDisk::create(nblocks).unwrap();
         let greedy_victim_policy = GreedyVictimPolicy {};
@@ -520,7 +579,6 @@ mod tests {
             disk.sync().unwrap();
         }

-        std::thread::sleep(Duration::from_secs(5));
         gc_worker.background_gc().unwrap();

         // after gc, the block at offset 0 should be migrated to another chunk
         let mut read_buf = Buf::alloc(1).unwrap();
         disk.read(0, read_buf.as_mut()).unwrap();
         assert_eq!(read_buf.as_slice(), content);
     }
+
+    #[test]
+    fn batch_data_migration() {
+        init_logger();
+        let nblocks = 64 * CHUNK_SIZE;
+        let mem_disk = MemDisk::create(nblocks).unwrap();
+        let greedy_victim_policy = GreedyVictimPolicy {};
+        let root_key = AeadKey::random();
+
+        let disk = SwornDisk::create(mem_disk, root_key, None, true, None).unwrap();
+        let gc_worker = disk
+            .create_gc_worker(Arc::new(greedy_victim_policy))
+            .unwrap();
+
+        // Write enough blocks to trigger GC: blocks [0-249] are rewritten below and become invalid, blocks [250-299] stay valid and will be migrated
+        for i in 0..300 {
+            let content: Vec<u8> = vec![1 as u8; BLOCK_SIZE];
+            let mut buf = Buf::alloc(1).unwrap();
+            buf.as_mut_slice().copy_from_slice(&content);
+            disk.write(i, buf.as_ref()).unwrap();
+        }
+        disk.sync().unwrap();
+
+        for i in 0..250 {
+            let content: Vec<u8> = vec![i as u8; BLOCK_SIZE];
+            let mut buf = Buf::alloc(1).unwrap();
+            buf.as_mut_slice().copy_from_slice(&content);
+            disk.write(i, buf.as_ref()).unwrap();
+        }
+        disk.sync().unwrap();
+
+        gc_worker.background_gc().unwrap();
+
+        // The rewritten blocks must still return their latest contents
+        for i in 0..250 {
+            let content: Vec<u8> = vec![i as u8; BLOCK_SIZE];
+            let mut read_buf = Buf::alloc(1).unwrap();
+            disk.read(i, read_buf.as_mut()).unwrap();
+            assert_eq!(read_buf.as_slice(), content, "block {} content mismatch", i);
+        }
+
+        // After gc, blocks [250-299] should have been migrated to another chunk and keep their original contents
+        for i in 250..300 {
+            let content: Vec<u8> = vec![1 as u8; BLOCK_SIZE];
+            let mut read_buf = Buf::alloc(1).unwrap();
+            disk.read(i, read_buf.as_mut()).unwrap();
+            assert_eq!(read_buf.as_slice(), content, "block {} is not migrated", i);
+        }
+    }
 }
diff --git a/core/src/layers/5-disk/reverse_index.rs b/core/src/layers/5-disk/reverse_index.rs
index 3edad8d..eac3a55 100644
--- a/core/src/layers/5-disk/reverse_index.rs
+++ b/core/src/layers/5-disk/reverse_index.rs
@@ -18,6 +18,14 @@ impl ReverseIndexTable {
         }
     }

+    pub fn get_lba(&self, old_hba: &Hba) -> Lba {
+        let index_table = self.index_table.lock();
+        index_table
+            .get(old_hba)
+            .copied()
+            .expect("hba should exist in index table")
+    }
+
     pub fn update_index_batch(&self, records: impl Iterator) {
         let mut index_table = self.index_table.lock();
         records.for_each(|(key, value)| {
@@ -33,10 +41,15 @@
     pub fn remap_index_batch(
         &self,
         remapped_hbas: Vec<(Hba, Hba)>,
+        discard_hbas: Vec<(Lba, Hba)>,
         tx_lsm_tree: &TxLsmTree,
     ) -> Result<()> {
         let mut index_table = self.index_table.lock();
         let mut dealloc_table = self.dealloc_table.lock();
+
+        discard_hbas.into_iter().for_each(|(lba, hba)| {
+            dealloc_table.insert(lba, hba);
+        });
         remapped_hbas
             .into_iter()
             .try_for_each(|(old_hba, new_hba)| {
@@ -60,9 +73,6 @@
                 // write the record back to lsm tree
                 tx_lsm_tree.put(record_key, record_value)?;

-                // record the lba -> old hba mapping into the dealloc table
-                dealloc_table.insert(lba, old_hba);
-
                 // update the reverse index table
                 index_table.insert(new_hba, lba);
                 index_table.remove(&old_hba);
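
Addendum (illustrative sketch, not part of the patch): with this change the
lba -> stale-hba deallocation entries are no longer inserted one by one inside the
remap loop; blocks that GC already found invalid arrive as `discard_hbas` and are
recorded up front, while the remap loop only rewrites the hba -> lba mappings for
the migrated blocks. A minimal model of that bookkeeping order with plain std
maps, assuming Lba/Hba are usize (hypothetical helper, not the real
ReverseIndexTable API):

    use std::collections::BTreeMap;

    type Lba = usize;
    type Hba = usize;

    fn remap(
        index_table: &mut BTreeMap<Hba, Lba>,    // hba -> lba (reverse index)
        dealloc_table: &mut BTreeMap<Lba, Hba>,  // lba -> stale hba awaiting dealloc
        remapped_hbas: Vec<(Hba, Hba)>,          // (old_hba, new_hba) for migrated blocks
        discard_hbas: Vec<(Lba, Hba)>,           // blocks found already invalid during GC
    ) {
        // Record the stale blocks first so each one is deallocated exactly once.
        for (lba, hba) in discard_hbas {
            dealloc_table.insert(lba, hba);
        }
        // Then repoint the reverse index at the migrated locations.
        for (old_hba, new_hba) in remapped_hbas {
            if let Some(lba) = index_table.remove(&old_hba) {
                index_table.insert(new_hba, lba);
            }
        }
    }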