Skip to content

Commit

Permalink
feat(storage): Support for repairing the size of a split sst based on…
Browse files Browse the repository at this point in the history
… table stats (#18053)
  • Loading branch information
Li0k authored Aug 27, 2024
1 parent e3a9d37 commit 79d9810
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 86 deletions.
4 changes: 4 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,10 @@ message TableStats {
int64 total_key_size = 1;
int64 total_value_size = 2;
int64 total_key_count = 3;

// `total_compressed_size`` represents the size that the table takes up in the output sst
// and this field is only filled and used by CN flushes, not compactor compaction
uint64 total_compressed_size = 4;
}

message HummockVersionStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl TrivialMovePicker {
) -> Option<SstableInfo> {
let mut skip_by_pending = false;
for sst in select_tables {
if sst.file_size < self.sst_allowed_trivial_move_min_size {
if sst.sst_size < self.sst_allowed_trivial_move_min_size {
continue;
}

Expand Down Expand Up @@ -128,6 +128,7 @@ pub mod tests {
let sst = SstableInfo {
sst_id: 1,
file_size: 100,
sst_size: 100,
..Default::default()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl VnodeWatermarkCompactionPicker {
return None;
}
Some(CompactionInput {
select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(),
select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
total_file_count: select_input_ssts.len() as u64,
input_levels: vec![
InputLevel {
Expand Down
55 changes: 49 additions & 6 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand Down Expand Up @@ -139,7 +139,7 @@ impl HummockManager {
for s in &mut sstables {
add_prost_table_stats_map(
&mut table_stats_change,
&to_prost_table_stats_map(std::mem::take(&mut s.table_stats)),
&to_prost_table_stats_map(s.table_stats.clone()),
);
}

Expand All @@ -158,7 +158,6 @@ impl HummockManager {
);

let state_table_info = &version.latest_version().state_table_info;

let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

// Add new table
Expand Down Expand Up @@ -221,8 +220,23 @@ impl HummockManager {
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};

let mut group_members_table_ids: HashMap<u64, BTreeSet<TableId>> = HashMap::new();
{
// expand group_members_table_ids
for (table_id, group_id) in &table_compaction_group_mapping {
group_members_table_ids
.entry(*group_id)
.or_default()
.insert(*table_id);
}
}

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.correct_commit_ssts(
sstables,
&table_compaction_group_mapping,
&group_members_table_ids,
)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
Expand Down Expand Up @@ -379,6 +393,7 @@ impl HummockManager {
&self,
sstables: Vec<LocalSstableInfo>,
table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
group_members_table_ids: &HashMap<CompactionGroupId, BTreeSet<TableId>>,
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
Expand Down Expand Up @@ -413,8 +428,36 @@ impl HummockManager {
let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

for (mut sst, group_table_ids) in sst_to_cg_vec {
for (group_id, _match_ids) in group_table_ids {
let branch_sst = split_sst(&mut sst.sst_info, &mut new_sst_id);
for (group_id, match_ids) in group_table_ids {
let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap();
if match_ids
.iter()
.all(|id| group_members_table_ids.contains(&TableId::new(*id)))
{
commit_sstables
.entry(group_id)
.or_default()
.push(sst.sst_info.clone());
continue;
}

let origin_sst_size = sst.sst_info.sst_size;
let new_sst_size = match_ids
.iter()
.map(|id| {
let stat = sst.table_stats.get(id).unwrap();
stat.total_compressed_size
})
.sum();

let branch_sst = split_sst(
&mut sst.sst_info,
&mut new_sst_id,
origin_sst_size - new_sst_size,
new_sst_size,
match_ids,
);

commit_sstables
.entry(group_id)
.or_default()
Expand Down
14 changes: 2 additions & 12 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct,
PbGroupDestroy, PbStateTableInfoDelta,
CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, PbGroupDestroy,
PbStateTableInfoDelta,
};
use tokio::sync::OnceCell;

Expand Down Expand Up @@ -605,16 +605,6 @@ impl HummockManager {
drop(compaction_guard);
self.report_compact_tasks(canceled_tasks).await?;

// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim);
self.try_send_compaction_request(
target_compaction_group_id,
compact_task::TaskType::SpaceReclaim,
);
}

self.metrics
.move_state_table_count
.with_label_values(&[&parent_group_id.to_string()])
Expand Down
64 changes: 15 additions & 49 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, Hummoc
use risingwave_pb::meta::add_worker_node_request::Property;

use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::{
default_compaction_selector, CompactionSelector, ManualCompactionOption,
SpaceReclaimCompactionSelector,
};
use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption};
use crate::hummock::error::Error;
use crate::hummock::test_utils::*;
use crate::hummock::{HummockManager, HummockManagerRef};
Expand Down Expand Up @@ -1200,6 +1197,8 @@ async fn test_version_stats() {
total_key_size: 1000,
total_value_size: 100,
total_key_count: 10,

total_compressed_size: 1024 * 1024,
};
let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]];
let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await;
Expand Down Expand Up @@ -1271,6 +1270,7 @@ async fn test_version_stats() {
total_key_size: -1000,
total_value_size: -100,
total_key_count: -10,
total_compressed_size: 0, // unused
},
),
(
Expand All @@ -1279,6 +1279,7 @@ async fn test_version_stats() {
total_key_size: -1000,
total_value_size: -100,
total_key_count: -10,
total_compressed_size: 0, // unused
},
),
]);
Expand Down Expand Up @@ -1384,11 +1385,6 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap_err();
assert_eq!("compaction group error: invalid group 100", err.to_string());

hummock_manager
.split_compaction_group(2, &[])
.await
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100])
.await
Expand Down Expand Up @@ -1477,7 +1473,7 @@ async fn test_split_compaction_group_on_demand_basic() {
assert!(new_group_id > StaticCompactionGroupId::End as u64);
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
Vec::<u64>::new()
);
assert_eq!(
get_compaction_group_object_ids(&current_version, new_group_id),
Expand Down Expand Up @@ -1667,15 +1663,6 @@ async fn test_split_compaction_group_trivial_expired() {
.split_compaction_group(2, &[100])
.await
.unwrap();
let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();
let (mut normal_tasks, _unscheduled) = hummock_manager
.get_compact_tasks_impl(vec![2], 1, &mut selector)
.await
.unwrap();
use crate::hummock::manager::CompactStatus;
let reclaim_task = normal_tasks.pop().unwrap();
assert!(CompactStatus::is_trivial_reclaim(&reclaim_task));

let current_version = hummock_manager.get_current_version().await;
let new_group_id = current_version.levels.keys().max().cloned().unwrap();
Expand Down Expand Up @@ -1854,7 +1841,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
current_version.get_compaction_group_levels(2).levels[base_level - 1]
.table_infos
.len(),
2
1
);

assert_eq!(
Expand All @@ -1865,7 +1852,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
assert_eq!(
current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0]
.table_ids,
vec![100, 101]
vec![101]
);
assert_eq!(
current_version
Expand All @@ -1881,7 +1868,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
.levels[base_level - 1]
.table_infos[0]
.table_ids,
vec![100, 101]
vec![100]
);
assert_eq!(
current_version
Expand Down Expand Up @@ -2047,38 +2034,15 @@ async fn test_move_tables_between_compaction_group() {
current_version.get_compaction_group_levels(2).levels[base_level - 1]
.table_infos
.len(),
3
2
);

let level = &current_version
.get_compaction_group_levels(new_group_id)
.levels[base_level - 1];
assert_eq!(level.table_infos[0].table_ids, vec![100]);
assert_eq!(level.table_infos[1].table_ids, vec![100, 101]);
assert_eq!(level.table_infos[1].table_ids, vec![100]);
assert_eq!(level.table_infos.len(), 2);

let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();

let compaction_task = hummock_manager
.get_compact_task(2, &mut selector)
.await
.unwrap()
.unwrap();
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1);
assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12);
assert_eq!(compaction_task.existing_table_ids, vec![101]);

let ret = hummock_manager
.report_compact_task(
compaction_task.task_id,
TaskStatus::Success,
vec![gen_sstable_info(20, 2, vec![101])],
None,
)
.await
.unwrap();
assert!(ret);
}

#[tokio::test]
Expand Down Expand Up @@ -2137,11 +2101,13 @@ async fn test_gc_stats() {
hummock_manager.create_version_checkpoint(0).await.unwrap(),
0
);

assert_eq_gc_stats(0, 0, 6, 3, 2, 4);
hummock_manager
.unpin_version_before(context_id, HummockVersionId::MAX)
.await
.unwrap();

assert_eq_gc_stats(0, 0, 6, 3, 2, 4);
assert_eq!(
hummock_manager.create_version_checkpoint(0).await.unwrap(),
Expand Down Expand Up @@ -2353,7 +2319,7 @@ async fn test_unregister_moved_table() {
assert_eq!(current_version.levels.len(), 3);
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
vec![11]
);
assert_eq!(
get_compaction_group_object_ids(&current_version, new_group_id),
Expand Down Expand Up @@ -2387,7 +2353,7 @@ async fn test_unregister_moved_table() {
assert!(!current_version.levels.contains_key(&new_group_id));
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
vec![11]
);
assert_eq!(
current_version
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl DiagnoseCommand {
let mut visit_level = |level: &Level| {
sst_num += level.table_infos.len();
sst_total_file_size +=
level.table_infos.iter().map(|t| t.file_size).sum::<u64>();
level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
for sst in &level.table_infos {
if sst.total_key_count == 0 {
continue;
Expand Down
Loading

0 comments on commit 79d9810

Please sign in to comment.