feat: implemented batch migration
Fischer0522 committed Sep 19, 2024
1 parent 821e393 commit 9b246e1
Showing 2 changed files with 143 additions and 26 deletions.
153 changes: 130 additions & 23 deletions core/src/layers/5-disk/gc.rs
@@ -4,7 +4,10 @@ use super::{
reverse_index::ReverseIndexTable,
sworndisk::{Hba, Lba, RecordKey, RecordValue},
};
use crate::{layers::lsm::TxLsmTree, BlockSet};
use crate::{
layers::{disk::chunk_alloc::CHUNK_SIZE, lsm::TxLsmTree},
BlockSet, Error,
};
use crate::{
layers::{
disk::{bio::BlockBuf, block_alloc},
@@ -214,9 +217,12 @@ impl<D: BlockSet + 'static> GcWorker<D> {
return Ok(());
}
// Safety: if victim is none, the function will return early
let remapped_hbas = self.clean_and_migrate_data(victim.unwrap())?;
self.reverse_index_table
.remap_index_batch(remapped_hbas, &self.logical_block_table)?;
let (remapped_hbas, discard_hbas) = self.clean_and_migrate_data(victim.unwrap())?;
self.reverse_index_table.remap_index_batch(
remapped_hbas,
discard_hbas,
&self.logical_block_table,
)?;
Ok(())
}

@@ -235,9 +241,12 @@ impl<D: BlockSet + 'static> GcWorker<D> {
break;
};
chunk_ids.push(victim.chunk_id);
let remapped_hbas = self.clean_and_migrate_data(victim)?;
self.reverse_index_table
.remap_index_batch(remapped_hbas, &self.logical_block_table)?;
let (remapped_hbas, discard_hbas) = self.clean_and_migrate_data(victim)?;
self.reverse_index_table.remap_index_batch(
remapped_hbas,
discard_hbas,
&self.logical_block_table,
)?;
}
#[cfg(not(feature = "linux"))]
debug!(
@@ -248,10 +257,30 @@ impl<D: BlockSet + 'static> GcWorker<D> {
Ok(())
}

pub fn clean_and_migrate_data(&self, victim: Victim) -> Result<Vec<(Hba, Hba)>> {
// Find the valid blocks to migrate, the stale blocks to discard, and the free target blocks to store the migrated data
pub fn find_target_hbas(
&self,
victim: Victim,
) -> Result<(Vec<Hba>, Vec<(Lba, Hba)>, Vec<Hba>)> {
let victim_chunk = &self.block_validity_table.get_chunk_info_table_ref()[victim.chunk_id];
// TODO: use tx to migrate data from victim to other chunk ?
let victim_hbas = victim.blocks;

let (valid_hbas, discard_hbas) = victim.blocks.into_iter().try_fold(
(Vec::new(), Vec::new()),
|(mut valid, mut discard), hba| {
// if the victim hba differs from the hba recorded in the logical block table,
// the block is already stale but has not yet been deallocated by compaction;
// it should be discarded and marked to avoid a double free
let lba = self.reverse_index_table.get_lba(&hba);
let old_hba = self.logical_block_table.get(&RecordKey { lba })?;
if hba == old_hba.hba {
valid.push(hba);
} else {
discard.push((lba, hba));
}
Ok::<_, Error>((valid, discard))
},
)?;

let mut target_hbas = Vec::new();
let mut found_enough_blocks = false;
for chunk in self.block_validity_table.get_chunk_info_table_ref() {
@@ -260,7 +289,7 @@ impl<D: BlockSet + 'static> GcWorker<D> {
}
let free_hbas = chunk.find_all_free_blocks();
for hba in free_hbas {
if target_hbas.len() >= victim_hbas.len() {
if target_hbas.len() >= valid_hbas.len() {
found_enough_blocks = true;
break;
}
@@ -270,25 +299,55 @@ impl<D: BlockSet + 'static> GcWorker<D> {
break;
}
}
// TODO: use batch to migrate data
debug_assert_eq!(valid_hbas.len(), target_hbas.len());
Ok((valid_hbas, discard_hbas, target_hbas))
}

pub fn clean_and_migrate_data(
&self,
victim: Victim,
) -> Result<(Vec<(Hba, Hba)>, Vec<(Lba, Hba)>)> {
let victim_chunk = &self.block_validity_table.get_chunk_info_table_ref()[victim.chunk_id];

// TODO: use tx to migrate data from victim to other chunk ?
let (victim_hbas, discard_hbas, free_hbas) = self.find_target_hbas(victim)?;
let mut victim_data = Buf::alloc(victim_chunk.nblocks())?;
let offset = victim_chunk.chunk_id() * CHUNK_SIZE;
self.user_data_disk.read(offset, victim_data.as_mut())?;
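// Migration strategy: the whole victim chunk has been read into memory above;
// consecutive free target HBAs are grouped into runs below, and each run is
// filled with valid victim blocks and flushed with a single disk write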

let target_hba_batches = free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
let mut victim_hba_iter = victim_hbas.iter();
for target_hba_batch in target_hba_batches {
let batch_len = target_hba_batch.len();
let mut write_buf = Buf::alloc(batch_len)?;

// copy enough victim blocks from the in-memory chunk buffer to fill this batch
for i in 0..batch_len {
let Some(victim_hba) = victim_hba_iter.next() else {
break;
};
let start = victim_hba * BLOCK_SIZE;
let end = (victim_hba + 1) * BLOCK_SIZE;

let des_start = i * BLOCK_SIZE;
write_buf.as_mut_slice()[des_start..des_start + BLOCK_SIZE]
.copy_from_slice(&victim_data.as_slice()[start..end]);
}

debug_assert_eq!(victim_hbas.len(), target_hbas.len());
for (victim_hba, target_hba) in victim_hbas.iter().zip(target_hbas.clone()) {
// TODO: use ReverseIndexTable and LogicalBlockTable to discard the invalid block
let mut victim_block = Buf::alloc(1)?;
self.user_data_disk
.read(*victim_hba, victim_block.as_mut())?;
self.user_data_disk
.write(target_hba, victim_block.as_ref())?;
.write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
}

target_hbas
free_hbas
.iter()
.for_each(|hba| self.block_validity_table.set_allocated(*hba));

victim_chunk.clear_chunk();

Ok(victim_hbas.into_iter().zip(target_hbas).collect())
Ok((
victim_hbas.into_iter().zip(free_hbas).collect(),
discard_hbas,
))
}

// TODO: Support more rules
@@ -497,7 +556,7 @@ mod tests {

#[test]
fn simple_data_migration() {
// init_logger();
init_logger();
let nblocks = 64 * CHUNK_SIZE;
let mem_disk = MemDisk::create(nblocks).unwrap();
let greedy_victim_policy = GreedyVictimPolicy {};
@@ -520,12 +579,60 @@ mod tests {
disk.sync().unwrap();
}

std::thread::sleep(Duration::from_secs(5));
gc_worker.background_gc().unwrap();

// after gc, the block at offset 0 should be migrated to another chunk
let mut read_buf = Buf::alloc(1).unwrap();
disk.read(0, read_buf.as_mut()).unwrap();
assert_eq!(read_buf.as_slice(), content);
}

#[test]
fn batch_data_migration() {
init_logger();
let nblocks = 64 * CHUNK_SIZE;
let mem_disk = MemDisk::create(nblocks).unwrap();
let greedy_victim_policy = GreedyVictimPolicy {};
let root_key = AeadKey::random();

let disk = SwornDisk::create(mem_disk, root_key, None, true, None).unwrap();
let gc_worker = disk
.create_gc_worker(Arc::new(greedy_victim_policy))
.unwrap();

// write enough blocks to trigger gc: the old copies of blocks [0-249] become invalid, while blocks [250-299] remain valid and will be migrated
for i in 0..300 {
let content: Vec<u8> = vec![1u8; BLOCK_SIZE];
let mut buf = Buf::alloc(1).unwrap();
buf.as_mut_slice().copy_from_slice(&content);
disk.write(i, buf.as_ref()).unwrap();
}
disk.sync().unwrap();

for i in 0..250 {
let content: Vec<u8> = vec![i as u8; BLOCK_SIZE];
let mut buf = Buf::alloc(1).unwrap();
buf.as_mut_slice().copy_from_slice(&content);
disk.write(i, buf.as_ref()).unwrap();
}
disk.sync().unwrap();

gc_worker.background_gc().unwrap();

for i in 0..250 {
let content: Vec<u8> = vec![i as u8; BLOCK_SIZE];
let mut read_buf = Buf::alloc(1).unwrap();
disk.read(i, read_buf.as_mut()).unwrap();
assert_eq!(read_buf.as_slice(), content, "block {} is not migrated", i);
}

for i in 250..300 {
let content: Vec<u8> = vec![1u8; BLOCK_SIZE];
let mut read_buf = Buf::alloc(1).unwrap();
disk.read(i, read_buf.as_mut()).unwrap();
assert_eq!(read_buf.as_slice(), content, "block {} is not migrated", i);
}

// after gc, every valid block should have been migrated to another chunk and still be readable
}
}
16 changes: 13 additions & 3 deletions core/src/layers/5-disk/reverse_index.rs
@@ -18,6 +18,14 @@ impl ReverseIndexTable {
}
}

pub fn get_lba(&self, old_hba: &Hba) -> Lba {
let index_table = self.index_table.lock();
index_table
.get(&old_hba)
.map(|lba| *lba)
.expect("hba should exist in index table")
}

pub fn update_index_batch(&self, records: impl Iterator<Item = (RecordKey, RecordValue)>) {
let mut index_table = self.index_table.lock();
records.for_each(|(key, value)| {
@@ -33,10 +41,15 @@ impl ReverseIndexTable {
pub fn remap_index_batch<D: BlockSet + 'static>(
&self,
remapped_hbas: Vec<(Hba, Hba)>,
discard_hbas: Vec<(Lba, Hba)>,
tx_lsm_tree: &TxLsmTree<RecordKey, RecordValue, D>,
) -> Result<()> {
let mut index_table = self.index_table.lock();
let mut dealloc_table = self.dealloc_table.lock();

discard_hbas.into_iter().for_each(|(lba, hba)| {
dealloc_table.insert(lba, hba);
});
remapped_hbas
.into_iter()
.try_for_each(|(old_hba, new_hba)| {
@@ -60,9 +73,6 @@ impl ReverseIndexTable {
// write the record back to lsm tree
tx_lsm_tree.put(record_key, record_value)?;

// record the lba -> old hba mapping into the dealloc table
dealloc_table.insert(lba, old_hba);

// update the reverse index table
index_table.insert(new_hba, lba);
index_table.remove(&old_hba);
