From 1d7e169312a8d2f98f36a4b936082c9e9a20b938 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 15 Aug 2024 17:56:14 +0800 Subject: [PATCH 01/14] feat(storage): support split sst with size from table_stats --- src/meta/src/hummock/manager/commit_epoch.rs | 29 +++++++++++++++++-- .../compaction_group/hummock_version_ext.rs | 18 ++++++++++-- .../src/hummock/compactor/compaction_utils.rs | 1 - .../src/hummock/compactor/compactor_runner.rs | 2 -- .../compactor/fast_compactor_runner.rs | 1 - .../compactor/shared_buffer_compact.rs | 1 - src/storage/src/hummock/sstable_store.rs | 2 +- 7 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index c7c2057fcc96e..83d89a75dc6a2 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -165,7 +165,7 @@ impl HummockManager { for s in &mut sstables { add_prost_table_stats_map( &mut table_stats_change, - &to_prost_table_stats_map(std::mem::take(&mut s.table_stats)), + &to_prost_table_stats_map(s.table_stats.clone()), ); } @@ -464,8 +464,31 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - for (group_id, _match_ids) in group_table_ids { - let branch_sst = split_sst(&mut sst.sst_info, &mut new_sst_id); + let sst_size_from_stats = sst + .table_stats + .values() + .map(|stat| stat.total_key_size as u64 + stat.total_value_size as u64) + .sum::(); + for (group_id, match_ids) in group_table_ids { + let match_table_ids_size: u64 = match_ids + .iter() + .map(|id| { + let stat = sst.table_stats.get(id).unwrap(); + stat.total_key_size as u64 + stat.total_value_size as u64 + }) + .sum(); + + let origin_sst_size = sst.sst_info.file_size; + // Since the block encode may trigger a compress, the sst size may not necessarily match the stats, so a proportional calculation is used to determine the estimated size of the new sst. + let new_sst_size = (match_table_ids_size as f64 / sst_size_from_stats as f64 + * origin_sst_size as f64) as u64; + + let branch_sst = split_sst( + &mut sst.sst_info, + &mut new_sst_id, + origin_sst_size - new_sst_size, + new_sst_size, + ); commit_sstables .entry(group_id) .or_default() diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 1ee4fe0443783..2026db68eaa53 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1006,7 +1006,12 @@ fn split_sst_info_for_level( .cloned() .collect_vec(); if !removed_table_ids.is_empty() { - let branch_sst = split_sst(sst_info, new_sst_id); + let branch_sst = split_sst( + sst_info, + new_sst_id, + sst_info.file_size / 2, + sst_info.file_size / 2, + ); insert_table_infos.push(branch_sst); } } @@ -1320,10 +1325,19 @@ pub fn validate_version(version: &HummockVersion) -> Vec { res } -pub fn split_sst(sst_info: &mut SstableInfo, new_sst_id: &mut u64) -> SstableInfo { +pub fn split_sst( + sst_info: &mut SstableInfo, + new_sst_id: &mut u64, + old_sst_size: u64, + new_sst_size: u64, +) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; + branch_table_info.file_size = new_sst_size; + sst_info.sst_id = *new_sst_id + 1; + sst_info.file_size = old_sst_size; + *new_sst_id += 1; branch_table_info diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 6eb49d627b340..e579534427baa 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -128,7 +128,6 @@ pub struct TaskConfig { /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`. pub stats_target_table_ids: Option>, pub task_type: PbTaskType, - pub is_target_l0_or_lbase: bool, pub use_block_based_filter: bool, pub table_vnode_partition: BTreeMap, diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 773b2d565550e..f812ff5cd6ff6 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -111,8 +111,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 - || task.target_level == task.base_level, use_block_based_filter, table_vnode_partition: task.table_vnode_partition.clone(), table_schemas: task diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 227a50cb5813d..8cf65e66c1de7 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -294,7 +294,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, table_schemas: Default::default(), diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 00680f5906ccf..35b5e7b21d27d 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -557,7 +557,6 @@ impl SharedBufferCompactRunner { watermark: GC_WATERMARK_FOR_FLUSH, stats_target_table_ids: None, task_type: compact_task::TaskType::SharedBuffer, - is_target_l0_or_lbase: true, table_vnode_partition, use_block_based_filter, table_schemas: Default::default(), diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index cea2c42529ceb..ee26c922c648b 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -578,7 +578,7 @@ impl SstableStore { let store = self.store.clone(); let meta_path = self.get_sst_data_path(object_id); let stats_ptr = stats.remote_io_time.clone(); - let range = sst.meta_offset as usize..sst.file_size as usize; + let range = sst.meta_offset as usize..; async move { let now = Instant::now(); let buf = store.read(&meta_path, range).await?; From 2c189f0704a80d1f222970b936db6000f2f29365 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 16 Aug 2024 14:11:22 +0800 Subject: [PATCH 02/14] fix(storage): fix trivial --- src/meta/src/hummock/manager/commit_epoch.rs | 43 +++++++++++++++----- src/meta/src/hummock/manager/tests.rs | 2 + src/storage/hummock_sdk/src/version.rs | 14 +++++-- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index f6a521bb8eaf3..31ae3af252fc7 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -160,6 +160,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let group_members_table_ids = state_table_info.build_compaction_group_member_tables(); // Add new table let (new_table_ids, new_compaction_group, compaction_group_manager_txn) = @@ -222,7 +223,11 @@ impl HummockManager { }; let commit_sstables = self - .correct_commit_ssts(sstables, &table_compaction_group_mapping) + .correct_commit_ssts( + sstables, + &table_compaction_group_mapping, + &group_members_table_ids, + ) .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); @@ -379,6 +384,7 @@ impl HummockManager { &self, sstables: Vec, table_compaction_group_mapping: &HashMap, + group_members_table_ids: &HashMap>, ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); @@ -419,18 +425,33 @@ impl HummockManager { .map(|stat| stat.total_key_size as u64 + stat.total_value_size as u64) .sum::(); for (group_id, match_ids) in group_table_ids { - let match_table_ids_size: u64 = match_ids + let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); + if match_ids .iter() - .map(|id| { - let stat = sst.table_stats.get(id).unwrap(); - stat.total_key_size as u64 + stat.total_value_size as u64 - }) - .sum(); + .all(|id| group_members_table_ids.contains(&TableId::new(*id))) + { + commit_sstables + .entry(group_id) + .or_default() + .push(sst.sst_info.clone()); + continue; + } let origin_sst_size = sst.sst_info.file_size; - // Since the block encode may trigger a compress, the sst size may not necessarily match the stats, so a proportional calculation is used to determine the estimated size of the new sst. - let new_sst_size = (match_table_ids_size as f64 / sst_size_from_stats as f64 - * origin_sst_size as f64) as u64; + let new_sst_size = if !sst.table_stats.is_empty() { + let match_table_ids_size: u64 = match_ids + .iter() + .map(|id| { + let stat = sst.table_stats.get(id).unwrap(); + stat.total_key_size as u64 + stat.total_value_size as u64 + }) + .sum(); + // Since the block encode may trigger a compress, the sst size may not necessarily match the stats, so a proportional calculation is used to determine the estimated size of the new sst. + (match_table_ids_size as f64 / sst_size_from_stats as f64 + * origin_sst_size as f64) as u64 + } else { + origin_sst_size / 2 + }; let branch_sst = split_sst( &mut sst.sst_info, diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 5fbec419cc7fe..d5b5cb7649ddb 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -2108,11 +2108,13 @@ async fn test_gc_stats() { hummock_manager.create_version_checkpoint(0).await.unwrap(), 0 ); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); hummock_manager .unpin_version_before(context_id, HummockVersionId::MAX) .await .unwrap(); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); assert_eq!( hummock_manager.create_version_checkpoint(0).await.unwrap(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index e418250f0b6bf..108ad04ab6905 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -54,7 +54,15 @@ impl HummockVersionStateTableInfo { } } - fn build_compaction_group_member_tables( + pub fn build_compaction_group_member_tables( + &self, + ) -> HashMap> { + HummockVersionStateTableInfo::build_compaction_group_member_tables_impl( + &self.state_table_info, + ) + } + + fn build_compaction_group_member_tables_impl( state_table_info: &HashMap, ) -> HashMap> { let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new(); @@ -80,7 +88,7 @@ impl HummockVersionStateTableInfo { .map(|(table_id, info)| (TableId::new(*table_id), *info)) .collect(); let compaction_group_member_tables = - Self::build_compaction_group_member_tables(&state_table_info); + Self::build_compaction_group_member_tables_impl(&state_table_info); Self { state_table_info, compaction_group_member_tables, @@ -184,7 +192,7 @@ impl HummockVersionStateTableInfo { } debug_assert_eq!( self.compaction_group_member_tables, - Self::build_compaction_group_member_tables(&self.state_table_info) + Self::build_compaction_group_member_tables_impl(&self.state_table_info) ); (changed_table, has_bumped_committed_epoch) } From b836ca7c2155e599c1589ddb40c6dd7c2c9190b1 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 16 Aug 2024 17:09:32 +0800 Subject: [PATCH 03/14] fix(storage): fix test --- src/meta/src/hummock/manager/commit_epoch.rs | 13 +++++++++++-- src/storage/hummock_sdk/src/version.rs | 14 +++----------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 31ae3af252fc7..1b37c39974475 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -158,9 +158,7 @@ impl HummockManager { ); let state_table_info = &version.latest_version().state_table_info; - let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); - let group_members_table_ids = state_table_info.build_compaction_group_member_tables(); // Add new table let (new_table_ids, new_compaction_group, compaction_group_manager_txn) = @@ -222,6 +220,17 @@ impl HummockManager { NewTableFragmentInfo::None => (HashMap::new(), None, None), }; + let mut group_members_table_ids: HashMap> = HashMap::new(); + { + // expand group_members_table_ids + for (table_id, group_id) in &table_compaction_group_mapping { + group_members_table_ids + .entry(*group_id) + .or_default() + .insert(*table_id); + } + } + let commit_sstables = self .correct_commit_ssts( sstables, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 108ad04ab6905..e418250f0b6bf 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -54,15 +54,7 @@ impl HummockVersionStateTableInfo { } } - pub fn build_compaction_group_member_tables( - &self, - ) -> HashMap> { - HummockVersionStateTableInfo::build_compaction_group_member_tables_impl( - &self.state_table_info, - ) - } - - fn build_compaction_group_member_tables_impl( + fn build_compaction_group_member_tables( state_table_info: &HashMap, ) -> HashMap> { let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new(); @@ -88,7 +80,7 @@ impl HummockVersionStateTableInfo { .map(|(table_id, info)| (TableId::new(*table_id), *info)) .collect(); let compaction_group_member_tables = - Self::build_compaction_group_member_tables_impl(&state_table_info); + Self::build_compaction_group_member_tables(&state_table_info); Self { state_table_info, compaction_group_member_tables, @@ -192,7 +184,7 @@ impl HummockVersionStateTableInfo { } debug_assert_eq!( self.compaction_group_member_tables, - Self::build_compaction_group_member_tables_impl(&self.state_table_info) + Self::build_compaction_group_member_tables(&self.state_table_info) ); (changed_table, has_bumped_committed_epoch) } From 57574dd9b1230dccd1b5943826013489c130d182 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 21 Aug 2024 17:26:46 +0800 Subject: [PATCH 04/14] refactor(storage): address comment --- proto/hummock.proto | 2 + src/meta/src/hummock/manager/commit_epoch.rs | 34 +++++--------- src/meta/src/hummock/manager/tests.rs | 4 ++ src/storage/hummock_sdk/src/table_stats.rs | 7 +++ .../compactor/shared_buffer_compact.rs | 6 +++ src/storage/src/hummock/sstable/builder.rs | 47 +++++++++++++++---- 6 files changed, 69 insertions(+), 31 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 5d66a2b7bb79b..06a6804b048df 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -913,6 +913,8 @@ message TableStats { int64 total_key_size = 1; int64 total_value_size = 2; int64 total_key_count = 3; + + uint64 compressed_size_in_sstable = 4; } message HummockVersionStats { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 1b37c39974475..a1ef3e2689513 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -428,11 +428,6 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - let sst_size_from_stats = sst - .table_stats - .values() - .map(|stat| stat.total_key_size as u64 + stat.total_value_size as u64) - .sum::(); for (group_id, match_ids) in group_table_ids { let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); if match_ids @@ -447,27 +442,24 @@ impl HummockManager { } let origin_sst_size = sst.sst_info.file_size; - let new_sst_size = if !sst.table_stats.is_empty() { - let match_table_ids_size: u64 = match_ids - .iter() - .map(|id| { - let stat = sst.table_stats.get(id).unwrap(); - stat.total_key_size as u64 + stat.total_value_size as u64 - }) - .sum(); - // Since the block encode may trigger a compress, the sst size may not necessarily match the stats, so a proportional calculation is used to determine the estimated size of the new sst. - (match_table_ids_size as f64 / sst_size_from_stats as f64 - * origin_sst_size as f64) as u64 - } else { - origin_sst_size / 2 - }; - - let branch_sst = split_sst( + let new_sst_size = match_ids + .iter() + .map(|id| { + let stat = sst.table_stats.get(id).unwrap(); + stat.compressed_size_in_sstable + }) + .sum(); + + let mut branch_sst = split_sst( &mut sst.sst_info, &mut new_sst_id, origin_sst_size - new_sst_size, new_sst_size, ); + + // FIXME(li0k): We would change table_ids inside the `split_sst` function + branch_sst.table_ids = match_ids; + commit_sstables .entry(group_id) .or_default() diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index d5b5cb7649ddb..9b52c6794dbac 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1196,6 +1196,8 @@ async fn test_version_stats() { total_key_size: 1000, total_value_size: 100, total_key_count: 10, + + compressed_size_in_sstable: 1024 * 1024, }; let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]]; let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await; @@ -1266,6 +1268,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + compressed_size_in_sstable: 0, // unused }, ), ( @@ -1274,6 +1277,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + compressed_size_in_sstable: 0, // unused }, ), ]); diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 4fce0e0f048ae..3ee38e11a174b 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -29,6 +29,9 @@ pub struct TableStats { pub total_key_size: i64, pub total_value_size: i64, pub total_key_count: i64, + + // `compressed_size_in_sstable`` represents the size that the table takes up in the output sst, and is only meaningful when `TableStats` is the sstable builder output. + pub compressed_size_in_sstable: u64, } impl From<&TableStats> for PbTableStats { @@ -37,6 +40,7 @@ impl From<&TableStats> for PbTableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + compressed_size_in_sstable: value.compressed_size_in_sstable, } } } @@ -53,6 +57,7 @@ impl From<&PbTableStats> for TableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + compressed_size_in_sstable: value.compressed_size_in_sstable, } } } @@ -62,6 +67,7 @@ impl TableStats { self.total_key_size += other.total_key_size; self.total_value_size += other.total_value_size; self.total_key_count += other.total_key_count; + self.compressed_size_in_sstable += other.compressed_size_in_sstable; } } @@ -69,6 +75,7 @@ pub fn add_prost_table_stats(this: &mut PbTableStats, other: &PbTableStats) { this.total_key_size += other.total_key_size; this.total_value_size += other.total_value_size; this.total_key_count += other.total_key_count; + this.compressed_size_in_sstable += other.compressed_size_in_sstable; } pub fn add_prost_table_stats_map(this: &mut PbTableStatsMap, other: &PbTableStatsMap) { diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 35b5e7b21d27d..dc142a264cd52 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -259,6 +259,12 @@ async fn compact_shared_buffer( .compactor_metrics .write_build_l0_bytes .inc_by(sst_info.file_size()); + + println!( + "sst_info: {:?} size {:?} stats {:?}", + sst_info.sst_info.sst_id, sst_info.sst_info.file_size, sst_info.table_stats + ); + sst_infos.push(sst_info.sst_info.clone()); } level0.extend(ssts); diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 77399eb3e3d08..f5c6dd1479dca 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -196,11 +196,6 @@ impl SstableBuilder { mut meta: BlockMeta, ) -> HummockResult { let table_id = smallest_key.user_key.table_id.table_id; - if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { - self.table_ids.insert(table_id); - self.finalize_last_table_stats(); - self.last_table_id = Some(table_id); - } if !self.block_builder.is_empty() { let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4); if self.block_builder.approximate_len() < min_block_size { @@ -235,6 +230,20 @@ impl SstableBuilder { self.filter_builder.add_raw_data(filter_data); let block_meta = self.block_metas.last_mut().unwrap(); self.writer.write_block_bytes(buf, block_meta).await?; + + self.last_table_stats.compressed_size_in_sstable = self + .block_metas + .iter() + .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) + .map(|block_meta| block_meta.len as u64) + .sum::(); + + if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { + self.table_ids.insert(table_id); + self.finalize_last_table_stats(); + self.last_table_id = Some(table_id); + } + Ok(true) } @@ -297,12 +306,22 @@ impl SstableBuilder { "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}", is_new_user_key, self.sstable_id, self.block_metas.len(), table_id, self.last_table_id, full_key ); - self.table_ids.insert(table_id); - self.finalize_last_table_stats(); - self.last_table_id = Some(table_id); if !self.block_builder.is_empty() { self.build_block().await?; } + + self.last_table_stats.compressed_size_in_sstable = self + .block_metas + .iter() + .filter(|block_meta| { + block_meta.table_id().table_id() == self.last_table_id.unwrap() + }) + .map(|block_meta| block_meta.len as u64) + .sum::(); + + self.table_ids.insert(table_id); + self.finalize_last_table_stats(); + self.last_table_id = Some(table_id); } else if is_block_full && could_switch_block { self.build_block().await?; } @@ -368,9 +387,16 @@ impl SstableBuilder { self.block_metas[0].smallest_key.clone() }; let largest_key = self.last_full_key.clone(); - self.finalize_last_table_stats(); - self.build_block().await?; + + self.last_table_stats.compressed_size_in_sstable = self + .block_metas + .iter() + .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) + .map(|block_meta| block_meta.len as u64) + .sum::(); + + self.finalize_last_table_stats(); let right_exclusive = false; let meta_offset = self.writer.data_len() as u64; @@ -610,6 +636,7 @@ impl SstableBuilder { if self.table_ids.is_empty() || self.last_table_id.is_none() { return; } + self.table_stats.insert( self.last_table_id.unwrap(), std::mem::take(&mut self.last_table_stats), From 4324572128dd2610e4d372c4d699b683c9887e6b Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 21 Aug 2024 18:10:52 +0800 Subject: [PATCH 05/14] typo --- src/storage/src/hummock/compactor/shared_buffer_compact.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index dc142a264cd52..fab4497433ddb 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -260,11 +260,6 @@ async fn compact_shared_buffer( .write_build_l0_bytes .inc_by(sst_info.file_size()); - println!( - "sst_info: {:?} size {:?} stats {:?}", - sst_info.sst_info.sst_id, sst_info.sst_info.file_size, sst_info.table_stats - ); - sst_infos.push(sst_info.sst_info.clone()); } level0.extend(ssts); From 2ccdb93809f65596b1aade9d42b48a9f900d0a16 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 22 Aug 2024 20:01:28 +0800 Subject: [PATCH 06/14] refactor(compaction): address comment --- proto/hummock.proto | 4 +- src/meta/src/hummock/manager/commit_epoch.rs | 8 +-- src/meta/src/hummock/manager/tests.rs | 64 ++++--------------- .../compaction_group/hummock_version_ext.rs | 17 +++++ src/storage/hummock_sdk/src/table_stats.rs | 13 ++-- src/storage/src/hummock/sstable/builder.rs | 39 ++++------- 6 files changed, 54 insertions(+), 91 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 06a6804b048df..5c50a3c5a83e0 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -914,7 +914,9 @@ message TableStats { int64 total_value_size = 2; int64 total_key_count = 3; - uint64 compressed_size_in_sstable = 4; + // `total_compressed_size`` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + uint64 total_compressed_size = 4; } message HummockVersionStats { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index a1ef3e2689513..6c56890c07a1d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -446,20 +446,18 @@ impl HummockManager { .iter() .map(|id| { let stat = sst.table_stats.get(id).unwrap(); - stat.compressed_size_in_sstable + stat.total_compressed_size }) .sum(); - let mut branch_sst = split_sst( + let branch_sst = split_sst( &mut sst.sst_info, &mut new_sst_id, origin_sst_size - new_sst_size, new_sst_size, + match_ids, ); - // FIXME(li0k): We would change table_ids inside the `split_sst` function - branch_sst.table_ids = match_ids; - commit_sstables .entry(group_id) .or_default() diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 533f9f91da47e..70c7d3feb7254 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -41,10 +41,7 @@ use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, Hummoc use risingwave_pb::meta::add_worker_node_request::Property; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; -use crate::hummock::compaction::selector::{ - default_compaction_selector, CompactionSelector, ManualCompactionOption, - SpaceReclaimCompactionSelector, -}; +use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption}; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{HummockManager, HummockManagerRef}; @@ -1199,7 +1196,7 @@ async fn test_version_stats() { total_value_size: 100, total_key_count: 10, - compressed_size_in_sstable: 1024 * 1024, + total_compressed_size: 1024 * 1024, }; let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]]; let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await; @@ -1270,7 +1267,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, - compressed_size_in_sstable: 0, // unused + total_compressed_size: 0, // unused }, ), ( @@ -1279,7 +1276,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, - compressed_size_in_sstable: 0, // unused + total_compressed_size: 0, // unused }, ), ]); @@ -1383,11 +1380,6 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); - hummock_manager - .split_compaction_group(2, &[]) - .await - .unwrap(); - let err = hummock_manager .split_compaction_group(2, &[100]) .await @@ -1472,7 +1464,7 @@ async fn test_split_compaction_group_on_demand_basic() { assert!(new_group_id > StaticCompactionGroupId::End as u64); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + Vec::::new() ); assert_eq!( get_compaction_group_object_ids(¤t_version, new_group_id), @@ -1656,15 +1648,6 @@ async fn test_split_compaction_group_trivial_expired() { .split_compaction_group(2, &[100]) .await .unwrap(); - let mut selector: Box = - Box::::default(); - let (mut normal_tasks, _unscheduled) = hummock_manager - .get_compact_tasks_impl(vec![2], 1, &mut selector) - .await - .unwrap(); - use crate::hummock::manager::CompactStatus; - let reclaim_task = normal_tasks.pop().unwrap(); - assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap(); @@ -1835,7 +1818,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 2 + 1 ); assert_eq!( @@ -1846,7 +1829,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { assert_eq!( current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] .table_ids, - vec![100, 101] + vec![101] ); assert_eq!( current_version @@ -1862,7 +1845,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .levels[base_level - 1] .table_infos[0] .table_ids, - vec![100, 101] + vec![100] ); assert_eq!( current_version @@ -2024,38 +2007,15 @@ async fn test_move_tables_between_compaction_group() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 3 + 2 ); let level = ¤t_version .get_compaction_group_levels(new_group_id) .levels[base_level - 1]; assert_eq!(level.table_infos[0].table_ids, vec![100]); - assert_eq!(level.table_infos[1].table_ids, vec![100, 101]); + assert_eq!(level.table_infos[1].table_ids, vec![100]); assert_eq!(level.table_infos.len(), 2); - - let mut selector: Box = - Box::::default(); - - let compaction_task = hummock_manager - .get_compact_task(2, &mut selector) - .await - .unwrap() - .unwrap(); - assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); - assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12); - assert_eq!(compaction_task.existing_table_ids, vec![101]); - - let ret = hummock_manager - .report_compact_task( - compaction_task.task_id, - TaskStatus::Success, - vec![gen_sstable_info(20, 2, vec![101])], - None, - ) - .await - .unwrap(); - assert!(ret); } #[tokio::test] @@ -2324,7 +2284,7 @@ async fn test_unregister_moved_table() { assert_eq!(current_version.levels.len(), 3); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( get_compaction_group_object_ids(¤t_version, new_group_id), @@ -2358,7 +2318,7 @@ async fn test_unregister_moved_table() { assert!(!current_version.levels.contains_key(&new_group_id)); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( current_version diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index a69a8de234afc..cd84f07e291fe 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1018,6 +1018,7 @@ fn split_sst_info_for_level( new_sst_id, sst_info.file_size / 2, sst_info.file_size / 2, + member_table_ids.iter().cloned().collect_vec(), ); insert_table_infos.push(branch_sst); } @@ -1337,6 +1338,7 @@ pub fn split_sst( new_sst_id: &mut u64, old_sst_size: u64, new_sst_size: u64, + new_sst_table_ids: Vec, ) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; @@ -1345,6 +1347,21 @@ pub fn split_sst( sst_info.sst_id = *new_sst_id + 1; sst_info.file_size = old_sst_size; + { + // related github.com/risingwavelabs/risingwave/pull/17898/ + // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 + + let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect(); + let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect(); + let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); + + // Update table_ids + branch_table_info.table_ids = intersection; + sst_info + .table_ids + .retain(|table_id| !branch_table_info.table_ids.contains(table_id)); + } + *new_sst_id += 1; branch_table_info diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 3ee38e11a174b..0ab7a252b8804 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -30,8 +30,9 @@ pub struct TableStats { pub total_value_size: i64, pub total_key_count: i64, - // `compressed_size_in_sstable`` represents the size that the table takes up in the output sst, and is only meaningful when `TableStats` is the sstable builder output. - pub compressed_size_in_sstable: u64, + // `total_compressed_size`` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + pub total_compressed_size: u64, } impl From<&TableStats> for PbTableStats { @@ -40,7 +41,7 @@ impl From<&TableStats> for PbTableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, - compressed_size_in_sstable: value.compressed_size_in_sstable, + total_compressed_size: value.total_compressed_size, } } } @@ -57,7 +58,7 @@ impl From<&PbTableStats> for TableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, - compressed_size_in_sstable: value.compressed_size_in_sstable, + total_compressed_size: value.total_compressed_size, } } } @@ -67,7 +68,7 @@ impl TableStats { self.total_key_size += other.total_key_size; self.total_value_size += other.total_value_size; self.total_key_count += other.total_key_count; - self.compressed_size_in_sstable += other.compressed_size_in_sstable; + self.total_compressed_size += other.total_compressed_size; } } @@ -75,7 +76,7 @@ pub fn add_prost_table_stats(this: &mut PbTableStats, other: &PbTableStats) { this.total_key_size += other.total_key_size; this.total_value_size += other.total_value_size; this.total_key_count += other.total_key_count; - this.compressed_size_in_sstable += other.compressed_size_in_sstable; + this.total_compressed_size += other.total_compressed_size; } pub fn add_prost_table_stats_map(this: &mut PbTableStatsMap, other: &PbTableStatsMap) { diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index f5c6dd1479dca..593ad382a7980 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -205,8 +205,8 @@ impl SstableBuilder { while iter.is_valid() { let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| { panic!( - "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}", - self.sstable_id, self.block_metas.len(), self.last_table_id + "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?} table_id {}", + self.sstable_id, self.block_metas.len(), self.last_table_id, table_id ) }); self.add_impl(iter.key(), value, false).await?; @@ -220,10 +220,11 @@ impl SstableBuilder { assert_eq!( meta.len as usize, buf.len(), - "meta {} buf {} last_table_id {:?}", + "meta {} buf {} last_table_id {:?} table_id {}", meta.len, buf.len(), - self.last_table_id + self.last_table_id, + table_id ); meta.offset = self.writer.data_len() as u32; self.block_metas.push(meta); @@ -231,13 +232,6 @@ impl SstableBuilder { let block_meta = self.block_metas.last_mut().unwrap(); self.writer.write_block_bytes(buf, block_meta).await?; - self.last_table_stats.compressed_size_in_sstable = self - .block_metas - .iter() - .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) - .map(|block_meta| block_meta.len as u64) - .sum::(); - if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { self.table_ids.insert(table_id); self.finalize_last_table_stats(); @@ -310,15 +304,6 @@ impl SstableBuilder { self.build_block().await?; } - self.last_table_stats.compressed_size_in_sstable = self - .block_metas - .iter() - .filter(|block_meta| { - block_meta.table_id().table_id() == self.last_table_id.unwrap() - }) - .map(|block_meta| block_meta.len as u64) - .sum::(); - self.table_ids.insert(table_id); self.finalize_last_table_stats(); self.last_table_id = Some(table_id); @@ -389,13 +374,6 @@ impl SstableBuilder { let largest_key = self.last_full_key.clone(); self.build_block().await?; - self.last_table_stats.compressed_size_in_sstable = self - .block_metas - .iter() - .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) - .map(|block_meta| block_meta.len as u64) - .sum::(); - self.finalize_last_table_stats(); let right_exclusive = false; let meta_offset = self.writer.data_len() as u64; @@ -637,6 +615,13 @@ impl SstableBuilder { return; } + self.last_table_stats.total_compressed_size = self + .block_metas + .iter() + .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) + .map(|block_meta| block_meta.len as u64) + .sum::(); + self.table_stats.insert( self.last_table_id.unwrap(), std::mem::take(&mut self.last_table_stats), From e3956282db5055150354b16f959873a8aec6fcb2 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 22 Aug 2024 21:38:34 +0800 Subject: [PATCH 07/14] fix(compaction): Do not trigger SpaceReclaim compaction after split --- .../src/hummock/manager/compaction_group_manager.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 2dac4a843ead6..6e63cf90fa9c8 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -605,16 +605,6 @@ impl HummockManager { drop(compaction_guard); self.report_compact_tasks(canceled_tasks).await?; - // Don't trigger compactions if we enable deterministic compaction - if !self.env.opts.compaction_deterministic_test { - // commit_epoch may contains SSTs from any compaction group - self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim); - self.try_send_compaction_request( - target_compaction_group_id, - compact_task::TaskType::SpaceReclaim, - ); - } - self.metrics .move_state_table_count .with_label_values(&[&parent_group_id.to_string()]) From 196e2182b01086142517cdee3ac278969ff0723b Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 23 Aug 2024 13:21:37 +0800 Subject: [PATCH 08/14] fix: check --- src/meta/src/hummock/manager/compaction_group_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 6e63cf90fa9c8..c68fc4222f283 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -30,8 +30,8 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, - PbGroupDestroy, PbStateTableInfoDelta, + CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, PbGroupDestroy, + PbStateTableInfoDelta, }; use tokio::sync::OnceCell; From 4bd6904ddcb7f0f0902b585418cc1781d15e276f Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 23 Aug 2024 15:35:37 +0800 Subject: [PATCH 09/14] address comments --- src/storage/src/hummock/sstable/builder.rs | 57 ++++++++++++++-------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 593ad382a7980..3eea5aced3c6f 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -205,8 +205,8 @@ impl SstableBuilder { while iter.is_valid() { let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| { panic!( - "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?} table_id {}", - self.sstable_id, self.block_metas.len(), self.last_table_id, table_id + "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}", + self.sstable_id, self.block_metas.len(), self.last_table_id ) }); self.add_impl(iter.key(), value, false).await?; @@ -220,11 +220,10 @@ impl SstableBuilder { assert_eq!( meta.len as usize, buf.len(), - "meta {} buf {} last_table_id {:?} table_id {}", + "meta {} buf {} last_table_id {:?}", meta.len, buf.len(), - self.last_table_id, - table_id + self.last_table_id ); meta.offset = self.writer.data_len() as u32; self.block_metas.push(meta); @@ -300,13 +299,12 @@ impl SstableBuilder { "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}", is_new_user_key, self.sstable_id, self.block_metas.len(), table_id, self.last_table_id, full_key ); - if !self.block_builder.is_empty() { - self.build_block().await?; - } - self.table_ids.insert(table_id); self.finalize_last_table_stats(); self.last_table_id = Some(table_id); + if !self.block_builder.is_empty() { + self.build_block().await?; + } } else if is_block_full && could_switch_block { self.build_block().await?; } @@ -372,9 +370,9 @@ impl SstableBuilder { self.block_metas[0].smallest_key.clone() }; let largest_key = self.last_full_key.clone(); - self.build_block().await?; - self.finalize_last_table_stats(); + + self.build_block().await?; let right_exclusive = false; let meta_offset = self.writer.data_len() as u64; @@ -533,6 +531,35 @@ impl SstableBuilder { let bloom_filter_size = meta.bloom_filter.len(); let sstable_file_size = sst_info.file_size as usize; + { + // fill total_compressed_size + + let mut last_total_compressed_size = 0; + let mut last_table_id = None; + for block_meta in &meta.block_metas { + let block_table_id = block_meta.table_id(); + if last_table_id.is_none() || last_table_id.unwrap() != block_table_id.table_id() { + if last_table_id.is_some() { + self.table_stats + .get_mut(&last_table_id.unwrap()) + .unwrap() + .total_compressed_size = last_total_compressed_size; + } + + last_table_id = Some(block_table_id.table_id()); + last_total_compressed_size = 0; + } + last_total_compressed_size += block_meta.len as u64; + } + + if last_total_compressed_size != 0 { + self.table_stats + .get_mut(&last_table_id.unwrap()) + .unwrap() + .total_compressed_size = last_total_compressed_size; + } + } + let writer_output = self.writer.finish(meta).await?; Ok(SstableBuilderOutput:: { sst_info: LocalSstableInfo::new(sst_info, self.table_stats), @@ -614,14 +641,6 @@ impl SstableBuilder { if self.table_ids.is_empty() || self.last_table_id.is_none() { return; } - - self.last_table_stats.total_compressed_size = self - .block_metas - .iter() - .filter(|block_meta| block_meta.table_id().table_id() == self.last_table_id.unwrap()) - .map(|block_meta| block_meta.len as u64) - .sum::(); - self.table_stats.insert( self.last_table_id.unwrap(), std::mem::take(&mut self.last_table_stats), From 90fca85aeda25b6fd6ce68c2615c291a59bd9673 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 23 Aug 2024 15:48:41 +0800 Subject: [PATCH 10/14] revert --- src/storage/src/hummock/sstable/builder.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 3eea5aced3c6f..6e647864f02a0 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -196,6 +196,11 @@ impl SstableBuilder { mut meta: BlockMeta, ) -> HummockResult { let table_id = smallest_key.user_key.table_id.table_id; + if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { + self.table_ids.insert(table_id); + self.finalize_last_table_stats(); + self.last_table_id = Some(table_id); + } if !self.block_builder.is_empty() { let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4); if self.block_builder.approximate_len() < min_block_size { @@ -231,12 +236,6 @@ impl SstableBuilder { let block_meta = self.block_metas.last_mut().unwrap(); self.writer.write_block_bytes(buf, block_meta).await?; - if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { - self.table_ids.insert(table_id); - self.finalize_last_table_stats(); - self.last_table_id = Some(table_id); - } - Ok(true) } From e930afcf48d02bae1a1b3b8569f5f3d0c9f26149 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 26 Aug 2024 15:16:07 +0800 Subject: [PATCH 11/14] address comments --- src/storage/src/hummock/sstable/builder.rs | 26 +++++----------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 6e647864f02a0..a797237bab4a1 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -532,30 +532,16 @@ impl SstableBuilder { { // fill total_compressed_size - - let mut last_total_compressed_size = 0; - let mut last_table_id = None; + let mut last_table_id = meta.block_metas[0].table_id().table_id(); + let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); for block_meta in &meta.block_metas { let block_table_id = block_meta.table_id(); - if last_table_id.is_none() || last_table_id.unwrap() != block_table_id.table_id() { - if last_table_id.is_some() { - self.table_stats - .get_mut(&last_table_id.unwrap()) - .unwrap() - .total_compressed_size = last_total_compressed_size; - } - - last_table_id = Some(block_table_id.table_id()); - last_total_compressed_size = 0; + if last_table_id != block_table_id.table_id() { + last_table_id = block_table_id.table_id(); + last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); } - last_total_compressed_size += block_meta.len as u64; - } - if last_total_compressed_size != 0 { - self.table_stats - .get_mut(&last_table_id.unwrap()) - .unwrap() - .total_compressed_size = last_total_compressed_size; + last_table_stats.total_compressed_size += block_meta.len as u64; } } From e3bcebaa17028b57a7009a2ccd1f458652e05cec Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 26 Aug 2024 21:05:12 +0800 Subject: [PATCH 12/14] fix(storage): fix ut --- src/storage/src/hummock/sstable/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index a797237bab4a1..d5e1ef1be921d 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -530,7 +530,7 @@ impl SstableBuilder { let bloom_filter_size = meta.bloom_filter.len(); let sstable_file_size = sst_info.file_size as usize; - { + if !meta.block_metas.is_empty() { // fill total_compressed_size let mut last_table_id = meta.block_metas[0].table_id().table_id(); let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); From 096b398c7aa410e1ce17635f95cad03da80d7b9c Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 26 Aug 2024 21:28:11 +0800 Subject: [PATCH 13/14] typo --- .../picker/trivial_move_compaction_picker.rs | 3 ++- .../compaction/picker/vnode_watermark_picker.rs | 2 +- src/meta/src/hummock/manager/commit_epoch.rs | 2 +- src/meta/src/manager/diagnose.rs | 2 +- .../src/compaction_group/hummock_version_ext.rs | 8 ++++---- src/storage/src/hummock/compactor/compaction_utils.rs | 10 +++++----- .../src/hummock/compactor/fast_compactor_runner.rs | 4 ++-- 7 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 6e172f15d3804..458919cd8b717 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -55,7 +55,7 @@ impl TrivialMovePicker { ) -> Option { let mut skip_by_pending = false; for sst in select_tables { - if sst.file_size < self.sst_allowed_trivial_move_min_size { + if sst.sst_size < self.sst_allowed_trivial_move_min_size { continue; } @@ -128,6 +128,7 @@ pub mod tests { let sst = SstableInfo { sst_id: 1, file_size: 100, + sst_size: 100, ..Default::default() }; diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs index 50a33c7d42e4f..5171c48ad9c34 100644 --- a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -50,7 +50,7 @@ impl VnodeWatermarkCompactionPicker { return None; } Some(CompactionInput { - select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(), total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 6c56890c07a1d..0853e28fcb572 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -441,7 +441,7 @@ impl HummockManager { continue; } - let origin_sst_size = sst.sst_info.file_size; + let origin_sst_size = sst.sst_info.sst_size; let new_sst_size = match_ids .iter() .map(|id| { diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index e50242a9d44e6..afacce8dfdecb 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -475,7 +475,7 @@ impl DiagnoseCommand { let mut visit_level = |level: &Level| { sst_num += level.table_infos.len(); sst_total_file_size += - level.table_infos.iter().map(|t| t.file_size).sum::(); + level.table_infos.iter().map(|t| t.sst_size).sum::(); for sst in &level.table_infos { if sst.total_key_count == 0 { continue; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 90278b661738f..d1db6ad4ff87d 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1016,8 +1016,8 @@ fn split_sst_info_for_level( let branch_sst = split_sst( sst_info, new_sst_id, - sst_info.file_size / 2, - sst_info.file_size / 2, + sst_info.sst_size / 2, + sst_info.sst_size / 2, member_table_ids.iter().cloned().collect_vec(), ); insert_table_infos.push(branch_sst); @@ -1342,10 +1342,10 @@ pub fn split_sst( ) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; - branch_table_info.file_size = new_sst_size; + branch_table_info.sst_size = new_sst_size; sst_info.sst_id = *new_sst_id + 1; - sst_info.file_size = old_sst_size; + sst_info.sst_size = old_sst_size; { // related github.com/risingwavelabs/risingwave/pull/17898/ diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index e579534427baa..093612623cce8 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -507,7 +507,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let all_ssts_are_blocked_filter = sstable_infos @@ -574,7 +574,7 @@ pub async fn generate_splits_for_task( .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); if !optimize_by_copy_block { @@ -611,7 +611,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .collect_vec(); let select_size = select_table_infos .iter() - .map(|table| table.file_size) + .map(|table| table.sst_size) .sum::(); context .compactor_metrics @@ -624,7 +624,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .with_label_values(&[&group_label, &cur_level_label]) .inc_by(select_table_infos.len() as u64); - let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::(); + let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::(); let next_level_label = compact_task.target_level.to_string(); context .compactor_metrics @@ -659,7 +659,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; calculate_task_parallelism_impl( diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 8cf65e66c1de7..9851afc47f148 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -492,10 +492,10 @@ impl CompactorRunner { } let mut total_read_bytes = 0; for sst in &self.left.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } for sst in &self.right.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } self.metrics .compact_fast_runner_bytes From 3d26a6c82a1444ed657dc505fd181857b4ac4fe4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 26 Aug 2024 21:35:21 +0800 Subject: [PATCH 14/14] fix(storage): fix object_size_map --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index d1db6ad4ff87d..ca6585f46fd51 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1226,7 +1226,7 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap