Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): Limit the size of the new overlapping level #19277

Merged
merged 12 commits into from
Nov 22, 2024
5 changes: 5 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint64 sst_allowed_trivial_move_min_size = 19;
uint32 split_weight_by_vnode = 20;
bool disable_auto_group_scheduling = 21;
uint64 max_overlapping_level_size = 22;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down Expand Up @@ -858,6 +859,10 @@ message CompactionConfig {

// The limitation of auto group scheduling
optional bool disable_auto_group_scheduling = 23;

// The limitation of the max size of the overlapping-level for the compaction
// hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size`
optional uint64 max_overlapping_level_size = 24;
}

message TableStats {
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,10 @@ pub mod default {
pub fn disable_auto_group_scheduling() -> bool {
false
}

pub fn max_overlapping_level_size() -> u64 {
256 * MB
}
}

pub mod object_store_config {
Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn build_compaction_config_vec(
max_l0_compact_level: Option<u32>,
sst_allowed_trivial_move_min_size: Option<u64>,
disable_auto_group_scheduling: Option<bool>,
max_overlapping_level_size: Option<u64>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -127,6 +128,9 @@ pub fn build_compaction_config_vec(
if let Some(c) = disable_auto_group_scheduling {
configs.push(MutableConfig::DisableAutoGroupScheduling(c))
}
if let Some(c) = max_overlapping_level_size {
configs.push(MutableConfig::MaxOverlappingLevelSize(c))
}

configs
}
Expand Down
5 changes: 5 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum ComputeCommands {
ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
/// list latest Hummock version on meta node
Expand Down Expand Up @@ -191,6 +192,8 @@ enum HummockCommands {
sst_allowed_trivial_move_min_size: Option<u64>,
#[clap(long)]
disable_auto_group_scheduling: Option<bool>,
#[clap(long)]
max_overlapping_level_size: Option<u64>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
Expand Down Expand Up @@ -578,6 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_l0_compact_level,
sst_allowed_trivial_move_min_size,
disable_auto_group_scheduling,
max_overlapping_level_size,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand Down Expand Up @@ -610,6 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_l0_compact_level,
sst_allowed_trivial_move_min_size,
disable_auto_group_scheduling,
max_overlapping_level_size,
),
)
.await?
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/compaction/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl CompactionConfigBuilder {
disable_auto_group_scheduling: Some(
compaction_config::disable_auto_group_scheduling(),
),
max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
},
}
}
Expand Down
107 changes: 96 additions & 11 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
Expand Down Expand Up @@ -112,7 +114,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut new_compaction_groups = Vec::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

Expand Down Expand Up @@ -143,14 +145,13 @@ impl HummockManager {
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config,
},
);
let new_compaction_group = CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
};

new_compaction_groups.push(new_compaction_group.clone());
compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

on_handle_add_new_table(
state_table_info,
Expand All @@ -165,12 +166,35 @@ impl HummockManager {
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
// fill compaction_groups
let mut group_id_to_config = HashMap::new();
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
}

let group_id_to_sub_levels =
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
new_compaction_groups,
commit_sstables,
group_id_to_sub_levels,
&new_table_ids,
new_table_watermarks,
change_log_delta,
Expand Down Expand Up @@ -327,6 +351,7 @@ impl HummockManager {
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
for commit_sst in sstables {
let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
for table_id in &commit_sst.sst_info.table_ids {
Expand Down Expand Up @@ -395,6 +420,12 @@ impl HummockManager {
}
}

// order check
for ssts in commit_sstables.values() {
let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
}

Ok(commit_sstables)
}
}
Expand All @@ -419,3 +450,57 @@ fn on_handle_add_new_table(

Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
fn rewrite_commit_sstables_to_sub_level(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct_commit_ssts which generatescommit_sstables, should ensure the relateive SST order is correct.
Rest LGTM.

commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
for (group_id, inserted_table_infos) in commit_sstables {
let config = group_id_to_config
.get(&group_id)
.expect("compaction group should exist");

let mut accumulated_size = 0;
let mut ssts = vec![];
let sub_level_size_limit = config
.max_overlapping_level_size
.unwrap_or(compaction_config::max_overlapping_level_size());

let level = overlapping_sstables.entry(group_id).or_default();

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
level.push(ssts);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if !ssts.is_empty() {
level.push(ssts);
}

// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
level.reverse();
}

overlapping_sstables
}

fn is_ordered_subset(vec_1: &Vec<u64>, vec_2: &Vec<u64>) -> bool {
let mut vec_2_iter = vec_2.iter().peekable();
for item in vec_1 {
if vec_2_iter.peek() == Some(&item) {
vec_2_iter.next();
}
}

vec_2_iter.peek().is_none()
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::DisableAutoGroupScheduling(c) => {
target.disable_auto_group_scheduling = Some(*c);
}
MutableConfig::MaxOverlappingLevelSize(c) => {
target.max_overlapping_level_size = Some(*c);
}
}
}
}
Expand Down
54 changes: 26 additions & 28 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::hummock::model::CompactionGroup;
use crate::manager::NotificationManager;
use crate::model::{
InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
Expand Down Expand Up @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_compaction_groups: Vec<CompactionGroup>,
group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
Expand All @@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, compaction_group_config) in new_compaction_groups {
{
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}
for compaction_group in &new_compaction_groups {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group.group_id())
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
for sub_level in sub_levels {
group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
}
}

// update state table info
Expand Down
Loading
Loading