-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(compaction): Limit the size of the new overlapping level #19277
Merged
Merged
Changes from 8 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
0f7d928
feat(compaction): Limit the size of the new overlapping level
Li0k 579cf6d
change metrics bucket
Li0k 76494c1
refactor
Li0k c8a6fab
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k 868bef9
feat(config): add config for calculate max_overlapping_level_size
Li0k 93df881
refactor
Li0k 2c5eee4
refactor
Li0k d75e739
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k c6a4a0a
address comments
Li0k e4bc0a7
fix
Li0k e840af8
address comments
Li0k 206af73
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,9 @@ | |
use std::collections::{BTreeMap, HashMap, HashSet}; | ||
use std::sync::Arc; | ||
|
||
use itertools::Itertools; | ||
use risingwave_common::catalog::TableId; | ||
use risingwave_common::config::default::compaction_config; | ||
use risingwave_common::system_param::reader::SystemParamsRead; | ||
use risingwave_hummock_sdk::change_log::ChangeLogDelta; | ||
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids; | ||
|
@@ -112,7 +114,7 @@ impl HummockManager { | |
let state_table_info = &version.latest_version().state_table_info; | ||
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); | ||
let mut new_table_ids = HashMap::new(); | ||
let mut new_compaction_groups = HashMap::new(); | ||
let mut new_compaction_groups = Vec::new(); | ||
let mut compaction_group_manager_txn = None; | ||
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None; | ||
|
||
|
@@ -143,14 +145,13 @@ impl HummockManager { | |
) | ||
}; | ||
let new_compaction_group_id = next_compaction_group_id(&self.env).await?; | ||
new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); | ||
compaction_group_manager.insert( | ||
new_compaction_group_id, | ||
CompactionGroup { | ||
group_id: new_compaction_group_id, | ||
compaction_config: compaction_group_config, | ||
}, | ||
); | ||
let new_compaction_group = CompactionGroup { | ||
group_id: new_compaction_group_id, | ||
compaction_config: compaction_group_config.clone(), | ||
}; | ||
|
||
new_compaction_groups.push(new_compaction_group.clone()); | ||
compaction_group_manager.insert(new_compaction_group_id, new_compaction_group); | ||
|
||
on_handle_add_new_table( | ||
state_table_info, | ||
|
@@ -165,12 +166,35 @@ impl HummockManager { | |
.correct_commit_ssts(sstables, &table_compaction_group_mapping) | ||
.await?; | ||
|
||
let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); | ||
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec(); | ||
// fill compaction_groups | ||
let mut group_id_to_config = HashMap::new(); | ||
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { | ||
for cg_id in &modified_compaction_groups { | ||
let compaction_group = compaction_group_manager | ||
.get(cg_id) | ||
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) | ||
.compaction_config(); | ||
group_id_to_config.insert(*cg_id, compaction_group); | ||
} | ||
} else { | ||
let compaction_group_manager = self.compaction_group_manager.read().await; | ||
for cg_id in &modified_compaction_groups { | ||
let compaction_group = compaction_group_manager | ||
.try_get_compaction_group_config(*cg_id) | ||
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) | ||
.compaction_config(); | ||
group_id_to_config.insert(*cg_id, compaction_group); | ||
} | ||
} | ||
|
||
let group_id_to_sub_levels = | ||
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config); | ||
|
||
let time_travel_delta = version.pre_commit_epoch( | ||
&tables_to_commit, | ||
new_compaction_groups, | ||
commit_sstables, | ||
group_id_to_sub_levels, | ||
&new_table_ids, | ||
new_table_watermarks, | ||
change_log_delta, | ||
|
@@ -419,3 +443,46 @@ fn on_handle_add_new_table( | |
|
||
Ok(()) | ||
} | ||
|
||
/// Rewrite the commit sstables to sub-levels based on the compaction group config. | ||
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead. | ||
fn rewrite_commit_sstables_to_sub_level( | ||
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>, | ||
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>, | ||
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> { | ||
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new(); | ||
for (group_id, inserted_table_infos) in commit_sstables { | ||
let config = group_id_to_config | ||
.get(&group_id) | ||
.expect("compaction group should exist"); | ||
|
||
let mut accumulated_size = 0; | ||
let mut ssts = vec![]; | ||
let sub_level_size_limit = config | ||
.max_overlapping_level_size | ||
.unwrap_or(compaction_config::max_overlapping_level_size()); | ||
|
||
let level = overlapping_sstables.entry(group_id).or_default(); | ||
|
||
for sst in inserted_table_infos { | ||
accumulated_size += sst.sst_size; | ||
ssts.push(sst); | ||
if accumulated_size > sub_level_size_limit { | ||
level.push(ssts); | ||
|
||
// reset the accumulated size and ssts | ||
accumulated_size = 0; | ||
ssts = vec![]; | ||
} | ||
} | ||
|
||
if accumulated_size != 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is better to check |
||
level.push(ssts); | ||
} | ||
|
||
// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top. | ||
level.reverse(); | ||
} | ||
|
||
overlapping_sstables | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct_commit_ssts
which generatescommit_sstables
, should ensure the relateive SST order is correct.Rest LGTM.