Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Aug 23, 2024
1 parent ec000b3 commit 4bd6904
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
while iter.is_valid() {
let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
panic!(
"decode failed for fast compact sst_id {} block_idx {} last_table_id {:?} table_id {}",
self.sstable_id, self.block_metas.len(), self.last_table_id, table_id
"decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
self.sstable_id, self.block_metas.len(), self.last_table_id
)
});
self.add_impl(iter.key(), value, false).await?;
Expand All @@ -220,11 +220,10 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
assert_eq!(
meta.len as usize,
buf.len(),
"meta {} buf {} last_table_id {:?} table_id {}",
"meta {} buf {} last_table_id {:?}",
meta.len,
buf.len(),
self.last_table_id,
table_id
self.last_table_id
);
meta.offset = self.writer.data_len() as u32;
self.block_metas.push(meta);
Expand Down Expand Up @@ -300,13 +299,12 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
"is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
is_new_user_key, self.sstable_id, self.block_metas.len(), table_id, self.last_table_id, full_key
);
if !self.block_builder.is_empty() {
self.build_block().await?;
}

self.table_ids.insert(table_id);
self.finalize_last_table_stats();
self.last_table_id = Some(table_id);
if !self.block_builder.is_empty() {
self.build_block().await?;
}
} else if is_block_full && could_switch_block {
self.build_block().await?;
}
Expand Down Expand Up @@ -372,9 +370,9 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
self.block_metas[0].smallest_key.clone()
};
let largest_key = self.last_full_key.clone();
self.build_block().await?;

self.finalize_last_table_stats();

self.build_block().await?;
let right_exclusive = false;
let meta_offset = self.writer.data_len() as u64;

Expand Down Expand Up @@ -533,6 +531,35 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
let bloom_filter_size = meta.bloom_filter.len();
let sstable_file_size = sst_info.file_size as usize;

{
// fill total_compressed_size

let mut last_total_compressed_size = 0;
let mut last_table_id = None;
for block_meta in &meta.block_metas {
let block_table_id = block_meta.table_id();
if last_table_id.is_none() || last_table_id.unwrap() != block_table_id.table_id() {
if last_table_id.is_some() {
self.table_stats
.get_mut(&last_table_id.unwrap())
.unwrap()
.total_compressed_size = last_total_compressed_size;
}

last_table_id = Some(block_table_id.table_id());
last_total_compressed_size = 0;
}
last_total_compressed_size += block_meta.len as u64;
}

if last_total_compressed_size != 0 {
self.table_stats
.get_mut(&last_table_id.unwrap())
.unwrap()
.total_compressed_size = last_total_compressed_size;
}
}

let writer_output = self.writer.finish(meta).await?;
Ok(SstableBuilderOutput::<W::Output> {
sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
Expand Down Expand Up @@ -614,14 +641,6 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
if self.table_ids.is_empty() || self.last_table_id.is_none() {
return;
}

self.last_table_stats.total_compressed_size = self
.block_metas
.iter()
.filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap())
.map(|block_meta| block_meta.len as u64)
.sum::<u64>();

self.table_stats.insert(
self.last_table_id.unwrap(),
std::mem::take(&mut self.last_table_stats),
Expand Down

0 comments on commit 4bd6904

Please sign in to comment.