Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Nov 19, 2024
1 parent 579cf6d commit 76494c1
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 86 deletions.
96 changes: 67 additions & 29 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand Down Expand Up @@ -112,7 +113,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut compaction_groups = HashMap::new();
let mut new_compaction_groups = Vec::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

Expand Down Expand Up @@ -143,17 +144,13 @@ impl HummockManager {
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_groups.insert(
new_compaction_group_id,
(true, compaction_group_config.clone()),
);
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config,
},
);
let new_compaction_group = CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
};

new_compaction_groups.push(new_compaction_group.clone());
compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

on_handle_add_new_table(
state_table_info,
Expand All @@ -168,35 +165,35 @@ impl HummockManager {
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
// fill compaction_groups
let mut group_id_to_config = HashMap::new();
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
if !compaction_groups.contains_key(cg_id) {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
compaction_groups.insert(*cg_id, (false, compaction_group));
}
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
if !compaction_groups.contains_key(cg_id) {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
compaction_groups.insert(*cg_id, (false, compaction_group));
}
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
}

let group_id_to_sub_levels =
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
compaction_groups,
commit_sstables,
new_compaction_groups,
group_id_to_sub_levels,
&new_table_ids,
new_table_watermarks,
change_log_delta,
Expand Down Expand Up @@ -445,3 +442,44 @@ fn on_handle_add_new_table(

Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
fn rewrite_commit_sstables_to_sub_level(
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
for (group_id, inserted_table_infos) in commit_sstables {
let config = group_id_to_config
.get(&group_id)
.expect("compaction group should exist");

let mut accumulated_size = 0;
let mut ssts = vec![];
let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; // TODO: use config instead of magic number

let level = overlapping_sstables.entry(group_id).or_default();

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
level.push(ssts);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if accumulated_size != 0 {
level.push(ssts);
}

// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
level.reverse();
}

overlapping_sstables
}
81 changes: 24 additions & 57 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::hummock::model::CompactionGroup;
use crate::manager::NotificationManager;
use crate::model::{
InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
Expand Down Expand Up @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
compaction_groups: HashMap<CompactionGroupId, (bool, Arc<CompactionConfig>)>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_compaction_groups: Vec<CompactionGroup>,
group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
Expand All @@ -121,68 +121,35 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, (is_new, compaction_group_config)) in &compaction_groups {
{
if !is_new {
continue;
}
for compaction_group in &new_compaction_groups {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group.group_id())
.or_default()
.group_deltas;

let group_deltas = &mut new_version_delta
.group_deltas
.entry(*compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((**compaction_group_config).clone()),
group_id: *compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}
#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
let mut accumulated_size = 0;
let mut ssts = vec![];
let (_, config) = compaction_groups
.get(&compaction_group_id)
.unwrap_or_else(|| {
panic!(
"compaction group {:?} not found in compaction_groups",
compaction_group_id
)
});
let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2;

for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
let group_delta = GroupDelta::NewL0SubLevel(ssts);
group_deltas.push(group_delta);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if accumulated_size != 0 {
let group_delta = GroupDelta::NewL0SubLevel(ssts);
group_deltas.push(group_delta);
for sub_level in sub_levels {
group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
}
}

Expand Down

0 comments on commit 76494c1

Please sign in to comment.