From c70b72c01d1ff8cd6779e2425fb1a6aee225a2a5 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 22 Aug 2024 16:33:01 +0800 Subject: [PATCH 01/11] feat(compaction): support merge compaction group --- proto/hummock.proto | 14 + .../src/cmd_impl/hummock/compaction_group.rs | 12 + src/ctl/src/lib.rs | 13 + src/meta/service/src/hummock_service.rs | 11 + src/meta/src/hummock/manager/checkpoint.rs | 4 +- .../compaction_group_manager.rs | 36 +- .../compaction/compaction_group_schedule.rs | 315 ++++++++++++++++++ .../{compaction.rs => compaction/mod.rs} | 80 +---- src/meta/src/hummock/manager/mod.rs | 3 +- src/meta/src/hummock/manager/timer_task.rs | 29 +- src/prost/build.rs | 1 + src/rpc_client/src/meta_client.rs | 14 + .../compaction_group/hummock_version_ext.rs | 88 +++-- .../hummock_sdk/src/compaction_group/mod.rs | 109 ++++++ src/storage/hummock_sdk/src/version.rs | 48 +-- 15 files changed, 598 insertions(+), 179 deletions(-) rename src/meta/src/hummock/manager/{ => compaction}/compaction_group_manager.rs (97%) create mode 100644 src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs rename src/meta/src/hummock/manager/{compaction.rs => compaction/mod.rs} (95%) diff --git a/proto/hummock.proto b/proto/hummock.proto index 5d66a2b7bb79b..6877f59de1380 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -99,6 +99,11 @@ message GroupTableChange { message GroupDestroy {} +message GroupMerge { + uint64 left_group_id = 1; + uint64 right_group_id = 2; +} + message GroupDelta { oneof delta_type { IntraLevelDelta intra_level = 1; @@ -106,6 +111,7 @@ message GroupDelta { GroupDestroy group_destroy = 3; GroupMetaChange group_meta_change = 4 [deprecated = true]; GroupTableChange group_table_change = 5 [deprecated = true]; + GroupMerge group_merge = 6; } } @@ -832,6 +838,13 @@ message GetVersionByEpochResponse { HummockVersion version = 1; } +message MergeCompactionGroupRequest { + uint64 left_group_id = 1; + uint64 right_group_id = 2; +} + +message MergeCompactionGroupResponse {} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -873,6 +886,7 @@ service HummockManagerService { rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse); + rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse); } message CompactionConfig { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index d58aeb7bffe79..081d07cdb5b8b 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -280,3 +280,15 @@ pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow:: Ok(()) } + +pub async fn merge_compaction_group( + context: &CtlContext, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, +) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + meta_client + .merge_compaction_group(left_group_id, right_group_id) + .await?; + Ok(()) +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 5cc0765e16c81..f17ba8b82cb67 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -338,6 +338,12 @@ enum HummockCommands { #[clap(long)] record_hybrid_fetch_threshold_ms: Option, }, + MergeCompactionGroup { + #[clap(long)] + left_group_id: u64, + #[clap(long)] + right_group_id: u64, + }, } #[derive(Subcommand)] @@ -783,6 +789,13 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { ) .await? } + Commands::Hummock(HummockCommands::MergeCompactionGroup { + left_group_id, + right_group_id, + }) => { + cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id) + .await? + } Commands::Table(TableCommands::Scan { mv_name, data_dir, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 21e203d8440bd..83c122bab5bd0 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -716,6 +716,17 @@ impl HummockManagerService for HummockServiceImpl { version: Some(version.to_protobuf()), })) } + + async fn merge_compaction_group( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + self.hummock_manager + .merge_compaction_group(req.left_group_id, req.right_group_id) + .await?; + Ok(Response::new(MergeCompactionGroupResponse {})) + } } #[cfg(test)] diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index bc3701a6b9d82..f678014d440c8 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -156,8 +156,8 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for group_deltas in version_delta.group_deltas.values() { - let summary = summarize_group_deltas(group_deltas); + for (group_id, group_deltas) in &version_delta.group_deltas { + let summary = summarize_group_deltas(group_deltas, *group_id); object_sizes.extend( summary .insert_table_infos diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs similarity index 97% rename from src/meta/src/hummock/manager/compaction_group_manager.rs rename to src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index a9ab7ea24e63b..84960fd2a30d1 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -54,7 +54,7 @@ use crate::model::{ type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; impl CompactionGroupManager { - pub(super) async fn new(env: &MetaSrvEnv) -> Result { + pub(crate) async fn new(env: &MetaSrvEnv) -> Result { let default_config = match env.opts.compaction_config.as_ref() { None => CompactionConfigBuilder::new().build(), Some(opt) => CompactionConfigBuilder::with_opt(opt).build(), @@ -62,7 +62,7 @@ impl CompactionGroupManager { Self::new_with_config(env, default_config).await } - pub(super) async fn new_with_config( + pub(crate) async fn new_with_config( env: &MetaSrvEnv, default_config: CompactionConfig, ) -> Result { @@ -428,24 +428,6 @@ impl HummockManager { results } - /// Splits a compaction group into two. The new one will contain `table_ids`. - /// Returns the newly created compaction group id. - pub async fn split_compaction_group( - &self, - parent_group_id: CompactionGroupId, - table_ids: &[StateTableId], - ) -> Result { - let result = self - .move_state_table_to_compaction_group( - parent_group_id, - table_ids, - self.env.opts.partition_vnode_count, - ) - .await?; - - Ok(result) - } - /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. pub async fn move_state_table_to_compaction_group( @@ -661,7 +643,7 @@ impl HummockManager { infos } - pub(super) async fn initial_compaction_group_config_after_load( + pub(crate) async fn initial_compaction_group_config_after_load( &self, versioning_guard: &Versioning, compaction_group_manager: &mut CompactionGroupManager, @@ -685,7 +667,7 @@ impl HummockManager { /// 1. initialize default static compaction group. /// 2. register new table to new compaction group. /// 3. move existent table to new compaction group. -pub(super) struct CompactionGroupManager { +pub(crate) struct CompactionGroupManager { compaction_groups: BTreeMap, default_config: Arc, /// Tables that write limit is trigger for. @@ -719,7 +701,7 @@ impl CompactionGroupManager { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn try_get_compaction_group_config( + pub(crate) fn try_get_compaction_group_config( &self, compaction_group_id: CompactionGroupId, ) -> Option { @@ -727,7 +709,7 @@ impl CompactionGroupManager { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn default_compaction_config(&self) -> Arc { + pub(crate) fn default_compaction_config(&self) -> Arc { self.default_config.clone() } } @@ -821,7 +803,7 @@ impl<'a> CompactionGroupTransaction<'a> { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn try_get_compaction_group_config( + pub(crate) fn try_get_compaction_group_config( &self, compaction_group_id: CompactionGroupId, ) -> Option<&CompactionGroup> { @@ -829,7 +811,7 @@ impl<'a> CompactionGroupTransaction<'a> { } /// Removes stale group configs. - fn purge(&mut self, existing_groups: HashSet) { + pub fn purge(&mut self, existing_groups: HashSet) { let stale_group = self .tree_ref() .keys() @@ -844,7 +826,7 @@ impl<'a> CompactionGroupTransaction<'a> { } } - pub(super) fn update_compaction_config( + pub(crate) fn update_compaction_config( &mut self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs new file mode 100644 index 0000000000000..45780da961c43 --- /dev/null +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -0,0 +1,315 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::ops::{Deref, DerefMut}; + +use itertools::Itertools; +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::compact_task::ReportTask; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ + get_compaction_group_ids, TableGroupInfo, +}; +use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; +use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas}; +use risingwave_hummock_sdk::CompactionGroupId; +use risingwave_pb::hummock::compact_task::TaskStatus; +use risingwave_pb::hummock::{PbGroupMerge, PbStateTableInfoDelta}; +use thiserror_ext::AsReport; + +use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::transaction::HummockVersionTransaction; +use crate::hummock::manager::{commit_multi_var, HummockManager}; +use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; + +impl HummockManager { + /// Splits a compaction group into two. The new one will contain `table_ids`. + /// Returns the newly created compaction group id. + pub async fn split_compaction_group( + &self, + parent_group_id: CompactionGroupId, + table_ids: &[StateTableId], + ) -> Result { + let result = self + .move_state_table_to_compaction_group(parent_group_id, table_ids, 0) + .await?; + + Ok(result) + } + + pub async fn merge_compaction_group( + &self, + group_1: CompactionGroupId, + group_2: CompactionGroupId, + ) -> Result<()> { + let compaction_guard = self.compaction.write().await; + let mut versioning_guard = self.versioning.write().await; + let versioning = versioning_guard.deref_mut(); + // Validate parameters. + if !versioning.current_version.levels.contains_key(&group_1) { + return Err(Error::CompactionGroup(format!("invalid group {}", group_1))); + } + + if !versioning.current_version.levels.contains_key(&group_2) { + return Err(Error::CompactionGroup(format!("invalid group {}", group_2))); + } + + let state_table_info = versioning.current_version.state_table_info.clone(); + let member_table_ids_1 = state_table_info + .compaction_group_member_table_ids(group_1) + .iter() + .cloned() + .collect_vec(); + + let member_table_ids_2 = state_table_info + .compaction_group_member_table_ids(group_2) + .iter() + .cloned() + .collect_vec(); + + let mut combine_1 = member_table_ids_1.clone(); + combine_1.extend_from_slice(&member_table_ids_2); + + let mut combine_2 = member_table_ids_2; + combine_2.extend_from_slice(&member_table_ids_1); + + if !combine_1.is_sorted() && !combine_2.is_sorted() { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {}", + group_1, group_2 + ))); + } + + let mut left_group_id = group_1; + let mut right_group_id = group_2; + let combine_member_table_ids = if combine_1.is_sorted() { + combine_1 + } else { + std::mem::swap(&mut left_group_id, &mut right_group_id); + combine_2 + }; + + let mut version = HummockVersionTransaction::new( + &mut versioning.current_version, + &mut versioning.hummock_version_deltas, + self.env.notification_manager(), + &self.metrics, + ); + let mut new_version_delta = version.new_delta(); + + let target_compaction_group_id = { + let mut config = self + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(group_1) + .unwrap() + .compaction_config() + .deref() + .clone(); + + { + // update config + config.split_weight_by_vnode = 0; + } + + // merge right_group_id to left_group_id and remove right_group_id + new_version_delta.group_deltas.insert( + left_group_id, + GroupDeltas { + group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge { + left_group_id, + right_group_id, + })], + }, + ); + left_group_id + }; + + // TODO: remove compaciton group_id from state_table_info + // rewrite compaction_group_id for all tables + new_version_delta.with_latest_version(|version, new_version_delta| { + for table_id in combine_member_table_ids { + let table_id = TableId::new(table_id.table_id()); + let info = version + .state_table_info + .info() + .get(&table_id) + .expect("have check exist previously"); + assert!(new_version_delta + .state_table_info_delta + .insert( + table_id, + PbStateTableInfoDelta { + committed_epoch: info.committed_epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: target_compaction_group_id, + } + ) + .is_none()); + } + }); + + { + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + + // for metrics reclaim + { + let right_group_max_level = new_version_delta + .latest_version() + .get_compaction_group_levels(right_group_id) + .levels + .len(); + + remove_compaction_group_in_sst_stat( + &self.metrics, + right_group_id, + right_group_max_level, + ); + } + + new_version_delta.pre_apply(); + + // purge right_group_id + compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids( + version.latest_version(), + ))); + commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; + } + + // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot. + versioning.mark_next_time_travel_version_snapshot(); + + // cancel tasks + let mut canceled_tasks = vec![]; + // after merge, all tasks in right_group_id should be canceled + // otherwise, pending size calculation by level handler will make some mistake + for task_assignment in compaction_guard.compact_task_assignment.values() { + if let Some(task) = task_assignment.compact_task.as_ref() { + let need_cancel = task.compaction_group_id == right_group_id; + if need_cancel { + canceled_tasks.push(ReportTask { + task_id: task.task_id, + task_status: TaskStatus::ManualCanceled, + table_stats_change: HashMap::default(), + sorted_output_ssts: vec![], + }); + } + } + } + + drop(versioning_guard); + drop(compaction_guard); + self.report_compact_tasks(canceled_tasks).await?; + + Ok(()) + } + + pub async fn try_split_compaction_group( + &self, + table_write_throughput: &HashMap>, + checkpoint_secs: u64, + group: &TableGroupInfo, + created_tables: &HashSet, + ) { + // split high throughput table to dedicated compaction group + for (table_id, table_size) in &group.table_statistic { + self.try_move_table_to_dedicated_cg( + table_write_throughput, + table_id, + table_size, + !created_tables.contains(table_id), + checkpoint_secs, + group.group_id, + group.group_size, + ) + .await; + } + } + + pub async fn try_move_table_to_dedicated_cg( + &self, + table_write_throughput: &HashMap>, + table_id: &u32, + table_size: &u64, + is_creating_table: bool, + checkpoint_secs: u64, + parent_group_id: u64, + group_size: u64, + ) { + let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); + let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); + let partition_vnode_count = self.env.opts.partition_vnode_count; + let window_size = + self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize); + + let mut is_high_write_throughput = false; + let mut is_low_write_throughput = true; + if let Some(history) = table_write_throughput.get(table_id) { + if history.len() >= window_size { + is_high_write_throughput = history.iter().all(|throughput| { + *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold + }); + is_low_write_throughput = history.iter().any(|throughput| { + *throughput / checkpoint_secs < self.env.opts.min_table_split_write_throughput + }); + } + } + + let state_table_size = *table_size; + + // 1. Avoid splitting a creating table + // 2. Avoid splitting a is_low_write_throughput creating table + // 3. Avoid splitting a non-high throughput medium-sized table + if is_creating_table + || (is_low_write_throughput) + || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) + { + return; + } + + // do not split a large table and a small table because it would increase IOPS + // of small table. + if parent_group_id != default_group_id && parent_group_id != mv_group_id { + let rest_group_size = group_size - state_table_size; + if rest_group_size < state_table_size + && rest_group_size < self.env.opts.min_table_split_size + { + return; + } + } + + let ret = self + .move_state_table_to_compaction_group( + parent_group_id, + &[*table_id], + partition_vnode_count, + ) + .await; + match ret { + Ok(new_group_id) => { + tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count); + } + Err(e) => { + tracing::info!( + error = %e.as_report(), + "failed to move state table [{}] from group-{}", + table_id, + parent_group_id, + ) + } + } + } +} diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction/mod.rs similarity index 95% rename from src/meta/src/hummock/manager/compaction.rs rename to src/meta/src/hummock/manager/compaction/mod.rs index 8655df1367742..6db330e1d4d9c 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -27,7 +27,7 @@ // limitations under the License. use std::cmp::min; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::{Arc, LazyLock}; use std::time::{Instant, SystemTime}; @@ -43,7 +43,6 @@ use rand::thread_rng; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::{InputLevel, Level, Levels}; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -96,6 +95,9 @@ use crate::hummock::{commit_multi_var, start_measure_real_process_timer, Hummock use crate::manager::{MetadataManager, META_NODE_ID}; use crate::model::BTreeMapTransaction; +pub mod compaction_group_manager; +pub mod compaction_group_schedule; + const MAX_SKIP_TIMES: usize = 8; const MAX_REPORT_COUNT: usize = 16; @@ -1567,80 +1569,6 @@ impl HummockManager { .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); } } - - pub async fn try_move_table_to_dedicated_cg( - &self, - table_write_throughput: &HashMap>, - table_id: &u32, - table_size: &u64, - is_creating_table: bool, - checkpoint_secs: u64, - parent_group_id: u64, - group_size: u64, - ) { - let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); - let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); - let partition_vnode_count = self.env.opts.partition_vnode_count; - let window_size = - self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize); - - let mut is_high_write_throughput = false; - let mut is_low_write_throughput = true; - if let Some(history) = table_write_throughput.get(table_id) { - if history.len() >= window_size { - is_high_write_throughput = history.iter().all(|throughput| { - *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold - }); - is_low_write_throughput = history.iter().any(|throughput| { - *throughput / checkpoint_secs < self.env.opts.min_table_split_write_throughput - }); - } - } - - let state_table_size = *table_size; - - // 1. Avoid splitting a creating table - // 2. Avoid splitting a is_low_write_throughput creating table - // 3. Avoid splitting a non-high throughput medium-sized table - if is_creating_table - || (is_low_write_throughput) - || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) - { - return; - } - - // do not split a large table and a small table because it would increase IOPS - // of small table. - if parent_group_id != default_group_id && parent_group_id != mv_group_id { - let rest_group_size = group_size - state_table_size; - if rest_group_size < state_table_size - && rest_group_size < self.env.opts.min_table_split_size - { - return; - } - } - - let ret = self - .move_state_table_to_compaction_group( - parent_group_id, - &[*table_id], - partition_vnode_count, - ) - .await; - match ret { - Ok(new_group_id) => { - tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count); - } - Err(e) => { - tracing::info!( - error = %e.as_report(), - "failed to move state table [{}] from group-{}", - table_id, - parent_group_id, - ) - } - } - } } #[cfg(any(test, feature = "test"))] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8c06acbd580f9..88bac3f115b7b 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -50,7 +50,6 @@ use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; use crate::rpc::metrics::MetaMetrics; -mod compaction_group_manager; mod context; mod gc; mod tests; @@ -59,7 +58,7 @@ pub use context::HummockVersionSafePoint; use versioning::*; pub(crate) mod checkpoint; mod commit_epoch; -mod compaction; +pub mod compaction; pub mod sequence; pub mod time_travel; mod timer_task; diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index ec0f77ac88a8a..94537e9c33e1f 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -43,7 +43,7 @@ impl HummockManager { const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1; pub enum HummockTimerEvent { - GroupSplit, + GroupSchedule, CheckDeadTask, Report, CompactionHeartBeatExpiredCheck, @@ -158,7 +158,7 @@ impl HummockManager { .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let split_group_trigger = IntervalStream::new(split_group_trigger_interval) - .map(|_| HummockTimerEvent::GroupSplit); + .map(|_| HummockTimerEvent::GroupSchedule); triggers.push(Box::pin(split_group_trigger)); } @@ -189,12 +189,12 @@ impl HummockManager { hummock_manager.check_dead_task().await; } - HummockTimerEvent::GroupSplit => { + HummockTimerEvent::GroupSchedule => { if hummock_manager.env.opts.compaction_deterministic_test { continue; } - hummock_manager.on_handle_check_split_multi_group().await; + hummock_manager.on_handle_schedule_group().await; } HummockTimerEvent::Report => { @@ -443,7 +443,7 @@ impl HummockManager { /// throughput keep larger than `table_write_throughput_threshold` for a long time. /// * For state-table whose throughput less than `min_table_split_write_throughput`, do not /// increase it size of base-level. - async fn on_handle_check_split_multi_group(&self) { + async fn on_handle_schedule_group(&self) { let params = self.env.system_params_reader().await; let barrier_interval_ms = params.barrier_interval_ms() as u64; let checkpoint_secs = std::cmp::max( @@ -469,18 +469,13 @@ impl HummockManager { continue; } - for (table_id, table_size) in &group.table_statistic { - self.try_move_table_to_dedicated_cg( - &table_write_throughput, - table_id, - table_size, - !created_tables.contains(table_id), - checkpoint_secs, - group.group_id, - group.group_size, - ) - .await; - } + self.try_split_compaction_group( + &table_write_throughput, + checkpoint_secs, + group, + &created_tables, + ) + .await; } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 0682a63a02edb..635d686555174 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -178,6 +178,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupDestroy", "#[derive(Eq)]") .type_attribute("hummock.GroupMetaChange", "#[derive(Eq)]") .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") + .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 5f69a3f779647..7a29cfab01435 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1436,6 +1436,19 @@ impl MetaClient { let resp = self.inner.get_version_by_epoch(req).await?; Ok(resp.version.unwrap()) } + + pub async fn merge_compaction_group( + &self, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, + ) -> Result<()> { + let req = MergeCompactionGroupRequest { + left_group_id, + right_group_id, + }; + self.inner.merge_compaction_group(req).await?; + Ok(()) + } } #[async_trait] @@ -2110,6 +2123,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse } + ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 13a0bcc08adf5..c8cb30a0e1e84 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,12 +22,13 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange, + CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange, GroupTableChange, PbLevelType, }; use tracing::warn; -use super::StateTableId; +use super::group_split::get_sub_level_insert_hint; +use super::{group_split, StateTableId}; use crate::change_log::TableChangeLog; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; @@ -47,13 +48,17 @@ pub struct GroupDeltasSummary { pub insert_sub_level_id: u64, pub insert_table_infos: Vec, pub group_construct: Option, - pub group_destroy: Option, + pub group_destroy: Option, pub group_meta_changes: Vec, pub group_table_change: Option, pub new_vnode_partition_count: u32, + pub group_merge: Option, } -pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary { +pub fn summarize_group_deltas( + group_deltas: &GroupDeltas, + default_group_id: CompactionGroupId, +) -> GroupDeltasSummary { let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); let mut delete_sst_ids_set = HashSet::new(); let mut insert_sst_level_id = u32::MAX; @@ -64,6 +69,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary let mut group_meta_changes = vec![]; let mut group_table_change = None; let mut new_vnode_partition_count = 0; + let mut group_merge = None; for group_delta in &group_deltas.group_deltas { match group_delta { @@ -83,9 +89,9 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary assert!(group_construct.is_none()); group_construct = Some(construct_delta.clone()); } - GroupDelta::GroupDestroy(destroy_delta) => { + GroupDelta::GroupDestroy(_) => { assert!(group_destroy.is_none()); - group_destroy = Some(*destroy_delta); + group_destroy = Some(default_group_id); } GroupDelta::GroupMetaChange(meta_delta) => { group_meta_changes.push(meta_delta.clone()); @@ -93,6 +99,11 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary GroupDelta::GroupTableChange(meta_delta) => { group_table_change = Some(meta_delta.clone()); } + GroupDelta::GroupMerge(merge_delta) => { + assert!(group_merge.is_none()); + group_merge = Some(*merge_delta); + group_destroy = Some(merge_delta.right_group_id); + } } } @@ -110,6 +121,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary group_meta_changes, group_table_change, new_vnode_partition_count, + group_merge, } } @@ -386,23 +398,6 @@ impl HummockVersion { { for sub_level in &mut l0.sub_levels { let target_l0 = &mut cur_levels.l0; - // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` - // will extend these SSTs. When `insert_hint` is `Err(idx)`, it - // means that we will add a new sub level `idx` into `target_l0`. - let mut insert_hint = Err(target_l0.sub_levels.len()); - for (idx, other) in target_l0.sub_levels.iter_mut().enumerate() { - match other.sub_level_id.cmp(&sub_level.sub_level_id) { - Ordering::Less => {} - Ordering::Equal => { - insert_hint = Ok(idx); - break; - } - Ordering::Greater => { - insert_hint = Err(idx); - break; - } - } - } // Remove SST from sub level may result in empty sub level. It will be purged // whenever another compaction task is finished. let insert_table_infos = @@ -419,7 +414,7 @@ impl HummockVersion { if insert_table_infos.is_empty() { continue; } - match insert_hint { + match get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { Ok(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } @@ -570,7 +565,7 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas); + let summary = summarize_group_deltas(group_deltas, *compaction_group_id); if let Some(group_construct) = &summary.group_construct { let mut new_levels = build_initial_compaction_group_levels( *compaction_group_id, @@ -635,14 +630,19 @@ impl HummockVersion { .expect("compaction group should exist") .member_table_ids .append(&mut moving_tables); + } else if let Some(group_merge) = &summary.group_merge { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id + ); + self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) } - let has_destroy = summary.group_destroy.is_some(); let visible_table_committed_epoch = self.visible_table_committed_epoch(); - let levels = self - .levels - .get_mut(compaction_group_id) - .expect("compaction group should exist"); - + let group_destroy = summary.group_destroy; + let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); #[expect(deprecated)] // for backward-compatibility of previous hummock version delta for group_meta_delta in &summary.group_meta_changes { levels @@ -669,7 +669,8 @@ impl HummockVersion { } = summary; assert!( - delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() || has_destroy, + delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() + || group_destroy.is_some(), "no sst should be deleted when committing an epoch" ); for group_delta in &group_deltas.group_deltas { @@ -703,8 +704,8 @@ impl HummockVersion { .compaction_group_member_table_ids(*compaction_group_id), ); } - if has_destroy { - self.levels.remove(compaction_group_id); + if let Some(destroy_group_id) = &group_destroy { + self.levels.remove(destroy_group_id); } } self.id = version_delta.id; @@ -835,6 +836,25 @@ impl HummockVersion { } ret } + + pub fn merge_compaction_group( + &mut self, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, + ) { + let total_cg = self.levels.keys().cloned().collect::>(); + let [left_levels, right_levels] = self + .levels + .get_many_mut([&left_group_id, &right_group_id]) + .unwrap_or_else(|| { + panic!( + "compaction group should exist left {} right {} all {:?}", + left_group_id, right_group_id, total_cg + ) + }); + + group_split::merge_levels(left_levels, right_levels); + } } #[easy_ext::ext(HummockLevelsExt)] diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 973cc3e3c6140..fae8c60c5c8fb 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -43,3 +43,112 @@ impl From for CompactionGroupId { cg as CompactionGroupId } } + +pub mod group_split { + use std::cmp::Ordering; + + use super::hummock_version_ext::{add_ssts_to_sub_level, insert_new_sub_level}; + use crate::can_concat; + use crate::level::{Level, Levels}; + + pub fn merge_levels(left_levels: &mut Levels, right_levels: &mut Levels) { + let right_l0 = &right_levels.l0; + + for right_sub_level in &right_l0.sub_levels { + let sub_level = right_sub_level.clone(); + let insert_hint = get_sub_level_insert_hint(&left_levels.l0.sub_levels, &sub_level); + + println!( + "sub_level: {:?} type {:?} insert_hint {:?}", + sub_level.sub_level_id, sub_level.level_type, insert_hint + ); + + match insert_hint { + Ok(insert_hint) => { + add_ssts_to_sub_level( + &mut left_levels.l0, + insert_hint, + sub_level.table_infos.clone(), + ); + } + Err(insert_hint) => { + insert_new_sub_level( + &mut left_levels.l0, + sub_level.sub_level_id, + sub_level.level_type, + sub_level.table_infos.clone(), + Some(insert_hint), + ); + } + } + } + + left_levels + .l0 + .sub_levels + .sort_by_key(|sub_level| sub_level.sub_level_id); + + // Reinitialise `vnode_partition_count`` to avoid misaligned hierarchies + // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact) + left_levels + .l0 + .sub_levels + .iter_mut() + .for_each(|sub_level| sub_level.vnode_partition_count = 0); + + for (idx, level) in right_levels.levels.iter_mut().enumerate() { + if level.table_infos.is_empty() { + continue; + } + + let insert_table_infos = level.table_infos.clone(); + left_levels.levels[idx].total_file_size += insert_table_infos + .iter() + // .map(|sst| sst.estimated_sst_size) + .map(|sst| sst.file_size) + .sum::(); + left_levels.levels[idx].uncompressed_file_size += insert_table_infos + .iter() + .map(|sst| sst.uncompressed_file_size) + .sum::(); + + left_levels.levels[idx] + .table_infos + .extend(insert_table_infos); + left_levels.levels[idx] + .table_infos + .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); + assert!( + can_concat(&left_levels.levels[idx].table_infos), + "{}", + format!( + "left_levels.levels[{}].table_infos: {:?} level_idx {:?}", + idx, left_levels.levels[idx].table_infos, left_levels.levels[idx].level_idx + ) + ); + level.table_infos.clear(); + } + } + + // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` + // will extend these SSTs. When `insert_hint` is `Err(idx)`, it + // means that we will add a new sub level `idx` into `target_l0`. + pub fn get_sub_level_insert_hint( + target_levels: &Vec, + sub_level: &Level, + ) -> Result { + for (idx, other) in target_levels.iter().enumerate() { + match other.sub_level_id.cmp(&sub_level.sub_level_id) { + Ordering::Less => {} + Ordering::Equal => { + return Ok(idx); + } + Ordering::Greater => { + return Err(idx); + } + } + } + + Err(target_levels.len()) + } +} diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index e418250f0b6bf..1c8cfd1e310b4 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -24,9 +24,9 @@ use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_pb::hummock::group_delta::PbDeltaType; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ - CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMetaChange, - PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, - PbStateTableInfo, StateTableInfo, StateTableInfoDelta, + CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, + PbGroupMetaChange, PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, + PbIntraLevelDelta, PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -501,12 +501,10 @@ impl HummockVersionDelta { .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter().map(|sst| sst.object_id) }) @@ -526,12 +524,10 @@ impl HummockVersionDelta { let ssts_from_group_deltas = self.group_deltas.values().flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter() }) @@ -564,12 +560,10 @@ impl HummockVersionDelta { .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter() }) @@ -881,6 +875,8 @@ pub enum GroupDelta { #[allow(dead_code)] GroupTableChange(PbGroupTableChange), + + GroupMerge(PbGroupMerge), } impl From for GroupDelta { @@ -901,6 +897,7 @@ impl From for GroupDelta { Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { GroupDelta::GroupTableChange(pb_group_table_change) } + Some(PbDeltaType::GroupMerge(pb_group_merge)) => GroupDelta::GroupMerge(pb_group_merge), None => panic!("delta_type is not set"), } } @@ -924,6 +921,9 @@ impl From for PbGroupDelta { GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change)), }, + GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), + }, } } } @@ -946,6 +946,9 @@ impl From<&GroupDelta> for PbGroupDelta { GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change.clone())), }, + GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), + }, } } } @@ -968,6 +971,9 @@ impl From<&PbGroupDelta> for GroupDelta { Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { GroupDelta::GroupTableChange(pb_group_table_change.clone()) } + Some(PbDeltaType::GroupMerge(pb_group_merge)) => { + GroupDelta::GroupMerge(*pb_group_merge) + } None => panic!("delta_type is not set"), } } From e73189662b7829aa929cde23938647c9876cdb1c Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 22 Aug 2024 16:42:53 +0800 Subject: [PATCH 02/11] feat(compaction): add assert check when handle merge_compaction_group --- .../compaction_group/hummock_version_ext.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c8cb30a0e1e84..c937245417a1b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -842,6 +842,26 @@ impl HummockVersion { left_group_id: CompactionGroupId, right_group_id: CompactionGroupId, ) { + // Double check + let left_group_id_table_ids = self + .state_table_info + .compaction_group_member_table_ids(left_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(); + + assert!(left_group_id_table_ids.is_sorted()); + let right_group_id_table_ids = self + .state_table_info + .compaction_group_member_table_ids(right_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(); + assert!(right_group_id_table_ids.is_sorted()); + assert!( + left_group_id_table_ids.last().unwrap() < right_group_id_table_ids.first().unwrap() + ); + let total_cg = self.levels.keys().cloned().collect::>(); let [left_levels, right_levels] = self .levels From 17e582aeae926fe022307ad00603e906cc781ec6 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 22 Aug 2024 16:57:06 +0800 Subject: [PATCH 03/11] chore(compaction): add ut --- .../compaction_group/hummock_version_ext.rs | 402 ++++++++++++++++++ .../hummock_sdk/src/compaction_group/mod.rs | 9 +- 2 files changed, 403 insertions(+), 8 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c937245417a1b..eaadfaf9d4243 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1380,9 +1380,15 @@ pub fn split_sst(sst_info: &mut SstableInfo, new_sst_id: &mut u64) -> SstableInf mod tests { use std::collections::HashMap; + use bytes::Bytes; + use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType}; + use crate::compaction_group::group_split; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; + use crate::key::{gen_key_from_str, FullKey}; + use crate::key_range::KeyRange; use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::version::{ @@ -1540,4 +1546,400 @@ mod tests { version }); } + + fn gen_sst_info(object_id: u64, table_ids: Vec, left: Bytes, right: Bytes) -> SstableInfo { + SstableInfo { + object_id, + sst_id: object_id, + key_range: KeyRange { + left, + right, + right_exclusive: false, + }, + table_ids, + file_size: 100, + // estimated_sst_size: 100, + uncompressed_file_size: 100, + ..Default::default() + } + } + + #[test] + fn test_merge_levels() { + let mut left_levels = build_initial_compaction_group_levels( + 1, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + let mut right_levels = build_initial_compaction_group_levels( + 2, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + left_levels.levels[0] = Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping, + table_infos: vec![ + gen_sst_info( + 1, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 10, + vec![3, 4], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(201), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(4), + gen_key_from_str(VirtualNode::from_index(10), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 11, + vec![4], + FullKey::for_test( + TableId::new(4), + gen_key_from_str(VirtualNode::from_index(11), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(4), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + ], + total_file_size: 300, + ..Default::default() + }; + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 101, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 103, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 105, + level_type: LevelType::Nonoverlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.levels[0] = Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping, + table_infos: vec![ + gen_sst_info( + 1, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 10, + vec![5, 6], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(201), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(10), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 11, + vec![6], + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(11), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + ], + total_file_size: 300, + ..Default::default() + }; + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 101, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 5, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 102, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 103, + level_type: LevelType::Nonoverlapping, + total_file_size: 100, + ..Default::default() + }); + + { + // test empty + let mut left_levels = Levels::default(); + let mut right_levels = Levels::default(); + + group_split::merge_levels(&mut left_levels, &mut right_levels); + } + + { + // test empty left + let mut left_levels = build_initial_compaction_group_levels( + 1, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + let mut right_levels = right_levels.clone(); + + group_split::merge_levels(&mut left_levels, &mut right_levels); + + assert!(left_levels.l0.sub_levels.len() == 3); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 102); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 103); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(300, left_levels.levels[0].total_file_size); + } + + { + // test empty right + let mut left_levels = left_levels.clone(); + let mut right_levels = build_initial_compaction_group_levels( + 2, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + group_split::merge_levels(&mut left_levels, &mut right_levels); + + assert!(left_levels.l0.sub_levels.len() == 3); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 103); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 105); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(300, left_levels.levels[0].total_file_size); + } + + { + let mut left_levels = left_levels.clone(); + let mut right_levels = right_levels.clone(); + + group_split::merge_levels(&mut left_levels, &mut right_levels); + + assert!(left_levels.l0.sub_levels.len() == 4); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(200, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 102); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 103); + assert_eq!(200, left_levels.l0.sub_levels[2].total_file_size); + assert!(left_levels.l0.sub_levels[3].sub_level_id == 105); + assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(600, left_levels.levels[0].total_file_size); + } + } } diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index fae8c60c5c8fb..3525573cad425 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -56,14 +56,7 @@ pub mod group_split { for right_sub_level in &right_l0.sub_levels { let sub_level = right_sub_level.clone(); - let insert_hint = get_sub_level_insert_hint(&left_levels.l0.sub_levels, &sub_level); - - println!( - "sub_level: {:?} type {:?} insert_hint {:?}", - sub_level.sub_level_id, sub_level.level_type, insert_hint - ); - - match insert_hint { + match get_sub_level_insert_hint(&left_levels.l0.sub_levels, &sub_level) { Ok(insert_hint) => { add_ssts_to_sub_level( &mut left_levels.l0, From beb8a61d9fa2aa4db5f5321621aa41b73af490fb Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 23 Aug 2024 12:30:40 +0800 Subject: [PATCH 04/11] fix(storage): fix test --- proto/hummock.proto | 1 + .../src/cmd_impl/hummock/compaction_group.rs | 3 +- src/ctl/src/lib.rs | 12 ++++++-- src/meta/service/src/hummock_service.rs | 2 +- .../compaction/compaction_group_schedule.rs | 3 +- src/meta/src/hummock/manager/tests.rs | 29 +++++++++---------- src/rpc_client/src/meta_client.rs | 2 ++ 7 files changed, 31 insertions(+), 21 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 7069d62fa6762..e45fa9856fcd5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -744,6 +744,7 @@ message PinVersionResponse { message SplitCompactionGroupRequest { uint64 group_id = 1; repeated uint32 table_ids = 2; + uint32 partition_vnode_count = 3; } message SplitCompactionGroupResponse { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index e0107a22cb858..c41b4c6e25b9e 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -131,10 +131,11 @@ pub async fn split_compaction_group( context: &CtlContext, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let new_group_id = meta_client - .split_compaction_group(group_id, table_ids_to_new_group) + .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count) .await?; println!( "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.", diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 574925ca5112e..27096b9cfd2e3 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -276,6 +276,8 @@ enum HummockCommands { compaction_group_id: u64, #[clap(long, value_delimiter = ',')] table_ids: Vec, + #[clap(long, default_value_t = 0)] + partition_vnode_count: u32, }, /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object. PauseVersionCheckpoint, @@ -714,9 +716,15 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::SplitCompactionGroup { compaction_group_id, table_ids, + partition_vnode_count, }) => { - cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids) - .await?; + cmd_impl::hummock::split_compaction_group( + context, + compaction_group_id, + &table_ids, + partition_vnode_count, + ) + .await?; } Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => { cmd_impl::hummock::pause_version_checkpoint(context).await?; diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 83c122bab5bd0..77c2df39c8749 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -457,7 +457,7 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let new_group_id = self .hummock_manager - .split_compaction_group(req.group_id, &req.table_ids) + .split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count) .await?; Ok(Response::new(SplitCompactionGroupResponse { new_group_id })) } diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 45780da961c43..71f1777e56f4c 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -40,9 +40,10 @@ impl HummockManager { &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], + partition_vnode_count: u32, ) -> Result { let result = self - .move_state_table_to_compaction_group(parent_group_id, table_ids, 0) + .move_state_table_to_compaction_group(parent_group_id, table_ids, partition_vnode_count) .await?; Ok(result) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 2d25f196a60bf..fc60ef8966c50 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -17,7 +17,6 @@ use std::borrow::Borrow; use std::cmp::Ordering; use std::collections::HashMap; -use std::sync::Arc; use itertools::Itertools; use prometheus::Registry; @@ -1374,18 +1373,18 @@ async fn test_split_compaction_group_on_demand_basic() { assert_eq!(original_groups, vec![2, 3]); let err = hummock_manager - .split_compaction_group(100, &[0]) + .split_compaction_group(100, &[0], 0) .await .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); hummock_manager - .split_compaction_group(2, &[]) + .split_compaction_group(2, &[], 0) .await .unwrap(); let err = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap_err(); assert_eq!( @@ -1443,7 +1442,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); let err = hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap_err(); assert_eq!( @@ -1459,7 +1458,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1525,7 +1524,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -1649,7 +1648,7 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let mut selector: Box = @@ -1822,7 +1821,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1927,7 +1926,7 @@ async fn test_compaction_task_expiration_due_to_split_group() { let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -2011,7 +2010,7 @@ async fn test_move_tables_between_compaction_group() { ); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2131,11 +2130,9 @@ async fn test_partition_level() { .level0_overlapping_sub_level_compact_level_count(3) .build(); let registry = Registry::new(); - let (_env, hummock_manager, _, worker_node) = + let (env, hummock_manager, _, worker_node) = setup_compute_env_with_metric(80, config.clone(), Some(MetaMetrics::for_test(®istry))) .await; - let config = Arc::new(config); - let context_id = worker_node.id; hummock_manager @@ -2170,7 +2167,7 @@ async fn test_partition_level() { .unwrap()); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], env.opts.partition_vnode_count) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2304,7 +2301,7 @@ async fn test_unregister_moved_table() { .unwrap(); let new_group_id = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); assert_ne!(new_group_id, 2); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 7a29cfab01435..d48f1cd24a58d 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1245,10 +1245,12 @@ impl MetaClient { &self, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> Result { let req = SplitCompactionGroupRequest { group_id, table_ids: table_ids_to_new_group.to_vec(), + partition_vnode_count, }; let resp = self.inner.split_compaction_group(req).await?; Ok(resp.new_group_id) From b3a4f7cfdf622ad67572c0d45d48ef93656c0808 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 27 Aug 2024 15:25:50 +0800 Subject: [PATCH 05/11] address comments --- .../compaction/compaction_group_schedule.rs | 71 ++++++++----------- src/meta/src/hummock/manager/mod.rs | 2 +- .../compaction_group/hummock_version_ext.rs | 52 +++++++------- .../hummock_sdk/src/compaction_group/mod.rs | 24 +++---- 4 files changed, 65 insertions(+), 84 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 71f1777e56f4c..4ca0d15ed43d1 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -13,14 +13,12 @@ // limitations under the License. use std::collections::{HashMap, HashSet, VecDeque}; -use std::ops::{Deref, DerefMut}; +use std::ops::DerefMut; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compact_task::ReportTask; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - get_compaction_group_ids, TableGroupInfo, -}; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::TableGroupInfo; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas}; use risingwave_hummock_sdk::CompactionGroupId; @@ -67,39 +65,45 @@ impl HummockManager { } let state_table_info = versioning.current_version.state_table_info.clone(); - let member_table_ids_1 = state_table_info + let mut member_table_ids_1 = state_table_info .compaction_group_member_table_ids(group_1) .iter() .cloned() .collect_vec(); - let member_table_ids_2 = state_table_info + let mut member_table_ids_2 = state_table_info .compaction_group_member_table_ids(group_2) .iter() .cloned() .collect_vec(); - let mut combine_1 = member_table_ids_1.clone(); - combine_1.extend_from_slice(&member_table_ids_2); - - let mut combine_2 = member_table_ids_2; - combine_2.extend_from_slice(&member_table_ids_1); - - if !combine_1.is_sorted() && !combine_2.is_sorted() { + debug_assert!(!member_table_ids_1.is_empty()); + debug_assert!(!member_table_ids_2.is_empty()); + assert!(member_table_ids_1.is_sorted()); + assert!(member_table_ids_2.is_sorted()); + + // Make sure `member_table_ids_1` is smaller than `member_table_ids_2` + let (left_group_id, right_group_id) = + if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() { + (group_1, group_2) + } else { + std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2); + (group_2, group_1) + }; + + // We can only merge two groups with non-overlapping member table ids + if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() { return Err(Error::CompactionGroup(format!( "invalid merge group_1 {} group_2 {}", - group_1, group_2 + left_group_id, right_group_id ))); } - let mut left_group_id = group_1; - let mut right_group_id = group_2; - let combine_member_table_ids = if combine_1.is_sorted() { - combine_1 - } else { - std::mem::swap(&mut left_group_id, &mut right_group_id); - combine_2 - }; + let combined_member_table_ids = member_table_ids_1 + .iter() + .chain(member_table_ids_2.iter()) + .collect_vec(); + assert!(combined_member_table_ids.is_sorted()); let mut version = HummockVersionTransaction::new( &mut versioning.current_version, @@ -110,21 +114,6 @@ impl HummockManager { let mut new_version_delta = version.new_delta(); let target_compaction_group_id = { - let mut config = self - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(group_1) - .unwrap() - .compaction_config() - .deref() - .clone(); - - { - // update config - config.split_weight_by_vnode = 0; - } - // merge right_group_id to left_group_id and remove right_group_id new_version_delta.group_deltas.insert( left_group_id, @@ -141,7 +130,7 @@ impl HummockManager { // TODO: remove compaciton group_id from state_table_info // rewrite compaction_group_id for all tables new_version_delta.with_latest_version(|version, new_version_delta| { - for table_id in combine_member_table_ids { + for table_id in combined_member_table_ids { let table_id = TableId::new(table_id.table_id()); let info = version .state_table_info @@ -183,10 +172,8 @@ impl HummockManager { new_version_delta.pre_apply(); - // purge right_group_id - compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids( - version.latest_version(), - ))); + // remove right_group_id + compaction_groups_txn.remove(right_group_id); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 88bac3f115b7b..a45e9796987b6 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -58,7 +58,7 @@ pub use context::HummockVersionSafePoint; use versioning::*; pub(crate) mod checkpoint; mod commit_epoch; -pub mod compaction; +mod compaction; pub mod sequence; pub mod time_travel; mod timer_task; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 3db9aae0bbae7..9e158526421b7 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -57,7 +57,7 @@ pub struct GroupDeltasSummary { pub fn summarize_group_deltas( group_deltas: &GroupDeltas, - default_group_id: CompactionGroupId, + compaction_group_id: CompactionGroupId, ) -> GroupDeltasSummary { let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); let mut delete_sst_ids_set = HashSet::new(); @@ -91,7 +91,7 @@ pub fn summarize_group_deltas( } GroupDelta::GroupDestroy(_) => { assert!(group_destroy.is_none()); - group_destroy = Some(default_group_id); + group_destroy = Some(compaction_group_id); } GroupDelta::GroupMetaChange(meta_delta) => { group_meta_changes.push(meta_delta.clone()); @@ -847,31 +847,31 @@ impl HummockVersion { .state_table_info .compaction_group_member_table_ids(left_group_id) .iter() - .map(|table_id| table_id.table_id) - .collect_vec(); - - assert!(left_group_id_table_ids.is_sorted()); + .map(|table_id| table_id.table_id); let right_group_id_table_ids = self .state_table_info .compaction_group_member_table_ids(right_group_id) .iter() - .map(|table_id| table_id.table_id) - .collect_vec(); - assert!(right_group_id_table_ids.is_sorted()); - assert!( - left_group_id_table_ids.last().unwrap() < right_group_id_table_ids.first().unwrap() - ); + .map(|table_id| table_id.table_id); + + assert!(left_group_id_table_ids + .chain(right_group_id_table_ids) + .is_sorted()); let total_cg = self.levels.keys().cloned().collect::>(); - let [left_levels, right_levels] = self - .levels - .get_many_mut([&left_group_id, &right_group_id]) - .unwrap_or_else(|| { - panic!( - "compaction group should exist left {} right {} all {:?}", - left_group_id, right_group_id, total_cg - ) - }); + let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| { + panic!( + "compaction group should exist right {} all {:?}", + right_group_id, total_cg + ) + }); + + let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| { + panic!( + "compaction group should exist left {} all {:?}", + left_group_id, total_cg + ) + }); group_split::merge_levels(left_levels, right_levels); } @@ -1558,7 +1558,7 @@ mod tests { }, table_ids, file_size: 100, - // estimated_sst_size: 100, + sst_size: 100, uncompressed_file_size: 100, ..Default::default() } @@ -1869,7 +1869,7 @@ mod tests { let mut left_levels = Levels::default(); let mut right_levels = Levels::default(); - group_split::merge_levels(&mut left_levels, &mut right_levels); + group_split::merge_levels(&mut left_levels, right_levels); } { @@ -1883,7 +1883,7 @@ mod tests { ); let mut right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, &mut right_levels); + group_split::merge_levels(&mut left_levels, right_levels); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -1908,7 +1908,7 @@ mod tests { }, ); - group_split::merge_levels(&mut left_levels, &mut right_levels); + group_split::merge_levels(&mut left_levels, right_levels); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -1926,7 +1926,7 @@ mod tests { let mut left_levels = left_levels.clone(); let mut right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, &mut right_levels); + group_split::merge_levels(&mut left_levels, right_levels); assert!(left_levels.l0.sub_levels.len() == 4); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 3525573cad425..d4bcb5211741a 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -51,25 +51,21 @@ pub mod group_split { use crate::can_concat; use crate::level::{Level, Levels}; - pub fn merge_levels(left_levels: &mut Levels, right_levels: &mut Levels) { - let right_l0 = &right_levels.l0; + pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) { + let right_l0 = right_levels.l0; - for right_sub_level in &right_l0.sub_levels { + for right_sub_level in right_l0.sub_levels { let sub_level = right_sub_level.clone(); match get_sub_level_insert_hint(&left_levels.l0.sub_levels, &sub_level) { Ok(insert_hint) => { - add_ssts_to_sub_level( - &mut left_levels.l0, - insert_hint, - sub_level.table_infos.clone(), - ); + add_ssts_to_sub_level(&mut left_levels.l0, insert_hint, sub_level.table_infos); } Err(insert_hint) => { insert_new_sub_level( &mut left_levels.l0, sub_level.sub_level_id, sub_level.level_type, - sub_level.table_infos.clone(), + sub_level.table_infos, Some(insert_hint), ); } @@ -81,7 +77,7 @@ pub mod group_split { .sub_levels .sort_by_key(|sub_level| sub_level.sub_level_id); - // Reinitialise `vnode_partition_count`` to avoid misaligned hierarchies + // Reinitialise `vnode_partition_count` to avoid misaligned hierarchies // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact) left_levels .l0 @@ -89,16 +85,15 @@ pub mod group_split { .iter_mut() .for_each(|sub_level| sub_level.vnode_partition_count = 0); - for (idx, level) in right_levels.levels.iter_mut().enumerate() { + for (idx, level) in right_levels.levels.into_iter().enumerate() { if level.table_infos.is_empty() { continue; } - let insert_table_infos = level.table_infos.clone(); + let insert_table_infos = level.table_infos; left_levels.levels[idx].total_file_size += insert_table_infos .iter() - // .map(|sst| sst.estimated_sst_size) - .map(|sst| sst.file_size) + .map(|sst| sst.sst_size) .sum::(); left_levels.levels[idx].uncompressed_file_size += insert_table_infos .iter() @@ -119,7 +114,6 @@ pub mod group_split { idx, left_levels.levels[idx].table_infos, left_levels.levels[idx].level_idx ) ); - level.table_infos.clear(); } } From 72e1d74e6c8b50c66db5141218e415cc910cf83d Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 28 Aug 2024 18:01:08 +0800 Subject: [PATCH 06/11] fix(storage): fix merge levels with conflict sub level type --- .../compaction_group/hummock_version_ext.rs | 16 +++++--- .../hummock_sdk/src/compaction_group/mod.rs | 40 +++++++++++-------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 41f63d773da8b..340cf67da77cc 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1959,15 +1959,19 @@ mod tests { group_split::merge_levels(&mut left_levels, right_levels); - assert!(left_levels.l0.sub_levels.len() == 4); + assert!(left_levels.l0.sub_levels.len() == 6); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); - assert_eq!(200, left_levels.l0.sub_levels[0].total_file_size); - assert!(left_levels.l0.sub_levels[1].sub_level_id == 102); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 103); assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); - assert!(left_levels.l0.sub_levels[2].sub_level_id == 103); - assert_eq!(200, left_levels.l0.sub_levels[2].total_file_size); - assert!(left_levels.l0.sub_levels[3].sub_level_id == 105); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 105); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + assert!(left_levels.l0.sub_levels[3].sub_level_id == 106); assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size); + assert!(left_levels.l0.sub_levels[4].sub_level_id == 107); + assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size); + assert!(left_levels.l0.sub_levels[5].sub_level_id == 108); + assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size); assert!(left_levels.levels[0].level_idx == 1); assert_eq!(600, left_levels.levels[0].total_file_size); diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index a2848e174b655..ba6582c2b72c7 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -47,29 +47,37 @@ impl From for CompactionGroupId { pub mod group_split { use std::cmp::Ordering; - use super::hummock_version_ext::{add_ssts_to_sub_level, insert_new_sub_level}; + use super::hummock_version_ext::insert_new_sub_level; use crate::can_concat; use crate::level::{Level, Levels}; pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) { let right_l0 = right_levels.l0; - for right_sub_level in right_l0.sub_levels { - let sub_level = right_sub_level.clone(); - match get_sub_level_insert_hint(&left_levels.l0.sub_levels, &sub_level) { - Ok(insert_hint) => { - add_ssts_to_sub_level(&mut left_levels.l0, insert_hint, sub_level.table_infos); - } - Err(insert_hint) => { - insert_new_sub_level( - &mut left_levels.l0, - sub_level.sub_level_id, - sub_level.level_type, - sub_level.table_infos, - Some(insert_hint), - ); - } + let mut max_left_sub_level_id = left_levels + .l0 + .sub_levels + .iter() + .map(|sub_level| sub_level.sub_level_id + 1) + .max() + .unwrap_or(0); // If there are no sub levels, the max sub level id is 0. + let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0; + + for mut right_sub_level in right_l0.sub_levels { + // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type) + // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5] + if need_rewrite_right_sub_level_id { + right_sub_level.sub_level_id = max_left_sub_level_id; + max_left_sub_level_id += 1; } + + insert_new_sub_level( + &mut left_levels.l0, + right_sub_level.sub_level_id, + right_sub_level.level_type, + right_sub_level.table_infos, + None, + ); } assert!( From ffd9728d90fded0038bd556a993097c0c9809fbb Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 9 Sep 2024 17:17:12 +0800 Subject: [PATCH 07/11] chore(storage): add ut --- .../compaction/compaction_group_schedule.rs | 38 ++ .../hummock_sdk/src/compaction_group/mod.rs | 8 +- .../hummock_test/src/compactor_tests.rs | 458 +++++++++++++++++- .../hummock_test/src/sync_point_tests.rs | 2 +- 4 files changed, 492 insertions(+), 14 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 4ca0d15ed43d1..b73fceae90551 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -105,6 +105,44 @@ impl HummockManager { .collect_vec(); assert!(combined_member_table_ids.is_sorted()); + // check duplicated sst_id + let mut sst_id_set = HashSet::new(); + for sst in versioning.current_version.get_sst_ids() { + if !sst_id_set.insert(sst) { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} duplicated sst_id {}", + left_group_id, right_group_id, sst + ))); + } + } + + // TODO(li0k): remove this check (Since the current split_sst does not change key_range, this check can not be removed, otherwise concate will fail.) + // check branched sst on non-overlap level + { + for level in versioning + .current_version + .get_compaction_group_levels(group_1) + .levels + .iter() + .chain( + versioning + .current_version + .get_compaction_group_levels(group_2) + .levels + .iter(), + ) + { + for sst in &level.table_infos { + if sst.sst_id != sst.object_id { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} branched sst_id {}", + left_group_id, right_group_id, sst.sst_id + ))); + } + } + } + } + let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index ba6582c2b72c7..94ef89b8046e2 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -122,8 +122,12 @@ pub mod group_split { can_concat(&left_levels.levels[idx].table_infos), "{}", format!( - "left_levels.levels[{}].table_infos: {:?} level_idx {:?}", - idx, left_levels.levels[idx].table_infos, left_levels.levels[idx].level_idx + "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}", + left_levels.group_id, + right_levels.group_id, + idx, + left_levels.levels[idx].table_infos, + left_levels.levels[idx].level_idx ) ); } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f2..ebdb5adb8cd14 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -75,7 +75,7 @@ pub(crate) mod tests { use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder, - HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter, + HummockStorage as GlobalHummockStorage, HummockStorage, LocalHummockStorage, MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions, SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions, }; @@ -92,7 +92,7 @@ pub(crate) mod tests { hummock_meta_client: Arc, notification_client: impl NotificationClient, hummock_manager_ref: &HummockManagerRef, - table_id: TableId, + table_id: &[u32], ) -> HummockStorage { let remote_dir = "hummock_001_test".to_string(); let options = Arc::new(StorageOpts { @@ -117,7 +117,7 @@ pub(crate) mod tests { register_tables_with_id_for_test( hummock.filter_key_extractor_manager(), hummock_manager_ref, - &[table_id.table_id()], + table_id, ) .await; @@ -189,7 +189,6 @@ pub(crate) mod tests { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } } @@ -604,7 +603,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -885,7 +884,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1090,7 +1089,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1290,7 +1289,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) = @@ -1505,7 +1504,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1680,7 +1679,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1798,7 +1797,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1980,4 +1979,441 @@ pub(crate) mod tests { count += 1; } } + + #[tokio::test] + async fn test_split_and_merge() { + let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = + setup_compute_env(8080).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager_ref.clone(), + worker_node.id, + )); + + let table_id_1 = TableId::from(1); + let table_id_2 = TableId::from(2); + + let storage = get_hummock_storage( + hummock_meta_client.clone(), + get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), + &hummock_manager_ref, + &[table_id_1.table_id(), table_id_2.table_id()], + ) + .await; + + // basic cg2 -> [1, 2] + let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() + { + FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ) => rpc_filter_key_extractor_manager, + FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(), + }; + + let mut key = BytesMut::default(); + key.put_u16(1); + key.put_slice(b"key_prefix"); + let key_prefix = key.freeze(); + + rpc_filter_key_extractor_manager.update( + table_id_1.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + rpc_filter_key_extractor_manager.update( + table_id_2.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + + let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ); + let compact_ctx = get_compactor_context(&storage); + let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( + hummock_meta_client.clone(), + storage + .storage_opts() + .clone() + .sstable_id_remote_fetch_number, + )); + + let base_epoch = Epoch::now(); + let mut epoch: u64 = base_epoch.0; + let millisec_interval_epoch: u64 = (1 << 16) * 100; + // let mut epoch_set = BTreeSet::new(); + + let mut local_1 = storage + .new_local(NewLocalOptions::for_test(table_id_1.clone())) + .await; + let mut local_2 = storage + .new_local(NewLocalOptions::for_test(table_id_2.clone())) + .await; + + let val = Bytes::from(b"0"[..].to_vec()); + + async fn write_data( + storage: &HummockStorage, + local_1: (&mut LocalHummockStorage, bool), + local_2: (&mut LocalHummockStorage, bool), + epoch: &mut u64, + val: Bytes, + kv_count: u64, + millisec_interval_epoch: u64, + key_prefix: Bytes, + hummock_meta_client: Arc, + is_init: &mut bool, + ) { + let table_id_set = + HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter()); + + storage.start_epoch(*epoch, table_id_set.clone()); + for i in 0..kv_count { + if i == 0 && *is_init { + local_1.0.init_for_test(*epoch).await.unwrap(); + local_2.0.init_for_test(*epoch).await.unwrap(); + + *is_init = false; + } + let next_epoch = *epoch + millisec_interval_epoch; + storage.start_epoch(next_epoch, table_id_set.clone()); + + let ramdom_key = + [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat(); + + if local_1.1 { + local_1 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_1.0.flush().await.unwrap(); + local_1 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + if local_2.1 { + local_2 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_2.0.flush().await.unwrap(); + local_2 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + let res = storage.seal_and_sync_epoch(*epoch).await.unwrap(); + hummock_meta_client.commit_epoch(*epoch, res).await.unwrap(); + *epoch += millisec_interval_epoch; + } + } + + let mut is_init = true; + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + let parent_group_id = 2; + let split_table_ids = vec![table_id_2.table_id()]; + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + assert_ne!(parent_group_id, new_cg_id); + + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + { + // compact left group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(parent_group_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + { + // compact right group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(new_cg_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // write left + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, false), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + // compact + { + // compact left group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(parent_group_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // try split + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + // write right + write_data( + &storage, + (&mut local_1, false), + (&mut local_2, true), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + let ret_err = hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await; + assert!(ret_err.is_err()); + + // try compact + { + // compact left + loop { + let compact_task = hummock_manager_ref + .get_compact_task(parent_group_id, &mut default_compaction_selector()) + .await + .unwrap(); + + if compact_task.is_none() { + break; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // compact right + loop { + let compact_task = hummock_manager_ref + .get_compact_task(new_cg_id, &mut default_compaction_selector()) + .await + .unwrap(); + + if compact_task.is_none() { + break; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + } + + // try merge + let ret_err = hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await; + assert!(ret_err.is_err()); + } } diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index f5ee41783813d..008c667ccedf5 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -242,7 +242,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) = From c7c6fa49764e10145d4306ca1fc657c04c26e144 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 10 Sep 2024 02:29:07 +0800 Subject: [PATCH 08/11] refactor(compaction): refactor merge check and ut --- .../compaction/compaction_group_schedule.rs | 64 ++-- .../compaction_group/hummock_version_ext.rs | 18 + .../hummock_test/src/compactor_tests.rs | 339 +++++++++++------- 3 files changed, 276 insertions(+), 145 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index b73fceae90551..2ad2811273132 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -21,7 +21,7 @@ use risingwave_hummock_sdk::compact_task::ReportTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::TableGroupInfo; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas}; -use risingwave_hummock_sdk::CompactionGroupId; +use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::{PbGroupMerge, PbStateTableInfoDelta}; use thiserror_ext::AsReport; @@ -107,11 +107,19 @@ impl HummockManager { // check duplicated sst_id let mut sst_id_set = HashSet::new(); - for sst in versioning.current_version.get_sst_ids() { - if !sst_id_set.insert(sst) { + for sst_id in versioning + .current_version + .get_sst_ids_by_group_id(left_group_id) + .chain( + versioning + .current_version + .get_sst_ids_by_group_id(right_group_id), + ) + { + if !sst_id_set.insert(sst_id) { return Err(Error::CompactionGroup(format!( "invalid merge group_1 {} group_2 {} duplicated sst_id {}", - left_group_id, right_group_id, sst + left_group_id, right_group_id, sst_id ))); } } @@ -119,26 +127,36 @@ impl HummockManager { // TODO(li0k): remove this check (Since the current split_sst does not change key_range, this check can not be removed, otherwise concate will fail.) // check branched sst on non-overlap level { - for level in versioning + let left_levels = versioning .current_version - .get_compaction_group_levels(group_1) - .levels - .iter() - .chain( - versioning - .current_version - .get_compaction_group_levels(group_2) - .levels - .iter(), - ) - { - for sst in &level.table_infos { - if sst.sst_id != sst.object_id { - return Err(Error::CompactionGroup(format!( - "invalid merge group_1 {} group_2 {} branched sst_id {}", - left_group_id, right_group_id, sst.sst_id - ))); - } + .get_compaction_group_levels(group_1); + + let right_levels = versioning + .current_version + .get_compaction_group_levels(group_2); + + // we can not check the l0 sub level, because the sub level id will be rewritten when merge + // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct. + let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len()); + for level_idx in 0..max_level { + let left_level = left_levels.levels.get(level_idx).unwrap(); + let right_level = right_levels.levels.get(level_idx).unwrap(); + if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() { + continue; + } + + let left_last_sst = left_level.table_infos.last().unwrap().clone(); + let right_first_sst = right_level.table_infos.first().unwrap().clone(); + let left_sst_id = left_last_sst.sst_id; + let right_sst_id = right_first_sst.sst_id; + let left_obj_id = left_last_sst.object_id; + let right_obj_id = right_first_sst.object_id; + + if !can_concat(&[left_last_sst, right_first_sst]) { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}", + left_group_id, right_group_id, level_idx, left_sst_id, right_sst_id, left_obj_id, right_obj_id + ))); } } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index e52886b7aceec..826841b108ca6 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -185,6 +185,24 @@ impl HummockVersion { })) } + pub fn get_sst_ids_by_group_id( + &self, + compaction_group_id: CompactionGroupId, + ) -> impl Iterator + '_ { + self.levels + .iter() + .filter_map(move |(cg_id, level)| { + if *cg_id == compaction_group_id { + Some(level) + } else { + None + } + }) + .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) + .flat_map(|level| level.table_infos.iter()) + .map(|s| s.sst_id) + } + /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. /// i.e. `select_group` is just a hint. /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index ebdb5adb8cd14..7c8048c61caee 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -29,7 +29,6 @@ pub(crate) mod tests { use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, Epoch, EpochExt}; use risingwave_common_service::NotificationClient; - use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ @@ -44,6 +43,7 @@ pub(crate) mod tests { ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::selector::{ default_compaction_selector, ManualCompactionOption, @@ -92,7 +92,7 @@ pub(crate) mod tests { hummock_meta_client: Arc, notification_client: impl NotificationClient, hummock_manager_ref: &HummockManagerRef, - table_id: &[u32], + table_ids: &[u32], ) -> HummockStorage { let remote_dir = "hummock_001_test".to_string(); let options = Arc::new(StorageOpts { @@ -117,7 +117,7 @@ pub(crate) mod tests { register_tables_with_id_for_test( hummock.filter_key_extractor_manager(), hummock_manager_ref, - table_id, + table_ids, ) .await; @@ -235,7 +235,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - Default::default(), + &[0], ) .await; let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() @@ -405,7 +405,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - Default::default(), + &[0], ) .await; @@ -2042,13 +2042,12 @@ pub(crate) mod tests { let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; let millisec_interval_epoch: u64 = (1 << 16) * 100; - // let mut epoch_set = BTreeSet::new(); let mut local_1 = storage - .new_local(NewLocalOptions::for_test(table_id_1.clone())) + .new_local(NewLocalOptions::for_test(table_id_1)) .await; let mut local_2 = storage - .new_local(NewLocalOptions::for_test(table_id_2.clone())) + .new_local(NewLocalOptions::for_test(table_id_2)) .await; let val = Bytes::from(b"0"[..].to_vec()); @@ -2117,37 +2116,20 @@ pub(crate) mod tests { (&mut local_2, true), &mut epoch, val.clone(), - 16, + 1, millisec_interval_epoch, key_prefix.clone(), hummock_meta_client.clone(), &mut is_init, ) .await; - epoch += millisec_interval_epoch; let parent_group_id = 2; let split_table_ids = vec![table_id_2.table_id()]; - let new_cg_id = hummock_manager_ref - .split_compaction_group(parent_group_id, &split_table_ids, 0) - .await - .unwrap(); - - assert_ne!(parent_group_id, new_cg_id); - - hummock_manager_ref - .merge_compaction_group(parent_group_id, new_cg_id) - .await - .unwrap(); - - let new_cg_id = hummock_manager_ref - .split_compaction_group(parent_group_id, &split_table_ids, 0) - .await - .unwrap(); + // compact { - // compact left group let manual_compcation_option = ManualCompactionOption { level: 0, ..Default::default() @@ -2189,18 +2171,57 @@ pub(crate) mod tests { .unwrap(); } - { - // compact right group + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + assert_ne!(parent_group_id, new_cg_id); + assert!(hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .is_err()); + + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 100, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + epoch += millisec_interval_epoch; + + async fn compact_once( + group_id: CompactionGroupId, + level: usize, + hummock_manager_ref: HummockManagerRef, + compact_ctx: CompactorContext, + filter_key_extractor_manager: FilterKeyExtractorManager, + sstable_object_id_manager: Arc, + ) { + // compact left group let manual_compcation_option = ManualCompactionOption { - level: 0, + level, ..Default::default() }; // 2. get compact task - let mut compact_task = hummock_manager_ref - .manual_get_compact_task(new_cg_id, manual_compcation_option) + let compact_task = hummock_manager_ref + .manual_get_compact_task(group_id, manual_compcation_option) .await - .unwrap() .unwrap(); + + if compact_task.is_none() { + return; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); @@ -2212,7 +2233,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let ((result_task, task_stats), _) = compact( - compact_ctx.clone(), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -2231,6 +2252,57 @@ pub(crate) mod tests { .unwrap(); } + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + // write left write_data( &storage, @@ -2255,47 +2327,15 @@ pub(crate) mod tests { .unwrap(); // compact - { - // compact left group - let manual_compcation_option = ManualCompactionOption { - level: 0, - ..Default::default() - }; - // 2. get compact task - let mut compact_task = hummock_manager_ref - .manual_get_compact_task(parent_group_id, manual_compcation_option) - .await - .unwrap() - .unwrap(); - let compaction_filter_flag = - CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; - compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch(); - - // 3. compact - let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( - compact_ctx.clone(), - compact_task.clone(), - rx, - Box::new(sstable_object_id_manager.clone()), - filter_key_extractor_manager.clone(), - ) - .await; - - hummock_manager_ref - .report_compact_task( - result_task.task_id, - result_task.task_status, - result_task.sorted_output_ssts, - Some(to_prost_table_stats_map(task_stats)), - ) - .await - .unwrap(); - } + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; // try split let new_cg_id = hummock_manager_ref @@ -2318,59 +2358,66 @@ pub(crate) mod tests { ) .await; - let ret_err = hummock_manager_ref + epoch += millisec_interval_epoch; + + hummock_manager_ref .merge_compaction_group(parent_group_id, new_cg_id) - .await; - assert!(ret_err.is_err()); + .await + .unwrap(); - // try compact - { - // compact left - loop { - let compact_task = hummock_manager_ref - .get_compact_task(parent_group_id, &mut default_compaction_selector()) - .await - .unwrap(); + // write left and right - if compact_task.is_none() { - break; - } + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 1, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; - let mut compact_task = compact_task.unwrap(); - let compaction_filter_flag = - CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; - compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch(); + epoch += millisec_interval_epoch; - // 3. compact - let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( - compact_ctx.clone(), - compact_task.clone(), - rx, - Box::new(sstable_object_id_manager.clone()), - filter_key_extractor_manager.clone(), - ) - .await; + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; - hummock_manager_ref - .report_compact_task( - result_task.task_id, - result_task.task_status, - result_task.sorted_output_ssts, - Some(to_prost_table_stats_map(task_stats)), - ) - .await - .unwrap(); - } + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; - // compact right + async fn compact_all( + group_id: CompactionGroupId, + level: usize, + hummock_manager_ref: HummockManagerRef, + compact_ctx: CompactorContext, + filter_key_extractor_manager: FilterKeyExtractorManager, + sstable_object_id_manager: Arc, + ) { loop { + let manual_compcation_option = ManualCompactionOption { + level, + ..Default::default() + }; let compact_task = hummock_manager_ref - .get_compact_task(new_cg_id, &mut default_compaction_selector()) + .manual_get_compact_task(group_id, manual_compcation_option) .await .unwrap(); @@ -2410,10 +2457,58 @@ pub(crate) mod tests { } } + // try split + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + // try merge - let ret_err = hummock_manager_ref + assert!(hummock_manager_ref .merge_compaction_group(parent_group_id, new_cg_id) - .await; - assert!(ret_err.is_err()); + .await + .is_err()); + + // write left and write + + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 200, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + compact_all( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_all( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); } } From e3f02c7fb9f558067a760fd90437a7cc1091f9ef Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 10 Sep 2024 02:38:38 +0800 Subject: [PATCH 09/11] typo --- .../compaction/compaction_group_schedule.rs | 1 + .../hummock_test/src/compactor_tests.rs | 92 ++++++------------- 2 files changed, 31 insertions(+), 62 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 2ad2811273132..92af3f69bbfcc 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -152,6 +152,7 @@ impl HummockManager { let left_obj_id = left_last_sst.object_id; let right_obj_id = right_first_sst.object_id; + // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups. if !can_concat(&[left_last_sst, right_first_sst]) { return Err(Error::CompactionGroup(format!( "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}", diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 7c8048c61caee..92856fb5022c6 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -2128,19 +2128,31 @@ pub(crate) mod tests { let parent_group_id = 2; let split_table_ids = vec![table_id_2.table_id()]; - // compact - { + async fn compact_once( + group_id: CompactionGroupId, + level: usize, + hummock_manager_ref: HummockManagerRef, + compact_ctx: CompactorContext, + filter_key_extractor_manager: FilterKeyExtractorManager, + sstable_object_id_manager: Arc, + ) { + // compact left group let manual_compcation_option = ManualCompactionOption { - level: 0, + level, ..Default::default() }; // 2. get compact task - let mut compact_task = hummock_manager_ref - .manual_get_compact_task(parent_group_id, manual_compcation_option) + let compact_task = hummock_manager_ref + .manual_get_compact_task(group_id, manual_compcation_option) .await - .unwrap() .unwrap(); + if compact_task.is_none() { + return; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); @@ -2152,7 +2164,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let ((result_task, task_stats), _) = compact( - compact_ctx.clone(), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -2171,6 +2183,17 @@ pub(crate) mod tests { .unwrap(); } + // compact + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + let new_cg_id = hummock_manager_ref .split_compaction_group(parent_group_id, &split_table_ids, 0) .await @@ -2197,61 +2220,6 @@ pub(crate) mod tests { .await; epoch += millisec_interval_epoch; - async fn compact_once( - group_id: CompactionGroupId, - level: usize, - hummock_manager_ref: HummockManagerRef, - compact_ctx: CompactorContext, - filter_key_extractor_manager: FilterKeyExtractorManager, - sstable_object_id_manager: Arc, - ) { - // compact left group - let manual_compcation_option = ManualCompactionOption { - level, - ..Default::default() - }; - // 2. get compact task - let compact_task = hummock_manager_ref - .manual_get_compact_task(group_id, manual_compcation_option) - .await - .unwrap(); - - if compact_task.is_none() { - return; - } - - let mut compact_task = compact_task.unwrap(); - - let compaction_filter_flag = - CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; - compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch(); - - // 3. compact - let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( - compact_ctx, - compact_task.clone(), - rx, - Box::new(sstable_object_id_manager.clone()), - filter_key_extractor_manager.clone(), - ) - .await; - - hummock_manager_ref - .report_compact_task( - result_task.task_id, - result_task.task_status, - result_task.sorted_output_ssts, - Some(to_prost_table_stats_map(task_stats)), - ) - .await - .unwrap(); - } - compact_once( parent_group_id, 0, From 8df085a0e63fc39a7f825d207f7721f4c63c5242 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 10 Sep 2024 11:44:26 +0800 Subject: [PATCH 10/11] add doc --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 826841b108ca6..03e5db1e0f04f 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -185,6 +185,7 @@ impl HummockVersion { })) } + // only scan the sst infos from levels in the specified compaction group (without table change log) pub fn get_sst_ids_by_group_id( &self, compaction_group_id: CompactionGroupId, From e064064ae4ea0d970b042843466d905f02de1166 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 10 Sep 2024 15:58:03 +0800 Subject: [PATCH 11/11] address comments --- .../manager/compaction/compaction_group_schedule.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 92af3f69bbfcc..93103ca87abf5 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -124,7 +124,6 @@ impl HummockManager { } } - // TODO(li0k): remove this check (Since the current split_sst does not change key_range, this check can not be removed, otherwise concate will fail.) // check branched sst on non-overlap level { let left_levels = versioning @@ -138,9 +137,9 @@ impl HummockManager { // we can not check the l0 sub level, because the sub level id will be rewritten when merge // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct. let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len()); - for level_idx in 0..max_level { - let left_level = left_levels.levels.get(level_idx).unwrap(); - let right_level = right_levels.levels.get(level_idx).unwrap(); + for level_idx in 1..=max_level { + let left_level = left_levels.get_level(level_idx); + let right_level = right_levels.get_level(level_idx); if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() { continue; }