diff --git a/proto/hummock.proto b/proto/hummock.proto index e19faee10c43e..31c52dd8b85ff 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -715,6 +715,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint32 tombstone_reclaim_ratio = 16; CompressionAlgorithm compression_algorithm = 17; uint32 max_l0_compact_level_count = 18; + uint32 partition_vnode_count = 19; } } repeated uint64 compaction_group_ids = 1; @@ -736,6 +737,7 @@ message PinVersionResponse { message SplitCompactionGroupRequest { uint64 group_id = 1; repeated uint32 table_ids = 2; + uint32 partition_vnode_count = 3; } message SplitCompactionGroupResponse { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index d58aeb7bffe79..b5cb232ffc2e5 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -66,6 +66,7 @@ pub fn build_compaction_config_vec( tombstone_reclaim_ratio: Option, compress_algorithm: Option, max_l0_compact_level: Option, + partition_vnode_count: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -119,22 +120,24 @@ pub fn build_compaction_config_vec( if let Some(c) = max_l0_compact_level { configs.push(MutableConfig::MaxL0CompactLevelCount(c)) } - - configs + if let Some(c) = partition_vnode_count { + configs.push(MutableConfig::PartitionVnodeCount(c)) + } } pub async fn split_compaction_group( context: &CtlContext, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let new_group_id = meta_client - .split_compaction_group(group_id, table_ids_to_new_group) + .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count) .await?; println!( - "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.", - group_id, table_ids_to_new_group, new_group_id + "Succeed: split compaction group {}. tables {:#?} are moved to new group {} partition_vnode_count {}", + group_id, table_ids_to_new_group, new_group_id, partition_vnode_count ); Ok(()) } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index efbd681f8cae5..6bb915c41651d 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -267,6 +267,8 @@ enum HummockCommands { compression_algorithm: Option, #[clap(long)] max_l0_compact_level: Option, + #[clap(long)] + partition_vnode_count: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -274,6 +276,8 @@ enum HummockCommands { compaction_group_id: u64, #[clap(long, value_delimiter = ',')] table_ids: Vec, + #[clap(long, short)] + partition_vnode_count: u32, }, /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object. PauseVersionCheckpoint, @@ -662,6 +666,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { compression_level, compression_algorithm, max_l0_compact_level, + partition_vnode_count, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -692,6 +697,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { None }, max_l0_compact_level, + partition_vnode_count, ), ) .await? @@ -699,9 +705,15 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::SplitCompactionGroup { compaction_group_id, table_ids, + partition_vnode_count, }) => { - cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids) - .await?; + cmd_impl::hummock::split_compaction_group( + context, + compaction_group_id, + &table_ids, + partition_vnode_count, + ) + .await?; } Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => { cmd_impl::hummock::pause_version_checkpoint(context).await?; diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 3e5a26d2a7771..fea6acbc8d9ca 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -437,7 +437,7 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let new_group_id = self .hummock_manager - .split_compaction_group(req.group_id, &req.table_ids) + .split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count) .await?; Ok(Response::new(SplitCompactionGroupResponse { new_group_id })) } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 6fc637148e17b..bb1c6143a166b 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -430,6 +430,7 @@ impl HummockManager { &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], + partition_vnode_count: u32, ) -> Result { let result = self .move_state_table_to_compaction_group( @@ -763,6 +764,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::MaxL0CompactLevelCount(c) => { target.max_l0_compact_level_count = *c; } + MutableConfig::PartitionVnodeCount(c) => { + target.split_weight_by_vnode = *c; + } } } } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 5e677248903de..03e6756c20dd1 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1399,18 +1399,18 @@ async fn test_split_compaction_group_on_demand_basic() { assert_eq!(original_groups, vec![2, 3]); let err = hummock_manager - .split_compaction_group(100, &[0]) + .split_compaction_group(100, &[0], 0) .await .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); hummock_manager - .split_compaction_group(2, &[]) + .split_compaction_group(2, &[], 0) .await .unwrap(); let err = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap_err(); assert_eq!( @@ -1468,7 +1468,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); let err = hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap_err(); assert_eq!( @@ -1484,7 +1484,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1550,7 +1550,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -1674,7 +1674,7 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let mut selector: Box = @@ -1849,7 +1849,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1954,7 +1954,7 @@ async fn test_compaction_task_expiration_due_to_split_group() { let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -2038,7 +2038,7 @@ async fn test_move_tables_between_compaction_group() { ); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2197,7 +2197,7 @@ async fn test_partition_level() { .unwrap()); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2331,7 +2331,7 @@ async fn test_unregister_moved_table() { .unwrap(); let new_group_id = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); assert_ne!(new_group_id, 2); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index db24711ba2377..8fd928a497667 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1200,10 +1200,12 @@ impl MetaClient { &self, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> Result { let req = SplitCompactionGroupRequest { group_id, table_ids: table_ids_to_new_group.to_vec(), + partition_vnode_count, }; let resp = self.inner.split_compaction_group(req).await?; Ok(resp.new_group_id)