Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): support merge compaction group #18188

Merged
merged 21 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c70b72c
feat(compaction): support merge compaction group
Li0k Aug 22, 2024
e731896
feat(compaction): add assert check when handle merge_compaction_group
Li0k Aug 22, 2024
a0e8578
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 22, 2024
17e582a
chore(compaction): add ut
Li0k Aug 22, 2024
beb8a61
fix(storage): fix test
Li0k Aug 23, 2024
0adcb8d
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 23, 2024
793db2e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 27, 2024
b3a4f7c
address comments
Li0k Aug 27, 2024
596ddbb
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 27, 2024
029a3cc
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 27, 2024
72e1d74
fix(storage): fix merge levels with conflict sub level type
Li0k Aug 28, 2024
0c1cdbd
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 28, 2024
8a22e91
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 29, 2024
e755671
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Sep 9, 2024
ffd9728
chore(storage): add ut
Li0k Sep 9, 2024
c7c6fa4
refactor(compaction): refactor merge check and ut
Li0k Sep 9, 2024
7be2d3d
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Sep 9, 2024
e3f02c7
typo
Li0k Sep 9, 2024
8df085a
add doc
Li0k Sep 10, 2024
e064064
address comments
Li0k Sep 10, 2024
d8e0152
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,19 @@ message GroupTableChange {

message GroupDestroy {}

message GroupMerge {
uint64 left_group_id = 1;
uint64 right_group_id = 2;
}

message GroupDelta {
oneof delta_type {
IntraLevelDelta intra_level = 1;
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4 [deprecated = true];
GroupTableChange group_table_change = 5 [deprecated = true];
GroupMerge group_merge = 6;
}
}

Expand Down Expand Up @@ -744,6 +750,7 @@ message PinVersionResponse {
message SplitCompactionGroupRequest {
uint64 group_id = 1;
repeated uint32 table_ids = 2;
uint32 partition_vnode_count = 3;
}

message SplitCompactionGroupResponse {
Expand Down Expand Up @@ -839,6 +846,13 @@ message GetVersionByEpochResponse {
HummockVersion version = 1;
}

message MergeCompactionGroupRequest {
uint64 left_group_id = 1;
uint64 right_group_id = 2;
}

message MergeCompactionGroupResponse {}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -880,6 +894,7 @@ service HummockManagerService {
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse);
}

message CompactionConfig {
Expand Down
15 changes: 14 additions & 1 deletion src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ pub async fn split_compaction_group(
context: &CtlContext,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let new_group_id = meta_client
.split_compaction_group(group_id, table_ids_to_new_group)
.split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
.await?;
println!(
"Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
Expand Down Expand Up @@ -284,3 +285,15 @@ pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::

Ok(())
}

pub async fn merge_compaction_group(
context: &CtlContext,
left_group_id: CompactionGroupId,
right_group_id: CompactionGroupId,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
meta_client
.merge_compaction_group(left_group_id, right_group_id)
.await?;
Ok(())
}
25 changes: 23 additions & 2 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ enum HummockCommands {
compaction_group_id: u64,
#[clap(long, value_delimiter = ',')]
table_ids: Vec<u32>,
#[clap(long, default_value_t = 0)]
partition_vnode_count: u32,
},
/// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
PauseVersionCheckpoint,
Expand Down Expand Up @@ -340,6 +342,12 @@ enum HummockCommands {
#[clap(long)]
record_hybrid_fetch_threshold_ms: Option<u32>,
},
MergeCompactionGroup {
#[clap(long)]
left_group_id: u64,
#[clap(long)]
right_group_id: u64,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -711,9 +719,15 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::SplitCompactionGroup {
compaction_group_id,
table_ids,
partition_vnode_count,
}) => {
cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids)
.await?;
cmd_impl::hummock::split_compaction_group(
context,
compaction_group_id,
&table_ids,
partition_vnode_count,
)
.await?;
}
Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
cmd_impl::hummock::pause_version_checkpoint(context).await?;
Expand Down Expand Up @@ -790,6 +804,13 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
)
.await?
}
Commands::Hummock(HummockCommands::MergeCompactionGroup {
left_group_id,
right_group_id,
}) => {
cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
.await?
}
Commands::Table(TableCommands::Scan {
mv_name,
data_dir,
Expand Down
13 changes: 12 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let new_group_id = self
.hummock_manager
.split_compaction_group(req.group_id, &req.table_ids)
.split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count)
.await?;
Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
}
Expand Down Expand Up @@ -716,6 +716,17 @@ impl HummockManagerService for HummockServiceImpl {
version: Some(version.to_protobuf()),
}))
}

async fn merge_compaction_group(
&self,
request: Request<MergeCompactionGroupRequest>,
) -> Result<Response<MergeCompactionGroupResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.merge_compaction_group(req.left_group_id, req.right_group_id)
.await?;
Ok(Response::new(MergeCompactionGroupResponse {}))
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ impl HummockManager {
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for group_deltas in version_delta.group_deltas.values() {
let summary = summarize_group_deltas(group_deltas);
for (group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas, *group_id);
object_sizes.extend(
summary
.insert_table_infos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ use crate::model::{
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;

impl CompactionGroupManager {
pub(super) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
let default_config = match env.opts.compaction_config.as_ref() {
None => CompactionConfigBuilder::new().build(),
Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
};
Self::new_with_config(env, default_config).await
}

pub(super) async fn new_with_config(
pub(crate) async fn new_with_config(
env: &MetaSrvEnv,
default_config: CompactionConfig,
) -> Result<CompactionGroupManager> {
Expand Down Expand Up @@ -428,24 +428,6 @@ impl HummockManager {
results
}

/// Splits a compaction group into two. The new one will contain `table_ids`.
/// Returns the newly created compaction group id.
pub async fn split_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
) -> Result<CompactionGroupId> {
let result = self
.move_state_table_to_compaction_group(
parent_group_id,
table_ids,
self.env.opts.partition_vnode_count,
)
.await?;

Ok(result)
}

/// move some table to another compaction-group. Create a new compaction group if it does not
/// exist.
pub async fn move_state_table_to_compaction_group(
Expand Down Expand Up @@ -651,7 +633,7 @@ impl HummockManager {
infos
}

pub(super) async fn initial_compaction_group_config_after_load(
pub(crate) async fn initial_compaction_group_config_after_load(
&self,
versioning_guard: &Versioning,
compaction_group_manager: &mut CompactionGroupManager,
Expand All @@ -675,7 +657,7 @@ impl HummockManager {
/// 1. initialize default static compaction group.
/// 2. register new table to new compaction group.
/// 3. move existent table to new compaction group.
pub(super) struct CompactionGroupManager {
pub(crate) struct CompactionGroupManager {
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
default_config: Arc<CompactionConfig>,
/// Tables that write limit is trigger for.
Expand Down Expand Up @@ -709,15 +691,15 @@ impl CompactionGroupManager {
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn try_get_compaction_group_config(
pub(crate) fn try_get_compaction_group_config(
&self,
compaction_group_id: CompactionGroupId,
) -> Option<CompactionGroup> {
self.compaction_groups.get(&compaction_group_id).cloned()
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
self.default_config.clone()
}
}
Expand Down Expand Up @@ -814,15 +796,15 @@ impl<'a> CompactionGroupTransaction<'a> {
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn try_get_compaction_group_config(
pub(crate) fn try_get_compaction_group_config(
&self,
compaction_group_id: CompactionGroupId,
) -> Option<&CompactionGroup> {
self.get(&compaction_group_id)
}

/// Removes stale group configs.
fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
let stale_group = self
.tree_ref()
.keys()
Expand All @@ -837,7 +819,7 @@ impl<'a> CompactionGroupTransaction<'a> {
}
}

pub(super) fn update_compaction_config(
pub(crate) fn update_compaction_config(
&mut self,
compaction_group_ids: &[CompactionGroupId],
config_to_update: &[MutableConfig],
Expand Down
Loading
Loading