From 79d98109deee8518740be6a00c3e4f41c29d5f1b Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 27 Aug 2024 13:58:57 +0800 Subject: [PATCH] feat(storage): Support for repairing the size of a split sst based on table stats (#18053) --- proto/hummock.proto | 4 ++ .../picker/trivial_move_compaction_picker.rs | 3 +- .../picker/vnode_watermark_picker.rs | 2 +- src/meta/src/hummock/manager/commit_epoch.rs | 55 ++++++++++++++-- .../manager/compaction_group_manager.rs | 14 +--- src/meta/src/hummock/manager/tests.rs | 64 +++++-------------- src/meta/src/manager/diagnose.rs | 2 +- .../compaction_group/hummock_version_ext.rs | 37 ++++++++++- src/storage/hummock_sdk/src/table_stats.rs | 8 +++ .../src/hummock/compactor/compaction_utils.rs | 11 ++-- .../src/hummock/compactor/compactor_runner.rs | 2 - .../compactor/fast_compactor_runner.rs | 5 +- .../compactor/shared_buffer_compact.rs | 2 +- src/storage/src/hummock/sstable/builder.rs | 16 +++++ src/storage/src/hummock/sstable_store.rs | 2 +- 15 files changed, 141 insertions(+), 86 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 60dbe176958f9..9015cb44d42a3 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -922,6 +922,10 @@ message TableStats { int64 total_key_size = 1; int64 total_value_size = 2; int64 total_key_count = 3; + + // `total_compressed_size` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + uint64 total_compressed_size = 4; } message HummockVersionStats { diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 6e172f15d3804..458919cd8b717 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -55,7 +55,7 @@ impl TrivialMovePicker { ) -> Option { let mut
skip_by_pending = false; for sst in select_tables { - if sst.file_size < self.sst_allowed_trivial_move_min_size { + if sst.sst_size < self.sst_allowed_trivial_move_min_size { continue; } @@ -128,6 +128,7 @@ pub mod tests { let sst = SstableInfo { sst_id: 1, file_size: 100, + sst_size: 100, ..Default::default() }; diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs index 50a33c7d42e4f..5171c48ad9c34 100644 --- a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -50,7 +50,7 @@ impl VnodeWatermarkCompactionPicker { return None; } Some(CompactionInput { - select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(), total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 0c07ca6f09fbc..0853e28fcb572 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -139,7 +139,7 @@ impl HummockManager { for s in &mut sstables { add_prost_table_stats_map( &mut table_stats_change, - &to_prost_table_stats_map(std::mem::take(&mut s.table_stats)), + &to_prost_table_stats_map(s.table_stats.clone()), ); } @@ -158,7 +158,6 @@ impl HummockManager { ); let state_table_info = &version.latest_version().state_table_info; - let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); // Add new table @@ -221,8 +220,23 @@ impl HummockManager { NewTableFragmentInfo::None => (HashMap::new(), None, None), }; + let mut group_members_table_ids: HashMap> = HashMap::new(); + { + // expand group_members_table_ids + for (table_id, group_id) in &table_compaction_group_mapping { + group_members_table_ids + .entry(*group_id) + .or_default() + .insert(*table_id); + } + } + let commit_sstables = self - .correct_commit_ssts(sstables, &table_compaction_group_mapping) + .correct_commit_ssts( + sstables, + &table_compaction_group_mapping, + &group_members_table_ids, + ) .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); @@ -379,6 +393,7 @@ impl HummockManager { &self, sstables: Vec, table_compaction_group_mapping: &HashMap, + group_members_table_ids: &HashMap>, ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); @@ -413,8 +428,36 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - for (group_id, _match_ids) in group_table_ids { - let branch_sst = split_sst(&mut sst.sst_info, &mut new_sst_id); + for (group_id, match_ids) in group_table_ids { + let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); + if match_ids + 
.iter() + .all(|id| group_members_table_ids.contains(&TableId::new(*id))) + { + commit_sstables + .entry(group_id) + .or_default() + .push(sst.sst_info.clone()); + continue; + } + + let origin_sst_size = sst.sst_info.sst_size; + let new_sst_size = match_ids + .iter() + .map(|id| { + let stat = sst.table_stats.get(id).unwrap(); + stat.total_compressed_size + }) + .sum(); + + let branch_sst = split_sst( + &mut sst.sst_info, + &mut new_sst_id, + origin_sst_size - new_sst_size, + new_sst_size, + match_ids, + ); + commit_sstables .entry(group_id) .or_default() diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 2dac4a843ead6..c68fc4222f283 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -30,8 +30,8 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, - PbGroupDestroy, PbStateTableInfoDelta, + CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, PbGroupDestroy, + PbStateTableInfoDelta, }; use tokio::sync::OnceCell; @@ -605,16 +605,6 @@ impl HummockManager { drop(compaction_guard); self.report_compact_tasks(canceled_tasks).await?; - // Don't trigger compactions if we enable deterministic compaction - if !self.env.opts.compaction_deterministic_test { - // commit_epoch may contains SSTs from any compaction group - self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim); - self.try_send_compaction_request( - target_compaction_group_id, - compact_task::TaskType::SpaceReclaim, - ); - } - self.metrics .move_state_table_count 
.with_label_values(&[&parent_group_id.to_string()]) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 73fb98c03e0d9..4b9ddcb18cbc2 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -41,10 +41,7 @@ use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, Hummoc use risingwave_pb::meta::add_worker_node_request::Property; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; -use crate::hummock::compaction::selector::{ - default_compaction_selector, CompactionSelector, ManualCompactionOption, - SpaceReclaimCompactionSelector, -}; +use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption}; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{HummockManager, HummockManagerRef}; @@ -1200,6 +1197,8 @@ async fn test_version_stats() { total_key_size: 1000, total_value_size: 100, total_key_count: 10, + + total_compressed_size: 1024 * 1024, }; let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]]; let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await; @@ -1271,6 +1270,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + total_compressed_size: 0, // unused }, ), ( @@ -1279,6 +1279,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + total_compressed_size: 0, // unused }, ), ]); @@ -1384,11 +1385,6 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); - hummock_manager - .split_compaction_group(2, &[]) - .await - .unwrap(); - let err = hummock_manager .split_compaction_group(2, &[100]) .await @@ -1477,7 +1473,7 @@ async fn test_split_compaction_group_on_demand_basic() { assert!(new_group_id > StaticCompactionGroupId::End as u64); assert_eq!( 
get_compaction_group_object_ids(&current_version, 2), - vec![10, 11] + Vec::::new() ); assert_eq!( get_compaction_group_object_ids(&current_version, new_group_id), @@ -1667,15 +1663,6 @@ async fn test_split_compaction_group_trivial_expired() { .split_compaction_group(2, &[100]) .await .unwrap(); - let mut selector: Box = - Box::::default(); - let (mut normal_tasks, _unscheduled) = hummock_manager - .get_compact_tasks_impl(vec![2], 1, &mut selector) - .await - .unwrap(); - use crate::hummock::manager::CompactStatus; - let reclaim_task = normal_tasks.pop().unwrap(); - assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap(); @@ -1854,7 +1841,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 2 + 1 ); assert_eq!( @@ -1865,7 +1852,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { assert_eq!( current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] .table_ids, - vec![100, 101] + vec![101] ); assert_eq!( current_version @@ -1881,7 +1868,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .levels[base_level - 1] .table_infos[0] .table_ids, - vec![100, 101] + vec![100] ); assert_eq!( current_version @@ -2047,38 +2034,15 @@ async fn test_move_tables_between_compaction_group() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 3 + 2 ); let level = &current_version .get_compaction_group_levels(new_group_id) .levels[base_level - 1]; assert_eq!(level.table_infos[0].table_ids, vec![100]); - assert_eq!(level.table_infos[1].table_ids, vec![100, 101]); + assert_eq!(level.table_infos[1].table_ids, vec![100]); assert_eq!(level.table_infos.len(), 2); - - let mut selector: Box = - Box::::default(); - - let compaction_task = hummock_manager - 
.get_compact_task(2, &mut selector) - .await - .unwrap() - .unwrap(); - assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); - assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12); - assert_eq!(compaction_task.existing_table_ids, vec![101]); - - let ret = hummock_manager - .report_compact_task( - compaction_task.task_id, - TaskStatus::Success, - vec![gen_sstable_info(20, 2, vec![101])], - None, - ) - .await - .unwrap(); - assert!(ret); } #[tokio::test] @@ -2137,11 +2101,13 @@ async fn test_gc_stats() { hummock_manager.create_version_checkpoint(0).await.unwrap(), 0 ); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); hummock_manager .unpin_version_before(context_id, HummockVersionId::MAX) .await .unwrap(); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); assert_eq!( hummock_manager.create_version_checkpoint(0).await.unwrap(), @@ -2353,7 +2319,7 @@ async fn test_unregister_moved_table() { assert_eq!(current_version.levels.len(), 3); assert_eq!( get_compaction_group_object_ids(&current_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( get_compaction_group_object_ids(&current_version, new_group_id), @@ -2387,7 +2353,7 @@ async fn test_unregister_moved_table() { assert!(!current_version.levels.contains_key(&new_group_id)); assert_eq!( get_compaction_group_object_ids(&current_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( current_version diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index e50242a9d44e6..afacce8dfdecb 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -475,7 +475,7 @@ impl DiagnoseCommand { let mut visit_level = |level: &Level| { sst_num += level.table_infos.len(); sst_total_file_size += - level.table_infos.iter().map(|t| t.file_size).sum::(); + level.table_infos.iter().map(|t| t.sst_size).sum::(); for sst in &level.table_infos { if sst.total_key_count == 0 { continue; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs 
b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index bfd6402e21a8c..ca6585f46fd51 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1013,7 +1013,13 @@ fn split_sst_info_for_level( .cloned() .collect_vec(); if !removed_table_ids.is_empty() { - let branch_sst = split_sst(sst_info, new_sst_id); + let branch_sst = split_sst( + sst_info, + new_sst_id, + sst_info.sst_size / 2, + sst_info.sst_size / 2, + member_table_ids.iter().cloned().collect_vec(), + ); insert_table_infos.push(branch_sst); } } @@ -1220,7 +1226,7 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap Vec { res } -pub fn split_sst(sst_info: &mut SstableInfo, new_sst_id: &mut u64) -> SstableInfo { +pub fn split_sst( + sst_info: &mut SstableInfo, + new_sst_id: &mut u64, + old_sst_size: u64, + new_sst_size: u64, + new_sst_table_ids: Vec, +) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; + branch_table_info.sst_size = new_sst_size; + sst_info.sst_id = *new_sst_id + 1; + sst_info.sst_size = old_sst_size; + + { + // related github.com/risingwavelabs/risingwave/pull/17898/ + // This is a temporary implementation that will update `table_ids` based on the new split rule after PR 17898 + + let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect(); + let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect(); + let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); + + // Update table_ids + branch_table_info.table_ids = intersection; + sst_info + .table_ids + .retain(|table_id| !branch_table_info.table_ids.contains(table_id)); + } + *new_sst_id += 1; branch_table_info diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 4fce0e0f048ae..0ab7a252b8804 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ 
b/src/storage/hummock_sdk/src/table_stats.rs @@ -29,6 +29,10 @@ pub struct TableStats { pub total_key_size: i64, pub total_value_size: i64, pub total_key_count: i64, + + // `total_compressed_size` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + pub total_compressed_size: u64, } impl From<&TableStats> for PbTableStats { @@ -37,6 +41,7 @@ impl From<&TableStats> for PbTableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + total_compressed_size: value.total_compressed_size, } } } @@ -53,6 +58,7 @@ impl From<&PbTableStats> for TableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + total_compressed_size: value.total_compressed_size, } } } @@ -62,6 +68,7 @@ impl TableStats { self.total_key_size += other.total_key_size; self.total_value_size += other.total_value_size; self.total_key_count += other.total_key_count; + self.total_compressed_size += other.total_compressed_size; } } @@ -69,6 +76,7 @@ pub fn add_prost_table_stats(this: &mut PbTableStats, other: &PbTableStats) { this.total_key_size += other.total_key_size; this.total_value_size += other.total_value_size; this.total_key_count += other.total_key_count; + this.total_compressed_size += other.total_compressed_size; } pub fn add_prost_table_stats_map(this: &mut PbTableStatsMap, other: &PbTableStatsMap) { diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 6eb49d627b340..093612623cce8 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -128,7 +128,6 @@ pub struct TaskConfig { /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`. 
pub stats_target_table_ids: Option>, pub task_type: PbTaskType, - pub is_target_l0_or_lbase: bool, pub use_block_based_filter: bool, pub table_vnode_partition: BTreeMap, @@ -508,7 +507,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let all_ssts_are_blocked_filter = sstable_infos @@ -575,7 +574,7 @@ pub async fn generate_splits_for_task( .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); if !optimize_by_copy_block { @@ -612,7 +611,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .collect_vec(); let select_size = select_table_infos .iter() - .map(|table| table.file_size) + .map(|table| table.sst_size) .sum::(); context .compactor_metrics @@ -625,7 +624,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .with_label_values(&[&group_label, &cur_level_label]) .inc_by(select_table_infos.len() as u64); - let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::(); + let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::(); let next_level_label = compact_task.target_level.to_string(); context .compactor_metrics @@ -660,7 +659,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; calculate_task_parallelism_impl( diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 773b2d565550e..f812ff5cd6ff6 100644 --- 
a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -111,8 +111,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 - || task.target_level == task.base_level, use_block_based_filter, table_vnode_partition: task.table_vnode_partition.clone(), table_schemas: task diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 227a50cb5813d..9851afc47f148 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -294,7 +294,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, table_schemas: Default::default(), @@ -493,10 +492,10 @@ impl CompactorRunner { } let mut total_read_bytes = 0; for sst in &self.left.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } for sst in &self.right.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } self.metrics .compact_fast_runner_bytes diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 00680f5906ccf..fab4497433ddb 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -259,6 +259,7 @@ async fn compact_shared_buffer( .compactor_metrics .write_build_l0_bytes .inc_by(sst_info.file_size()); + sst_infos.push(sst_info.sst_info.clone()); } 
level0.extend(ssts); @@ -557,7 +558,6 @@ impl SharedBufferCompactRunner { watermark: GC_WATERMARK_FOR_FLUSH, stats_target_table_ids: None, task_type: compact_task::TaskType::SharedBuffer, - is_target_l0_or_lbase: true, table_vnode_partition, use_block_based_filter, table_schemas: Default::default(), diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 5a3dc3ef598fb..32960c7b8f97d 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -244,6 +244,7 @@ impl SstableBuilder { self.filter_builder.add_raw_data(filter_data); let block_meta = self.block_metas.last_mut().unwrap(); self.writer.write_block_bytes(buf, block_meta).await?; + Ok(true) } @@ -539,6 +540,21 @@ impl SstableBuilder { let bloom_filter_size = meta.bloom_filter.len(); let sstable_file_size = sst_info.file_size as usize; + if !meta.block_metas.is_empty() { + // fill total_compressed_size + let mut last_table_id = meta.block_metas[0].table_id().table_id(); + let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); + for block_meta in &meta.block_metas { + let block_table_id = block_meta.table_id(); + if last_table_id != block_table_id.table_id() { + last_table_id = block_table_id.table_id(); + last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); + } + + last_table_stats.total_compressed_size += block_meta.len as u64; + } + } + let writer_output = self.writer.finish(meta).await?; Ok(SstableBuilderOutput:: { sst_info: LocalSstableInfo::new(sst_info, self.table_stats), diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index cea2c42529ceb..ee26c922c648b 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -578,7 +578,7 @@ impl SstableStore { let store = self.store.clone(); let meta_path = self.get_sst_data_path(object_id); let stats_ptr = stats.remote_io_time.clone(); - 
let range = sst.meta_offset as usize..sst.file_size as usize; + let range = sst.meta_offset as usize..; async move { let now = Instant::now(); let buf = store.read(&meta_path, range).await?;