Skip to content

Commit

Permalink
feat(compaction): Limit the size of the new overlapping level (#19277)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Nov 22, 2024
1 parent 22fce59 commit 07f4f47
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 41 deletions.
5 changes: 5 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint64 sst_allowed_trivial_move_min_size = 19;
uint32 split_weight_by_vnode = 20;
bool disable_auto_group_scheduling = 21;
uint64 max_overlapping_level_size = 22;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down Expand Up @@ -858,6 +859,10 @@ message CompactionConfig {

// The limitation of auto group scheduling
optional bool disable_auto_group_scheduling = 23;

// The limitation of the max size of the overlapping-level for the compaction
// hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size`
optional uint64 max_overlapping_level_size = 24;
}

message TableStats {
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,10 @@ pub mod default {
pub fn disable_auto_group_scheduling() -> bool {
false
}

pub fn max_overlapping_level_size() -> u64 {
256 * MB
}
}

pub mod object_store_config {
Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn build_compaction_config_vec(
max_l0_compact_level: Option<u32>,
sst_allowed_trivial_move_min_size: Option<u64>,
disable_auto_group_scheduling: Option<bool>,
max_overlapping_level_size: Option<u64>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -127,6 +128,9 @@ pub fn build_compaction_config_vec(
if let Some(c) = disable_auto_group_scheduling {
configs.push(MutableConfig::DisableAutoGroupScheduling(c))
}
if let Some(c) = max_overlapping_level_size {
configs.push(MutableConfig::MaxOverlappingLevelSize(c))
}

configs
}
Expand Down
5 changes: 5 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum ComputeCommands {
ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
/// list latest Hummock version on meta node
Expand Down Expand Up @@ -191,6 +192,8 @@ enum HummockCommands {
sst_allowed_trivial_move_min_size: Option<u64>,
#[clap(long)]
disable_auto_group_scheduling: Option<bool>,
#[clap(long)]
max_overlapping_level_size: Option<u64>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
Expand Down Expand Up @@ -578,6 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_l0_compact_level,
sst_allowed_trivial_move_min_size,
disable_auto_group_scheduling,
max_overlapping_level_size,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand Down Expand Up @@ -610,6 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_l0_compact_level,
sst_allowed_trivial_move_min_size,
disable_auto_group_scheduling,
max_overlapping_level_size,
),
)
.await?
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/compaction/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl CompactionConfigBuilder {
disable_auto_group_scheduling: Some(
compaction_config::disable_auto_group_scheduling(),
),
max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
},
}
}
Expand Down
107 changes: 96 additions & 11 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
Expand Down Expand Up @@ -112,7 +114,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut new_compaction_groups = Vec::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

Expand Down Expand Up @@ -143,14 +145,13 @@ impl HummockManager {
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config,
},
);
let new_compaction_group = CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
};

new_compaction_groups.push(new_compaction_group.clone());
compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

on_handle_add_new_table(
state_table_info,
Expand All @@ -165,12 +166,35 @@ impl HummockManager {
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
// fill compaction_groups
let mut group_id_to_config = HashMap::new();
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
}

let group_id_to_sub_levels =
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
new_compaction_groups,
commit_sstables,
group_id_to_sub_levels,
&new_table_ids,
new_table_watermarks,
change_log_delta,
Expand Down Expand Up @@ -327,6 +351,7 @@ impl HummockManager {
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
for commit_sst in sstables {
let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
for table_id in &commit_sst.sst_info.table_ids {
Expand Down Expand Up @@ -395,6 +420,12 @@ impl HummockManager {
}
}

// order check
for ssts in commit_sstables.values() {
let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
}

Ok(commit_sstables)
}
}
Expand All @@ -419,3 +450,57 @@ fn on_handle_add_new_table(

Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
fn rewrite_commit_sstables_to_sub_level(
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
for (group_id, inserted_table_infos) in commit_sstables {
let config = group_id_to_config
.get(&group_id)
.expect("compaction group should exist");

let mut accumulated_size = 0;
let mut ssts = vec![];
let sub_level_size_limit = config
.max_overlapping_level_size
.unwrap_or(compaction_config::max_overlapping_level_size());

let level = overlapping_sstables.entry(group_id).or_default();

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
level.push(ssts);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if !ssts.is_empty() {
level.push(ssts);
}

// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
level.reverse();
}

overlapping_sstables
}

fn is_ordered_subset(vec_1: &Vec<u64>, vec_2: &Vec<u64>) -> bool {
let mut vec_2_iter = vec_2.iter().peekable();
for item in vec_1 {
if vec_2_iter.peek() == Some(&item) {
vec_2_iter.next();
}
}

vec_2_iter.peek().is_none()
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::DisableAutoGroupScheduling(c) => {
target.disable_auto_group_scheduling = Some(*c);
}
MutableConfig::MaxOverlappingLevelSize(c) => {
target.max_overlapping_level_size = Some(*c);
}
}
}
}
Expand Down
54 changes: 26 additions & 28 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::hummock::model::CompactionGroup;
use crate::manager::NotificationManager;
use crate::model::{
InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
Expand Down Expand Up @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_compaction_groups: Vec<CompactionGroup>,
group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
Expand All @@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, compaction_group_config) in new_compaction_groups {
{
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}
for compaction_group in &new_compaction_groups {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group.group_id())
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
for sub_level in sub_levels {
group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
}
}

// update state table info
Expand Down
Loading

0 comments on commit 07f4f47

Please sign in to comment.