Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): Support for repairing the size of a split sst based on table stats #18053

Merged
merged 23 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1d7e169
feat(storage): support split sst with size from table_stats
Li0k Aug 15, 2024
d74d98a
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 15, 2024
2c189f0
fix(storage): fix trivial
Li0k Aug 16, 2024
a74d8f0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 16, 2024
b836ca7
fix(storage): fix test
Li0k Aug 16, 2024
07ce918
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 16, 2024
57574dd
refactor(storage): address comment
Li0k Aug 21, 2024
dd9059e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 21, 2024
4324572
typo
Li0k Aug 21, 2024
2ccdb93
refactor(compaction): address comment
Li0k Aug 22, 2024
68ebae0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 22, 2024
e395628
fix(compaction): Do not trigger SpaceReclaim compaction after split
Li0k Aug 22, 2024
f2af33a
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 22, 2024
196e218
fix: check
Li0k Aug 23, 2024
ec000b3
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 23, 2024
4bd6904
address comments
Li0k Aug 23, 2024
90fca85
revert
Li0k Aug 23, 2024
e930afc
address comments
Li0k Aug 26, 2024
ce16afe
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 26, 2024
e3bceba
fix(storage): fix ut
Li0k Aug 26, 2024
b874d67
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 26, 2024
096b398
typo
Li0k Aug 26, 2024
3d26a6c
fix(storage): fix object_size_map
Li0k Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,10 @@ message TableStats {
int64 total_key_size = 1;
int64 total_value_size = 2;
int64 total_key_count = 3;

// `total_compressed_size` represents the size that the table takes up in the output sst
// and this field is only filled and used by CN flushes, not compactor compaction
uint64 total_compressed_size = 4;
}

message HummockVersionStats {
Expand Down
55 changes: 49 additions & 6 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand Down Expand Up @@ -139,7 +139,7 @@ impl HummockManager {
for s in &mut sstables {
add_prost_table_stats_map(
&mut table_stats_change,
&to_prost_table_stats_map(std::mem::take(&mut s.table_stats)),
&to_prost_table_stats_map(s.table_stats.clone()),
);
}

Expand All @@ -158,7 +158,6 @@ impl HummockManager {
);

let state_table_info = &version.latest_version().state_table_info;

let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

// Add new table
Expand Down Expand Up @@ -221,8 +220,23 @@ impl HummockManager {
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};

let mut group_members_table_ids: HashMap<u64, BTreeSet<TableId>> = HashMap::new();
{
// expand group_members_table_ids
for (table_id, group_id) in &table_compaction_group_mapping {
group_members_table_ids
.entry(*group_id)
.or_default()
.insert(*table_id);
}
}

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.correct_commit_ssts(
sstables,
&table_compaction_group_mapping,
&group_members_table_ids,
)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
Expand Down Expand Up @@ -379,6 +393,7 @@ impl HummockManager {
&self,
sstables: Vec<LocalSstableInfo>,
table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
group_members_table_ids: &HashMap<CompactionGroupId, BTreeSet<TableId>>,
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
Expand Down Expand Up @@ -413,8 +428,36 @@ impl HummockManager {
let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

for (mut sst, group_table_ids) in sst_to_cg_vec {
for (group_id, _match_ids) in group_table_ids {
let branch_sst = split_sst(&mut sst.sst_info, &mut new_sst_id);
for (group_id, match_ids) in group_table_ids {
let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap();
if match_ids
.iter()
.all(|id| group_members_table_ids.contains(&TableId::new(*id)))
{
commit_sstables
.entry(group_id)
.or_default()
.push(sst.sst_info.clone());
continue;
}

let origin_sst_size = sst.sst_info.file_size;
let new_sst_size = match_ids
.iter()
.map(|id| {
let stat = sst.table_stats.get(id).unwrap();
stat.total_compressed_size
})
.sum();

let branch_sst = split_sst(
&mut sst.sst_info,
&mut new_sst_id,
origin_sst_size - new_sst_size,
new_sst_size,
match_ids,
);

commit_sstables
.entry(group_id)
.or_default()
Expand Down
10 changes: 0 additions & 10 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,16 +605,6 @@ impl HummockManager {
drop(compaction_guard);
self.report_compact_tasks(canceled_tasks).await?;

// Don't trigger compactions if we enable deterministic compaction
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after 2ccdb93

SstableInfo's table_ids have already been modified in split_sst, so there is no need to trigger SpaceReclaim.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if all the table ids of an SST are split out? I think in our current implementation, we will have two SSTs: one with empty table ids and the other with table ids identical to those before the split.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SstableInfo with empty table_ids will be cleaned up immediately in the apply_version_delta phase.

if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim);
self.try_send_compaction_request(
target_compaction_group_id,
compact_task::TaskType::SpaceReclaim,
);
}

self.metrics
.move_state_table_count
.with_label_values(&[&parent_group_id.to_string()])
Expand Down
64 changes: 15 additions & 49 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, Hummoc
use risingwave_pb::meta::add_worker_node_request::Property;

use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::{
default_compaction_selector, CompactionSelector, ManualCompactionOption,
SpaceReclaimCompactionSelector,
};
use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption};
use crate::hummock::error::Error;
use crate::hummock::test_utils::*;
use crate::hummock::{HummockManager, HummockManagerRef};
Expand Down Expand Up @@ -1198,6 +1195,8 @@ async fn test_version_stats() {
total_key_size: 1000,
total_value_size: 100,
total_key_count: 10,

total_compressed_size: 1024 * 1024,
};
let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]];
let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await;
Expand Down Expand Up @@ -1268,6 +1267,7 @@ async fn test_version_stats() {
total_key_size: -1000,
total_value_size: -100,
total_key_count: -10,
total_compressed_size: 0, // unused
},
),
(
Expand All @@ -1276,6 +1276,7 @@ async fn test_version_stats() {
total_key_size: -1000,
total_value_size: -100,
total_key_count: -10,
total_compressed_size: 0, // unused
},
),
]);
Expand Down Expand Up @@ -1379,11 +1380,6 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap_err();
assert_eq!("compaction group error: invalid group 100", err.to_string());

hummock_manager
.split_compaction_group(2, &[])
.await
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100])
.await
Expand Down Expand Up @@ -1468,7 +1464,7 @@ async fn test_split_compaction_group_on_demand_basic() {
assert!(new_group_id > StaticCompactionGroupId::End as u64);
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
Vec::<u64>::new()
);
assert_eq!(
get_compaction_group_object_ids(&current_version, new_group_id),
Expand Down Expand Up @@ -1652,15 +1648,6 @@ async fn test_split_compaction_group_trivial_expired() {
.split_compaction_group(2, &[100])
.await
.unwrap();
let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();
let (mut normal_tasks, _unscheduled) = hummock_manager
.get_compact_tasks_impl(vec![2], 1, &mut selector)
.await
.unwrap();
use crate::hummock::manager::CompactStatus;
let reclaim_task = normal_tasks.pop().unwrap();
assert!(CompactStatus::is_trivial_reclaim(&reclaim_task));

let current_version = hummock_manager.get_current_version().await;
let new_group_id = current_version.levels.keys().max().cloned().unwrap();
Expand Down Expand Up @@ -1831,7 +1818,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
current_version.get_compaction_group_levels(2).levels[base_level - 1]
.table_infos
.len(),
2
1
);

assert_eq!(
Expand All @@ -1842,7 +1829,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
assert_eq!(
current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0]
.table_ids,
vec![100, 101]
vec![101]
);
assert_eq!(
current_version
Expand All @@ -1858,7 +1845,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
.levels[base_level - 1]
.table_infos[0]
.table_ids,
vec![100, 101]
vec![100]
);
assert_eq!(
current_version
Expand Down Expand Up @@ -2020,38 +2007,15 @@ async fn test_move_tables_between_compaction_group() {
current_version.get_compaction_group_levels(2).levels[base_level - 1]
.table_infos
.len(),
3
2
);

let level = &current_version
.get_compaction_group_levels(new_group_id)
.levels[base_level - 1];
assert_eq!(level.table_infos[0].table_ids, vec![100]);
assert_eq!(level.table_infos[1].table_ids, vec![100, 101]);
assert_eq!(level.table_infos[1].table_ids, vec![100]);
assert_eq!(level.table_infos.len(), 2);

let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();

let compaction_task = hummock_manager
.get_compact_task(2, &mut selector)
.await
.unwrap()
.unwrap();
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1);
assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12);
assert_eq!(compaction_task.existing_table_ids, vec![101]);

let ret = hummock_manager
.report_compact_task(
compaction_task.task_id,
TaskStatus::Success,
vec![gen_sstable_info(20, 2, vec![101])],
None,
)
.await
.unwrap();
assert!(ret);
}

#[tokio::test]
Expand Down Expand Up @@ -2110,11 +2074,13 @@ async fn test_gc_stats() {
hummock_manager.create_version_checkpoint(0).await.unwrap(),
0
);

assert_eq_gc_stats(0, 0, 6, 3, 2, 4);
hummock_manager
.unpin_version_before(context_id, HummockVersionId::MAX)
.await
.unwrap();

assert_eq_gc_stats(0, 0, 6, 3, 2, 4);
assert_eq!(
hummock_manager.create_version_checkpoint(0).await.unwrap(),
Expand Down Expand Up @@ -2318,7 +2284,7 @@ async fn test_unregister_moved_table() {
assert_eq!(current_version.levels.len(), 3);
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
vec![11]
);
assert_eq!(
get_compaction_group_object_ids(&current_version, new_group_id),
Expand Down Expand Up @@ -2352,7 +2318,7 @@ async fn test_unregister_moved_table() {
assert!(!current_version.levels.contains_key(&new_group_id));
assert_eq!(
get_compaction_group_object_ids(&current_version, 2),
vec![10, 11]
vec![11]
);
assert_eq!(
current_version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,13 @@ fn split_sst_info_for_level(
.cloned()
.collect_vec();
if !removed_table_ids.is_empty() {
let branch_sst = split_sst(sst_info, new_sst_id);
let branch_sst = split_sst(
sst_info,
new_sst_id,
sst_info.file_size / 2,
sst_info.file_size / 2,
member_table_ids.iter().cloned().collect_vec(),
);
insert_table_infos.push(branch_sst);
}
}
Expand Down Expand Up @@ -1327,10 +1333,35 @@ pub fn validate_version(version: &HummockVersion) -> Vec<String> {
res
}

pub fn split_sst(sst_info: &mut SstableInfo, new_sst_id: &mut u64) -> SstableInfo {
pub fn split_sst(
sst_info: &mut SstableInfo,
new_sst_id: &mut u64,
old_sst_size: u64,
new_sst_size: u64,
new_sst_table_ids: Vec<u32>,
) -> SstableInfo {
let mut branch_table_info = sst_info.clone();
branch_table_info.sst_id = *new_sst_id;
branch_table_info.file_size = new_sst_size;

sst_info.sst_id = *new_sst_id + 1;
sst_info.file_size = old_sst_size;

{
// related github.com/risingwavelabs/risingwave/pull/17898/
// This is a temporary implementation that will update `table_ids` based on the new split rule after PR 17898

let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect();
let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect();
let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();

// Update table_ids
branch_table_info.table_ids = intersection;
sst_info
.table_ids
.retain(|table_id| !branch_table_info.table_ids.contains(table_id));
}

*new_sst_id += 1;

branch_table_info
Expand Down
8 changes: 8 additions & 0 deletions src/storage/hummock_sdk/src/table_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub struct TableStats {
pub total_key_size: i64,
pub total_value_size: i64,
pub total_key_count: i64,

// `total_compressed_size` represents the size that the table takes up in the output sst
// and this field is only filled and used by CN flushes, not compactor compaction
pub total_compressed_size: u64,
}

impl From<&TableStats> for PbTableStats {
Expand All @@ -37,6 +41,7 @@ impl From<&TableStats> for PbTableStats {
total_key_size: value.total_key_size,
total_value_size: value.total_value_size,
total_key_count: value.total_key_count,
total_compressed_size: value.total_compressed_size,
}
}
}
Expand All @@ -53,6 +58,7 @@ impl From<&PbTableStats> for TableStats {
total_key_size: value.total_key_size,
total_value_size: value.total_value_size,
total_key_count: value.total_key_count,
total_compressed_size: value.total_compressed_size,
}
}
}
Expand All @@ -62,13 +68,15 @@ impl TableStats {
self.total_key_size += other.total_key_size;
self.total_value_size += other.total_value_size;
self.total_key_count += other.total_key_count;
self.total_compressed_size += other.total_compressed_size;
}
}

pub fn add_prost_table_stats(this: &mut PbTableStats, other: &PbTableStats) {
this.total_key_size += other.total_key_size;
this.total_value_size += other.total_value_size;
this.total_key_count += other.total_key_count;
this.total_compressed_size += other.total_compressed_size;
}

pub fn add_prost_table_stats_map(this: &mut PbTableStatsMap, other: &PbTableStatsMap) {
Expand Down
Loading
Loading