feat(storage): risectl support update CG vnode_partition_count #15688

Closed
wants to merge 8 commits
2 changes: 2 additions & 0 deletions proto/hummock.proto
@@ -666,6 +666,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint64 level0_max_compact_file_number = 14;
bool enable_emergency_picker = 15;
uint32 tombstone_reclaim_ratio = 16;
uint32 partition_vnode_count = 17;
}
}
repeated uint64 compaction_group_ids = 1;
@@ -687,6 +688,7 @@ message PinVersionResponse {
message SplitCompactionGroupRequest {
uint64 group_id = 1;
repeated uint32 table_ids = 2;
uint32 partition_vnode_count = 3;
}

message SplitCompactionGroupResponse {
11 changes: 8 additions & 3 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
@@ -63,6 +63,7 @@ pub fn build_compaction_config_vec(
level0_overlapping_sub_level_compact_level_count: Option<u32>,
enable_emergency_picker: Option<bool>,
tombstone_reclaim_ratio: Option<u32>,
partition_vnode_count: Option<u32>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
@@ -110,6 +111,9 @@ pub fn build_compaction_config_vec(
if let Some(c) = tombstone_reclaim_ratio {
configs.push(MutableConfig::TombstoneReclaimRatio(c))
}
if let Some(c) = partition_vnode_count {
configs.push(MutableConfig::PartitionVnodeCount(c))
}

configs
}
@@ -118,14 +122,15 @@ pub async fn split_compaction_group(
context: &CtlContext,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let new_group_id = meta_client
.split_compaction_group(group_id, table_ids_to_new_group)
.split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
.await?;
println!(
"Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
group_id, table_ids_to_new_group, new_group_id
"Succeed: split compaction group {}. tables {:#?} are moved to new group {} partition_vnode_count {}",
group_id, table_ids_to_new_group, new_group_id, partition_vnode_count
);
Ok(())
}
16 changes: 14 additions & 2 deletions src/ctl/src/lib.rs
@@ -240,13 +240,17 @@ enum HummockCommands {
enable_emergency_picker: Option<bool>,
#[clap(long)]
tombstone_reclaim_ratio: Option<u32>,
#[clap(long)]
partition_vnode_count: Option<u32>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
#[clap(long)]
compaction_group_id: u64,
#[clap(long)]
table_ids: Vec<u32>,
#[clap(long, short)]
partition_vnode_count: u32,
},
/// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
PauseVersionCheckpoint,
@@ -646,6 +650,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
tombstone_reclaim_ratio,
partition_vnode_count,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
@@ -666,16 +671,23 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
tombstone_reclaim_ratio,
partition_vnode_count,
),
)
.await?
}
Commands::Hummock(HummockCommands::SplitCompactionGroup {
compaction_group_id,
table_ids,
partition_vnode_count,
}) => {
cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids)
.await?;
cmd_impl::hummock::split_compaction_group(
context,
compaction_group_id,
&table_ids,
partition_vnode_count,
)
.await?;
}
Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
cmd_impl::hummock::pause_version_checkpoint(context).await?;
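The two `partition_vnode_count` fields added above become `--partition-vnode-count` flags on the corresponding risectl subcommands. As a minimal, standalone sketch of how clap derives those flags (hypothetical binary, not part of this PR; assumes the clap `derive` feature and the same attribute style as in the diff):

```rust
use clap::Parser;

/// Hypothetical stand-in for the `SplitCompactionGroup` variant above.
#[derive(Parser, Debug)]
struct SplitCompactionGroupArgs {
    #[clap(long)]
    compaction_group_id: u64,
    /// Repeat the flag once per table id, e.g. `--table-ids 100 --table-ids 101`.
    #[clap(long)]
    table_ids: Vec<u32>,
    /// New in this PR: vnode partition count for the newly created group;
    /// `short` additionally enables `-p`.
    #[clap(long, short)]
    partition_vnode_count: u32,
}

fn main() {
    // e.g. `split-cg --compaction-group-id 2 --table-ids 100 --table-ids 101 -p 16`
    let args = SplitCompactionGroupArgs::parse();
    println!("{args:?}");
}
```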
2 changes: 1 addition & 1 deletion src/meta/service/src/hummock_service.rs
@@ -437,7 +437,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let new_group_id = self
.hummock_manager
.split_compaction_group(req.group_id, &req.table_ids)
.split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count)
.await?;
Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
}
11 changes: 10 additions & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -458,9 +458,15 @@ impl HummockManager {
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
partition_vnode_count: u32,
Review comment (Contributor): unused?

) -> Result<CompactionGroupId> {
let result = self
.move_state_table_to_compaction_group(parent_group_id, table_ids, None, 0)
.move_state_table_to_compaction_group(
parent_group_id,
table_ids,
None,
partition_vnode_count,
)
.await?;
self.group_to_table_vnode_partition
.write()
@@ -985,6 +991,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig])
MutableConfig::TombstoneReclaimRatio(c) => {
target.tombstone_reclaim_ratio = *c;
}
MutableConfig::PartitionVnodeCount(c) => {
target.split_weight_by_vnode = *c;
}
}
}
}
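Note the mapping in `update_compaction_config` above: the CLI-facing `PartitionVnodeCount` option is written to the config's `split_weight_by_vnode` field. A self-contained sketch of that behaviour, with the generated protobuf types reduced to stand-ins covering only the fields touched in this hunk:

```rust
// Simplified stand-ins for the generated protobuf types.
#[derive(Default, Debug)]
struct CompactionConfig {
    tombstone_reclaim_ratio: u32,
    split_weight_by_vnode: u32,
}

enum MutableConfig {
    TombstoneReclaimRatio(u32),
    PartitionVnodeCount(u32),
}

// Mirrors the match above: PartitionVnodeCount writes through to
// split_weight_by_vnode.
fn apply(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::TombstoneReclaimRatio(c) => target.tombstone_reclaim_ratio = *c,
            MutableConfig::PartitionVnodeCount(c) => target.split_weight_by_vnode = *c,
        }
    }
}

fn main() {
    let mut config = CompactionConfig::default();
    apply(&mut config, &[MutableConfig::PartitionVnodeCount(16)]);
    assert_eq!(config.split_weight_by_vnode, 16);
    println!("{config:?}");
}
```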
34 changes: 21 additions & 13 deletions src/meta/src/hummock/manager/mod.rs
@@ -1062,6 +1062,7 @@ impl HummockManager {
} else {
table_to_vnode_partition
.retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
// partition_count is based on the compaction config. Once a compaction group is configured for partitioning, the dynamically calculated in-memory value is no longer valid.
if group_config.compaction_config.split_weight_by_vnode > 0 {
table_to_vnode_partition.clear();
for table_id in &compact_task.existing_table_ids {
@@ -3060,20 +3061,27 @@ impl HummockManager {
rewrite_cg_ids.push(*cg_id);
}

if let Some(levels) = current_version.levels.get(cg_id) {
if levels.member_table_ids.len() == 1 {
restore_cg_to_partition_vnode.insert(
*cg_id,
vec![(
levels.member_table_ids[0],
compaction_group_config
.compaction_config
.split_weight_by_vnode,
)]
.into_iter()
if let Some(levels) = current_version.levels.get(cg_id)
&& compaction_group_config
.compaction_config
.split_weight_by_vnode
> 0
{
restore_cg_to_partition_vnode.insert(
*cg_id,
levels
.member_table_ids
.iter()
.map(|table_id| {
(
*table_id,
compaction_group_config
.compaction_config
.split_weight_by_vnode,
)
})
.collect(),
);
}
);
}
}

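The rewrite above also changes the restore path: previously only groups with exactly one member table were recorded in `restore_cg_to_partition_vnode`; now every member table of a group whose `split_weight_by_vnode` is greater than zero gets an entry. A self-contained sketch of that mapping (the helper function and collection types are assumptions for illustration; the real code reads this data from the current version and the group's compaction config):

```rust
use std::collections::{BTreeMap, HashMap};

type CompactionGroupId = u64;
type TableId = u32;

// Map every member table of a group with a non-zero split_weight_by_vnode to
// that partition count, mirroring the new restore logic above.
fn restore_partition_map(
    group_member_tables: &HashMap<CompactionGroupId, Vec<TableId>>,
    split_weight_by_vnode: &HashMap<CompactionGroupId, u32>,
) -> HashMap<CompactionGroupId, BTreeMap<TableId, u32>> {
    let mut restore_cg_to_partition_vnode = HashMap::new();
    for (cg_id, tables) in group_member_tables {
        let vnode_count = split_weight_by_vnode.get(cg_id).copied().unwrap_or(0);
        if vnode_count > 0 {
            restore_cg_to_partition_vnode.insert(
                *cg_id,
                tables.iter().map(|table_id| (*table_id, vnode_count)).collect(),
            );
        }
    }
    restore_cg_to_partition_vnode
}

fn main() {
    let members = HashMap::from([(2u64, vec![100u32, 101])]);
    let weights = HashMap::from([(2u64, 16u32)]);
    // Both tables of group 2 are assigned 16 vnode partitions.
    println!("{:?}", restore_partition_map(&members, &weights));
}
```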
20 changes: 10 additions & 10 deletions src/meta/src/hummock/manager/tests.rs
@@ -1387,18 +1387,18 @@ async fn test_split_compaction_group_on_demand_basic() {
assert_eq!(original_groups, vec![2, 3]);

let err = hummock_manager
.split_compaction_group(100, &[0])
.split_compaction_group(100, &[0], 0)
.await
.unwrap_err();
assert_eq!("compaction group error: invalid group 100", err.to_string());

hummock_manager
.split_compaction_group(2, &[])
.split_compaction_group(2, &[], 0)
.await
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap_err();
assert_eq!(
@@ -1460,7 +1460,7 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100, 101])
.split_compaction_group(2, &[100, 101], 0)
.await
.unwrap_err();
assert_eq!(
@@ -1476,7 +1476,7 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100, 101])
.split_compaction_group(2, &[100, 101], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
@@ -1554,7 +1554,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();

@@ -1690,7 +1690,7 @@ async fn test_split_compaction_group_trivial_expired() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let mut selector: Box<dyn CompactionSelector> =
@@ -1861,7 +1861,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
);

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
@@ -1978,7 +1978,7 @@ async fn test_compaction_task_expiration_due_to_split_group() {
let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await;
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2);
hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();

@@ -2068,7 +2068,7 @@ async fn test_move_tables_between_compaction_group() {
);

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
@@ -1120,10 +1120,12 @@ impl MetaClient {
&self,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> Result<CompactionGroupId> {
let req = SplitCompactionGroupRequest {
group_id,
table_ids: table_ids_to_new_group.to_vec(),
partition_vnode_count,
};
let resp = self.inner.split_compaction_group(req).await?;
Ok(resp.new_group_id)
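With the extra request field, callers pass the partition count straight through `MetaClient::split_compaction_group`. A sketch of a caller under the new signature (the wrapper function, the `anyhow` error handling, and the `risingwave_rpc_client` import path are assumptions; the method arguments match the diff above):

```rust
use anyhow::Result;
use risingwave_rpc_client::MetaClient;

// Hypothetical caller: move `table_ids` out of `group_id` into a new group and
// ask the meta node to partition the new group's data by `partition_vnode_count`.
async fn split_group_with_partitioning(
    meta_client: &MetaClient,
    group_id: u64,
    table_ids: &[u32],
    partition_vnode_count: u32,
) -> Result<u64> {
    let new_group_id = meta_client
        .split_compaction_group(group_id, table_ids, partition_vnode_count)
        .await?;
    println!(
        "tables {:?} moved from group {} to new group {} (partition_vnode_count = {})",
        table_ids, group_id, new_group_id, partition_vnode_count
    );
    Ok(new_group_id)
}
```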