Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): risectl support update CG vnode_partition_count #15688

Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint32 tombstone_reclaim_ratio = 16;
CompressionAlgorithm compression_algorithm = 17;
uint32 max_l0_compact_level_count = 18;
uint32 partition_vnode_count = 19;
}
}
repeated uint64 compaction_group_ids = 1;
Expand All @@ -736,6 +737,7 @@ message PinVersionResponse {
message SplitCompactionGroupRequest {
uint64 group_id = 1;
repeated uint32 table_ids = 2;
uint32 partition_vnode_count = 3;
}

message SplitCompactionGroupResponse {
Expand Down
13 changes: 8 additions & 5 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn build_compaction_config_vec(
tombstone_reclaim_ratio: Option<u32>,
compress_algorithm: Option<CompressionAlgorithm>,
max_l0_compact_level: Option<u32>,
partition_vnode_count: Option<u32>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -119,22 +120,24 @@ pub fn build_compaction_config_vec(
if let Some(c) = max_l0_compact_level {
configs.push(MutableConfig::MaxL0CompactLevelCount(c))
}

configs
if let Some(c) = partition_vnode_count {
configs.push(MutableConfig::PartitionVnodeCount(c))
}
}

pub async fn split_compaction_group(
context: &CtlContext,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let new_group_id = meta_client
.split_compaction_group(group_id, table_ids_to_new_group)
.split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
.await?;
println!(
"Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
group_id, table_ids_to_new_group, new_group_id
"Succeed: split compaction group {}. tables {:#?} are moved to new group {} partition_vnode_count {}",
group_id, table_ids_to_new_group, new_group_id, partition_vnode_count
);
Ok(())
}
Expand Down
16 changes: 14 additions & 2 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,17 @@ enum HummockCommands {
compression_algorithm: Option<String>,
#[clap(long)]
max_l0_compact_level: Option<u32>,
#[clap(long)]
partition_vnode_count: Option<u32>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
#[clap(long)]
compaction_group_id: u64,
#[clap(long, value_delimiter = ',')]
table_ids: Vec<u32>,
#[clap(long, short)]
partition_vnode_count: u32,
},
/// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
PauseVersionCheckpoint,
Expand Down Expand Up @@ -662,6 +666,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
compression_level,
compression_algorithm,
max_l0_compact_level,
partition_vnode_count,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand Down Expand Up @@ -692,16 +697,23 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
None
},
max_l0_compact_level,
partition_vnode_count,
),
)
.await?
}
Commands::Hummock(HummockCommands::SplitCompactionGroup {
compaction_group_id,
table_ids,
partition_vnode_count,
}) => {
cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids)
.await?;
cmd_impl::hummock::split_compaction_group(
context,
compaction_group_id,
&table_ids,
partition_vnode_count,
)
.await?;
}
Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
cmd_impl::hummock::pause_version_checkpoint(context).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let new_group_id = self
.hummock_manager
.split_compaction_group(req.group_id, &req.table_ids)
.split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count)
.await?;
Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl HummockManager {
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
partition_vnode_count: u32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused?

) -> Result<CompactionGroupId> {
let result = self
.move_state_table_to_compaction_group(
Expand Down Expand Up @@ -763,6 +764,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::MaxL0CompactLevelCount(c) => {
target.max_l0_compact_level_count = *c;
}
MutableConfig::PartitionVnodeCount(c) => {
target.split_weight_by_vnode = *c;
}
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1399,18 +1399,18 @@ async fn test_split_compaction_group_on_demand_basic() {
assert_eq!(original_groups, vec![2, 3]);

let err = hummock_manager
.split_compaction_group(100, &[0])
.split_compaction_group(100, &[0], 0)
.await
.unwrap_err();
assert_eq!("compaction group error: invalid group 100", err.to_string());

hummock_manager
.split_compaction_group(2, &[])
.split_compaction_group(2, &[], 0)
.await
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap_err();
assert_eq!(
Expand Down Expand Up @@ -1468,7 +1468,7 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap();

let err = hummock_manager
.split_compaction_group(2, &[100, 101])
.split_compaction_group(2, &[100, 101], 0)
.await
.unwrap_err();
assert_eq!(
Expand All @@ -1484,7 +1484,7 @@ async fn test_split_compaction_group_on_demand_basic() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100, 101])
.split_compaction_group(2, &[100, 101], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
Expand Down Expand Up @@ -1550,7 +1550,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();

Expand Down Expand Up @@ -1674,7 +1674,7 @@ async fn test_split_compaction_group_trivial_expired() {
.unwrap();

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let mut selector: Box<dyn CompactionSelector> =
Expand Down Expand Up @@ -1849,7 +1849,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
);

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
Expand Down Expand Up @@ -1954,7 +1954,7 @@ async fn test_compaction_task_expiration_due_to_split_group() {
let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await;
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2);
hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();

Expand Down Expand Up @@ -2038,7 +2038,7 @@ async fn test_move_tables_between_compaction_group() {
);

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
Expand Down Expand Up @@ -2197,7 +2197,7 @@ async fn test_partition_level() {
.unwrap());

hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
Expand Down Expand Up @@ -2331,7 +2331,7 @@ async fn test_unregister_moved_table() {
.unwrap();

let new_group_id = hummock_manager
.split_compaction_group(2, &[100])
.split_compaction_group(2, &[100], 0)
.await
.unwrap();
assert_ne!(new_group_id, 2);
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,10 +1200,12 @@ impl MetaClient {
&self,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> Result<CompactionGroupId> {
let req = SplitCompactionGroupRequest {
group_id,
table_ids: table_ids_to_new_group.to_vec(),
partition_vnode_count,
};
let resp = self.inner.split_compaction_group(req).await?;
Ok(resp.new_group_id)
Expand Down
Loading