diff --git a/proto/hummock.proto b/proto/hummock.proto index 19b7e036c968..b1c5761860ed 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -104,6 +104,11 @@ message GroupTableChange { message GroupDestroy {} +message GroupMerge { + uint64 left_group_id = 1; + uint64 right_group_id = 2; +} + message GroupDelta { oneof delta_type { IntraLevelDelta intra_level = 1; @@ -111,6 +116,7 @@ message GroupDelta { GroupDestroy group_destroy = 3; GroupMetaChange group_meta_change = 4 [deprecated = true]; GroupTableChange group_table_change = 5 [deprecated = true]; + GroupMerge group_merge = 6; } } @@ -744,6 +750,7 @@ message PinVersionResponse { message SplitCompactionGroupRequest { uint64 group_id = 1; repeated uint32 table_ids = 2; + uint32 partition_vnode_count = 3; } message SplitCompactionGroupResponse { @@ -839,6 +846,13 @@ message GetVersionByEpochResponse { HummockVersion version = 1; } +message MergeCompactionGroupRequest { + uint64 left_group_id = 1; + uint64 right_group_id = 2; +} + +message MergeCompactionGroupResponse {} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -880,6 +894,7 @@ service HummockManagerService { rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse); + rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse); } message CompactionConfig { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index a0395d236d50..c41b4c6e25b9 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -131,10 +131,11 @@ pub async fn split_compaction_group( context: &CtlContext, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let new_group_id = meta_client - .split_compaction_group(group_id, table_ids_to_new_group) + .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count) .await?; println!( "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.", @@ -284,3 +285,15 @@ pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow:: Ok(()) } + +pub async fn merge_compaction_group( + context: &CtlContext, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, +) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + meta_client + .merge_compaction_group(left_group_id, right_group_id) + .await?; + Ok(()) +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 34c5be6ace21..b35b8d1e42cb 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -276,6 +276,8 @@ enum HummockCommands { compaction_group_id: u64, #[clap(long, value_delimiter = ',')] table_ids: Vec, + #[clap(long, default_value_t = 0)] + partition_vnode_count: u32, }, /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object. 
PauseVersionCheckpoint, @@ -340,6 +342,12 @@ enum HummockCommands { #[clap(long)] record_hybrid_fetch_threshold_ms: Option, }, + MergeCompactionGroup { + #[clap(long)] + left_group_id: u64, + #[clap(long)] + right_group_id: u64, + }, } #[derive(Subcommand)] @@ -711,9 +719,15 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::SplitCompactionGroup { compaction_group_id, table_ids, + partition_vnode_count, }) => { - cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids) - .await?; + cmd_impl::hummock::split_compaction_group( + context, + compaction_group_id, + &table_ids, + partition_vnode_count, + ) + .await?; } Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => { cmd_impl::hummock::pause_version_checkpoint(context).await?; @@ -790,6 +804,13 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { ) .await? } + Commands::Hummock(HummockCommands::MergeCompactionGroup { + left_group_id, + right_group_id, + }) => { + cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id) + .await? + } Commands::Table(TableCommands::Scan { mv_name, data_dir, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 21e203d8440b..77c2df39c874 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -457,7 +457,7 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let new_group_id = self .hummock_manager - .split_compaction_group(req.group_id, &req.table_ids) + .split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count) .await?; Ok(Response::new(SplitCompactionGroupResponse { new_group_id })) } @@ -716,6 +716,17 @@ impl HummockManagerService for HummockServiceImpl { version: Some(version.to_protobuf()), })) } + + async fn merge_compaction_group( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + self.hummock_manager + .merge_compaction_group(req.left_group_id, req.right_group_id) + .await?; + Ok(Response::new(MergeCompactionGroupResponse {})) + } } #[cfg(test)] diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index bc3701a6b9d8..f678014d440c 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -156,8 +156,8 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for group_deltas in version_delta.group_deltas.values() { - let summary = summarize_group_deltas(group_deltas); + for (group_id, group_deltas) in &version_delta.group_deltas { + let summary = summarize_group_deltas(group_deltas, *group_id); object_sizes.extend( summary .insert_table_infos diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs similarity index 97% rename from src/meta/src/hummock/manager/compaction_group_manager.rs rename to src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index c68fc4222f28..2e20d1fcf65e 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -54,7 +54,7 @@ use crate::model::{ type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; impl CompactionGroupManager { - pub(super) async fn new(env: 
&MetaSrvEnv) -> Result { + pub(crate) async fn new(env: &MetaSrvEnv) -> Result { let default_config = match env.opts.compaction_config.as_ref() { None => CompactionConfigBuilder::new().build(), Some(opt) => CompactionConfigBuilder::with_opt(opt).build(), @@ -62,7 +62,7 @@ impl CompactionGroupManager { Self::new_with_config(env, default_config).await } - pub(super) async fn new_with_config( + pub(crate) async fn new_with_config( env: &MetaSrvEnv, default_config: CompactionConfig, ) -> Result { @@ -428,24 +428,6 @@ impl HummockManager { results } - /// Splits a compaction group into two. The new one will contain `table_ids`. - /// Returns the newly created compaction group id. - pub async fn split_compaction_group( - &self, - parent_group_id: CompactionGroupId, - table_ids: &[StateTableId], - ) -> Result { - let result = self - .move_state_table_to_compaction_group( - parent_group_id, - table_ids, - self.env.opts.partition_vnode_count, - ) - .await?; - - Ok(result) - } - /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. pub async fn move_state_table_to_compaction_group( @@ -651,7 +633,7 @@ impl HummockManager { infos } - pub(super) async fn initial_compaction_group_config_after_load( + pub(crate) async fn initial_compaction_group_config_after_load( &self, versioning_guard: &Versioning, compaction_group_manager: &mut CompactionGroupManager, @@ -675,7 +657,7 @@ impl HummockManager { /// 1. initialize default static compaction group. /// 2. register new table to new compaction group. /// 3. move existent table to new compaction group. -pub(super) struct CompactionGroupManager { +pub(crate) struct CompactionGroupManager { compaction_groups: BTreeMap, default_config: Arc, /// Tables that write limit is trigger for. @@ -709,7 +691,7 @@ impl CompactionGroupManager { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn try_get_compaction_group_config( + pub(crate) fn try_get_compaction_group_config( &self, compaction_group_id: CompactionGroupId, ) -> Option { @@ -717,7 +699,7 @@ impl CompactionGroupManager { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn default_compaction_config(&self) -> Arc { + pub(crate) fn default_compaction_config(&self) -> Arc { self.default_config.clone() } } @@ -814,7 +796,7 @@ impl<'a> CompactionGroupTransaction<'a> { } /// Tries to get compaction group config for `compaction_group_id`. - pub(super) fn try_get_compaction_group_config( + pub(crate) fn try_get_compaction_group_config( &self, compaction_group_id: CompactionGroupId, ) -> Option<&CompactionGroup> { @@ -822,7 +804,7 @@ impl<'a> CompactionGroupTransaction<'a> { } /// Removes stale group configs. 
- fn purge(&mut self, existing_groups: HashSet) { + pub fn purge(&mut self, existing_groups: HashSet) { let stale_group = self .tree_ref() .keys() @@ -837,7 +819,7 @@ impl<'a> CompactionGroupTransaction<'a> { } } - pub(super) fn update_compaction_config( + pub(crate) fn update_compaction_config( &mut self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs new file mode 100644 index 000000000000..93103ca87abf --- /dev/null +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -0,0 +1,359 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::ops::DerefMut; + +use itertools::Itertools; +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::compact_task::ReportTask; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::TableGroupInfo; +use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; +use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas}; +use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; +use risingwave_pb::hummock::compact_task::TaskStatus; +use risingwave_pb::hummock::{PbGroupMerge, PbStateTableInfoDelta}; +use thiserror_ext::AsReport; + +use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::transaction::HummockVersionTransaction; +use crate::hummock::manager::{commit_multi_var, HummockManager}; +use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; + +impl HummockManager { + /// Splits a compaction group into two. The new one will contain `table_ids`. + /// Returns the newly created compaction group id. + pub async fn split_compaction_group( + &self, + parent_group_id: CompactionGroupId, + table_ids: &[StateTableId], + partition_vnode_count: u32, + ) -> Result { + let result = self + .move_state_table_to_compaction_group(parent_group_id, table_ids, partition_vnode_count) + .await?; + + Ok(result) + } + + pub async fn merge_compaction_group( + &self, + group_1: CompactionGroupId, + group_2: CompactionGroupId, + ) -> Result<()> { + let compaction_guard = self.compaction.write().await; + let mut versioning_guard = self.versioning.write().await; + let versioning = versioning_guard.deref_mut(); + // Validate parameters. 
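+        // The checks below require that both groups exist in the current version, that their member
+        // table id ranges are disjoint (so the merged group stays ordered by table id), that no sst_id
+        // appears in both groups, and that the adjacent ssts of every non-overlapping level can be concatenated.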
+ if !versioning.current_version.levels.contains_key(&group_1) { + return Err(Error::CompactionGroup(format!("invalid group {}", group_1))); + } + + if !versioning.current_version.levels.contains_key(&group_2) { + return Err(Error::CompactionGroup(format!("invalid group {}", group_2))); + } + + let state_table_info = versioning.current_version.state_table_info.clone(); + let mut member_table_ids_1 = state_table_info + .compaction_group_member_table_ids(group_1) + .iter() + .cloned() + .collect_vec(); + + let mut member_table_ids_2 = state_table_info + .compaction_group_member_table_ids(group_2) + .iter() + .cloned() + .collect_vec(); + + debug_assert!(!member_table_ids_1.is_empty()); + debug_assert!(!member_table_ids_2.is_empty()); + assert!(member_table_ids_1.is_sorted()); + assert!(member_table_ids_2.is_sorted()); + + // Make sure `member_table_ids_1` is smaller than `member_table_ids_2` + let (left_group_id, right_group_id) = + if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() { + (group_1, group_2) + } else { + std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2); + (group_2, group_1) + }; + + // We can only merge two groups with non-overlapping member table ids + if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {}", + left_group_id, right_group_id + ))); + } + + let combined_member_table_ids = member_table_ids_1 + .iter() + .chain(member_table_ids_2.iter()) + .collect_vec(); + assert!(combined_member_table_ids.is_sorted()); + + // check duplicated sst_id + let mut sst_id_set = HashSet::new(); + for sst_id in versioning + .current_version + .get_sst_ids_by_group_id(left_group_id) + .chain( + versioning + .current_version + .get_sst_ids_by_group_id(right_group_id), + ) + { + if !sst_id_set.insert(sst_id) { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} duplicated sst_id {}", + left_group_id, right_group_id, sst_id + ))); + } + } + + // check branched sst on non-overlap level + { + let left_levels = versioning + .current_version + .get_compaction_group_levels(group_1); + + let right_levels = versioning + .current_version + .get_compaction_group_levels(group_2); + + // we can not check the l0 sub level, because the sub level id will be rewritten when merge + // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct. + let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len()); + for level_idx in 1..=max_level { + let left_level = left_levels.get_level(level_idx); + let right_level = right_levels.get_level(level_idx); + if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() { + continue; + } + + let left_last_sst = left_level.table_infos.last().unwrap().clone(); + let right_first_sst = right_level.table_infos.first().unwrap().clone(); + let left_sst_id = left_last_sst.sst_id; + let right_sst_id = right_first_sst.sst_id; + let left_obj_id = left_last_sst.object_id; + let right_obj_id = right_first_sst.object_id; + + // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups. 
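+                // `can_concat` succeeds only when the two ssts' key ranges are ordered and non-overlapping.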
+ if !can_concat(&[left_last_sst, right_first_sst]) { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}", + left_group_id, right_group_id, level_idx, left_sst_id, right_sst_id, left_obj_id, right_obj_id + ))); + } + } + } + + let mut version = HummockVersionTransaction::new( + &mut versioning.current_version, + &mut versioning.hummock_version_deltas, + self.env.notification_manager(), + &self.metrics, + ); + let mut new_version_delta = version.new_delta(); + + let target_compaction_group_id = { + // merge right_group_id to left_group_id and remove right_group_id + new_version_delta.group_deltas.insert( + left_group_id, + GroupDeltas { + group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge { + left_group_id, + right_group_id, + })], + }, + ); + left_group_id + }; + + // TODO: remove compaciton group_id from state_table_info + // rewrite compaction_group_id for all tables + new_version_delta.with_latest_version(|version, new_version_delta| { + for table_id in combined_member_table_ids { + let table_id = TableId::new(table_id.table_id()); + let info = version + .state_table_info + .info() + .get(&table_id) + .expect("have check exist previously"); + assert!(new_version_delta + .state_table_info_delta + .insert( + table_id, + PbStateTableInfoDelta { + committed_epoch: info.committed_epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: target_compaction_group_id, + } + ) + .is_none()); + } + }); + + { + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + + // for metrics reclaim + { + let right_group_max_level = new_version_delta + .latest_version() + .get_compaction_group_levels(right_group_id) + .levels + .len(); + + remove_compaction_group_in_sst_stat( + &self.metrics, + right_group_id, + right_group_max_level, + ); + } + + new_version_delta.pre_apply(); + + // remove right_group_id + compaction_groups_txn.remove(right_group_id); + commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; + } + + // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot. 
+ versioning.mark_next_time_travel_version_snapshot(); + + // cancel tasks + let mut canceled_tasks = vec![]; + // after merge, all tasks in right_group_id should be canceled + // otherwise, pending size calculation by level handler will make some mistake + for task_assignment in compaction_guard.compact_task_assignment.values() { + if let Some(task) = task_assignment.compact_task.as_ref() { + let need_cancel = task.compaction_group_id == right_group_id; + if need_cancel { + canceled_tasks.push(ReportTask { + task_id: task.task_id, + task_status: TaskStatus::ManualCanceled, + table_stats_change: HashMap::default(), + sorted_output_ssts: vec![], + }); + } + } + } + + drop(versioning_guard); + drop(compaction_guard); + self.report_compact_tasks(canceled_tasks).await?; + + Ok(()) + } + + pub async fn try_split_compaction_group( + &self, + table_write_throughput: &HashMap>, + checkpoint_secs: u64, + group: &TableGroupInfo, + created_tables: &HashSet, + ) { + // split high throughput table to dedicated compaction group + for (table_id, table_size) in &group.table_statistic { + self.try_move_table_to_dedicated_cg( + table_write_throughput, + table_id, + table_size, + !created_tables.contains(table_id), + checkpoint_secs, + group.group_id, + group.group_size, + ) + .await; + } + } + + pub async fn try_move_table_to_dedicated_cg( + &self, + table_write_throughput: &HashMap>, + table_id: &u32, + table_size: &u64, + is_creating_table: bool, + checkpoint_secs: u64, + parent_group_id: u64, + group_size: u64, + ) { + let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); + let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); + let partition_vnode_count = self.env.opts.partition_vnode_count; + let window_size = + self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize); + + let mut is_high_write_throughput = false; + let mut is_low_write_throughput = true; + if let Some(history) = table_write_throughput.get(table_id) { + if history.len() >= window_size { + is_high_write_throughput = history.iter().all(|throughput| { + *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold + }); + is_low_write_throughput = history.iter().any(|throughput| { + *throughput / checkpoint_secs < self.env.opts.min_table_split_write_throughput + }); + } + } + + let state_table_size = *table_size; + + // 1. Avoid splitting a creating table + // 2. Avoid splitting a is_low_write_throughput creating table + // 3. Avoid splitting a non-high throughput medium-sized table + if is_creating_table + || (is_low_write_throughput) + || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) + { + return; + } + + // do not split a large table and a small table because it would increase IOPS + // of small table. 
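+        // For a group that is neither the default nor the mv group (i.e. one that was already split out),
+        // skip the move if the rest of the group would be both smaller than this table and below `min_table_split_size`.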
+ if parent_group_id != default_group_id && parent_group_id != mv_group_id { + let rest_group_size = group_size - state_table_size; + if rest_group_size < state_table_size + && rest_group_size < self.env.opts.min_table_split_size + { + return; + } + } + + let ret = self + .move_state_table_to_compaction_group( + parent_group_id, + &[*table_id], + partition_vnode_count, + ) + .await; + match ret { + Ok(new_group_id) => { + tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count); + } + Err(e) => { + tracing::info!( + error = %e.as_report(), + "failed to move state table [{}] from group-{}", + table_id, + parent_group_id, + ) + } + } + } +} diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction/mod.rs similarity index 95% rename from src/meta/src/hummock/manager/compaction.rs rename to src/meta/src/hummock/manager/compaction/mod.rs index 4696c0745201..8f2ecc33c60b 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -27,7 +27,7 @@ // limitations under the License. use std::cmp::min; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::{Arc, LazyLock}; use std::time::{Instant, SystemTime}; @@ -43,7 +43,6 @@ use rand::thread_rng; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::{InputLevel, Level, Levels}; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -96,6 +95,9 @@ use crate::hummock::{commit_multi_var, start_measure_real_process_timer, Hummock use crate::manager::{MetadataManager, META_NODE_ID}; use crate::model::BTreeMapTransaction; +pub mod compaction_group_manager; +pub mod compaction_group_schedule; + const MAX_SKIP_TIMES: usize = 8; const MAX_REPORT_COUNT: usize = 16; @@ -1567,80 +1569,6 @@ impl HummockManager { .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); } } - - pub async fn try_move_table_to_dedicated_cg( - &self, - table_write_throughput: &HashMap>, - table_id: &u32, - table_size: &u64, - is_creating_table: bool, - checkpoint_secs: u64, - parent_group_id: u64, - group_size: u64, - ) { - let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); - let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); - let partition_vnode_count = self.env.opts.partition_vnode_count; - let window_size = - self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize); - - let mut is_high_write_throughput = false; - let mut is_low_write_throughput = true; - if let Some(history) = table_write_throughput.get(table_id) { - if history.len() >= window_size { - is_high_write_throughput = history.iter().all(|throughput| { - *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold - }); - is_low_write_throughput = history.iter().any(|throughput| { - *throughput / checkpoint_secs < self.env.opts.min_table_split_write_throughput - }); - } - } - - let state_table_size = *table_size; - - // 1. Avoid splitting a creating table - // 2. 
Avoid splitting a is_low_write_throughput creating table - // 3. Avoid splitting a non-high throughput medium-sized table - if is_creating_table - || (is_low_write_throughput) - || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) - { - return; - } - - // do not split a large table and a small table because it would increase IOPS - // of small table. - if parent_group_id != default_group_id && parent_group_id != mv_group_id { - let rest_group_size = group_size - state_table_size; - if rest_group_size < state_table_size - && rest_group_size < self.env.opts.min_table_split_size - { - return; - } - } - - let ret = self - .move_state_table_to_compaction_group( - parent_group_id, - &[*table_id], - partition_vnode_count, - ) - .await; - match ret { - Ok(new_group_id) => { - tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count); - } - Err(e) => { - tracing::info!( - error = %e.as_report(), - "failed to move state table [{}] from group-{}", - table_id, - parent_group_id, - ) - } - } - } } #[cfg(any(test, feature = "test"))] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index ded8d507dbff..d43b1cc6f542 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -50,7 +50,6 @@ use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; use crate::rpc::metrics::MetaMetrics; -mod compaction_group_manager; mod context; mod gc; mod tests; diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index dca7311f4778..d0183d84d23c 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -17,7 +17,6 @@ use std::borrow::Borrow; use std::cmp::Ordering; use std::collections::HashMap; -use std::sync::Arc; use itertools::Itertools; use prometheus::Registry; @@ -1393,13 +1392,13 @@ async fn test_split_compaction_group_on_demand_basic() { assert_eq!(original_groups, vec![2, 3]); let err = hummock_manager - .split_compaction_group(100, &[0]) + .split_compaction_group(100, &[0], 0) .await .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); let err = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap_err(); assert_eq!( @@ -1461,7 +1460,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); let err = hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap_err(); assert_eq!( @@ -1477,7 +1476,7 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100, 101]) + .split_compaction_group(2, &[100, 101], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1545,7 +1544,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -1673,7 +1672,7 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -1845,7 +1844,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ); hummock_manager - .split_compaction_group(2, 
&[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1954,7 +1953,7 @@ async fn test_compaction_task_expiration_due_to_split_group() { let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); @@ -2038,7 +2037,7 @@ async fn test_move_tables_between_compaction_group() { ); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2137,11 +2136,9 @@ async fn test_partition_level() { .level0_overlapping_sub_level_compact_level_count(3) .build(); let registry = Registry::new(); - let (_env, hummock_manager, _, worker_node) = + let (env, hummock_manager, _, worker_node) = setup_compute_env_with_metric(80, config.clone(), Some(MetaMetrics::for_test(®istry))) .await; - let config = Arc::new(config); - let context_id = worker_node.id; hummock_manager @@ -2176,7 +2173,7 @@ async fn test_partition_level() { .unwrap()); hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], env.opts.partition_vnode_count) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -2318,7 +2315,7 @@ async fn test_unregister_moved_table() { .unwrap(); let new_group_id = hummock_manager - .split_compaction_group(2, &[100]) + .split_compaction_group(2, &[100], 0) .await .unwrap(); assert_ne!(new_group_id, 2); diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index ec0f77ac88a8..94537e9c33e1 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -43,7 +43,7 @@ impl HummockManager { const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1; pub enum HummockTimerEvent { - GroupSplit, + GroupSchedule, CheckDeadTask, Report, CompactionHeartBeatExpiredCheck, @@ -158,7 +158,7 @@ impl HummockManager { .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let split_group_trigger = IntervalStream::new(split_group_trigger_interval) - .map(|_| HummockTimerEvent::GroupSplit); + .map(|_| HummockTimerEvent::GroupSchedule); triggers.push(Box::pin(split_group_trigger)); } @@ -189,12 +189,12 @@ impl HummockManager { hummock_manager.check_dead_task().await; } - HummockTimerEvent::GroupSplit => { + HummockTimerEvent::GroupSchedule => { if hummock_manager.env.opts.compaction_deterministic_test { continue; } - hummock_manager.on_handle_check_split_multi_group().await; + hummock_manager.on_handle_schedule_group().await; } HummockTimerEvent::Report => { @@ -443,7 +443,7 @@ impl HummockManager { /// throughput keep larger than `table_write_throughput_threshold` for a long time. /// * For state-table whose throughput less than `min_table_split_write_throughput`, do not /// increase it size of base-level. 
- async fn on_handle_check_split_multi_group(&self) { + async fn on_handle_schedule_group(&self) { let params = self.env.system_params_reader().await; let barrier_interval_ms = params.barrier_interval_ms() as u64; let checkpoint_secs = std::cmp::max( @@ -469,18 +469,13 @@ impl HummockManager { continue; } - for (table_id, table_size) in &group.table_statistic { - self.try_move_table_to_dedicated_cg( - &table_write_throughput, - table_id, - table_size, - !created_tables.contains(table_id), - checkpoint_secs, - group.group_id, - group.group_size, - ) - .await; - } + self.try_split_compaction_group( + &table_write_throughput, + checkpoint_secs, + group, + &created_tables, + ) + .await; } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 18bc2d4ae949..da595949b442 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -179,6 +179,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupDestroy", "#[derive(Eq)]") .type_attribute("hummock.GroupMetaChange", "#[derive(Eq)]") .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") + .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index db66e60c91ee..8e3c06b4caba 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1246,10 +1246,12 @@ impl MetaClient { &self, group_id: CompactionGroupId, table_ids_to_new_group: &[StateTableId], + partition_vnode_count: u32, ) -> Result { let req = SplitCompactionGroupRequest { group_id, table_ids: table_ids_to_new_group.to_vec(), + partition_vnode_count, }; let resp = self.inner.split_compaction_group(req).await?; Ok(resp.new_group_id) @@ -1445,6 +1447,19 @@ impl MetaClient { let resp = self.inner.get_cluster_limits(req).await?; Ok(resp.active_limits.into_iter().map(|l| l.into()).collect()) } + + pub async fn merge_compaction_group( + &self, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, + ) -> Result<()> { + let req = MergeCompactionGroupRequest { + left_group_id, + right_group_id, + }; + self.inner.merge_compaction_group(req).await?; + Ok(()) + } } #[async_trait] @@ -2117,6 +2132,7 @@ macro_rules! 
for_all_meta_rpc { ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse } + ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c54dd05b25d2..03e5db1e0f04 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,12 +22,13 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange, + CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange, GroupTableChange, PbLevelType, }; use tracing::warn; -use super::StateTableId; +use super::group_split::get_sub_level_insert_hint; +use super::{group_split, StateTableId}; use crate::change_log::TableChangeLog; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; @@ -47,13 +48,17 @@ pub struct GroupDeltasSummary { pub insert_sub_level_id: u64, pub insert_table_infos: Vec, pub group_construct: Option, - pub group_destroy: Option, + pub group_destroy: Option, pub group_meta_changes: Vec, pub group_table_change: Option, pub new_vnode_partition_count: u32, + pub group_merge: Option, } -pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary { +pub fn summarize_group_deltas( + group_deltas: &GroupDeltas, + compaction_group_id: CompactionGroupId, +) -> GroupDeltasSummary { let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); let mut delete_sst_ids_set = HashSet::new(); let mut insert_sst_level_id = u32::MAX; @@ -64,6 +69,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary let mut group_meta_changes = vec![]; let mut group_table_change = None; let mut new_vnode_partition_count = 0; + let mut group_merge = None; for group_delta in &group_deltas.group_deltas { match group_delta { @@ -83,9 +89,9 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary assert!(group_construct.is_none()); group_construct = Some(construct_delta.clone()); } - GroupDelta::GroupDestroy(destroy_delta) => { + GroupDelta::GroupDestroy(_) => { assert!(group_destroy.is_none()); - group_destroy = Some(*destroy_delta); + group_destroy = Some(compaction_group_id); } GroupDelta::GroupMetaChange(meta_delta) => { group_meta_changes.push(meta_delta.clone()); @@ -93,6 +99,11 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary GroupDelta::GroupTableChange(meta_delta) => { group_table_change = Some(meta_delta.clone()); } + GroupDelta::GroupMerge(merge_delta) => { + assert!(group_merge.is_none()); + group_merge = Some(*merge_delta); + group_destroy = Some(merge_delta.right_group_id); + } } } @@ -110,6 +121,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary 
group_meta_changes, group_table_change, new_vnode_partition_count, + group_merge, } } @@ -173,6 +185,25 @@ impl HummockVersion { })) } + // only scan the sst infos from levels in the specified compaction group (without table change log) + pub fn get_sst_ids_by_group_id( + &self, + compaction_group_id: CompactionGroupId, + ) -> impl Iterator + '_ { + self.levels + .iter() + .filter_map(move |(cg_id, level)| { + if *cg_id == compaction_group_id { + Some(level) + } else { + None + } + }) + .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) + .flat_map(|level| level.table_infos.iter()) + .map(|s| s.sst_id) + } + /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. /// i.e. `select_group` is just a hint. /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. @@ -386,23 +417,6 @@ impl HummockVersion { { for sub_level in &mut l0.sub_levels { let target_l0 = &mut cur_levels.l0; - // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` - // will extend these SSTs. When `insert_hint` is `Err(idx)`, it - // means that we will add a new sub level `idx` into `target_l0`. - let mut insert_hint = Err(target_l0.sub_levels.len()); - for (idx, other) in target_l0.sub_levels.iter_mut().enumerate() { - match other.sub_level_id.cmp(&sub_level.sub_level_id) { - Ordering::Less => {} - Ordering::Equal => { - insert_hint = Ok(idx); - break; - } - Ordering::Greater => { - insert_hint = Err(idx); - break; - } - } - } // Remove SST from sub level may result in empty sub level. It will be purged // whenever another compaction task is finished. let insert_table_infos = @@ -419,7 +433,7 @@ impl HummockVersion { if insert_table_infos.is_empty() { continue; } - match insert_hint { + match get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { Ok(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } @@ -570,7 +584,7 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas); + let summary = summarize_group_deltas(group_deltas, *compaction_group_id); if let Some(group_construct) = &summary.group_construct { let mut new_levels = build_initial_compaction_group_levels( *compaction_group_id, @@ -635,14 +649,19 @@ impl HummockVersion { .expect("compaction group should exist") .member_table_ids .append(&mut moving_tables); + } else if let Some(group_merge) = &summary.group_merge { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id + ); + self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) } - let has_destroy = summary.group_destroy.is_some(); let visible_table_committed_epoch = self.visible_table_committed_epoch(); - let levels = self - .levels - .get_mut(compaction_group_id) - .expect("compaction group should exist"); - + let group_destroy = summary.group_destroy; + let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); #[expect(deprecated)] // for backward-compatibility of previous hummock version delta for group_meta_delta in &summary.group_meta_changes { levels @@ -669,7 +688,8 @@ impl HummockVersion { } = summary; assert!( - delete_sst_levels.is_empty() && 
delete_sst_ids_set.is_empty() || has_destroy, + delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() + || group_destroy.is_some(), "no sst should be deleted when committing an epoch" ); for group_delta in &group_deltas.group_deltas { @@ -703,8 +723,8 @@ impl HummockVersion { .compaction_group_member_table_ids(*compaction_group_id), ); } - if has_destroy { - self.levels.remove(compaction_group_id); + if let Some(destroy_group_id) = &group_destroy { + self.levels.remove(destroy_group_id); } } self.id = version_delta.id; @@ -835,6 +855,45 @@ impl HummockVersion { } ret } + + pub fn merge_compaction_group( + &mut self, + left_group_id: CompactionGroupId, + right_group_id: CompactionGroupId, + ) { + // Double check + let left_group_id_table_ids = self + .state_table_info + .compaction_group_member_table_ids(left_group_id) + .iter() + .map(|table_id| table_id.table_id); + let right_group_id_table_ids = self + .state_table_info + .compaction_group_member_table_ids(right_group_id) + .iter() + .map(|table_id| table_id.table_id); + + assert!(left_group_id_table_ids + .chain(right_group_id_table_ids) + .is_sorted()); + + let total_cg = self.levels.keys().cloned().collect::>(); + let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| { + panic!( + "compaction group should exist right {} all {:?}", + right_group_id, total_cg + ) + }); + + let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| { + panic!( + "compaction group should exist left {} all {:?}", + left_group_id, total_cg + ) + }); + + group_split::merge_levels(left_levels, right_levels); + } } #[easy_ext::ext(HummockLevelsExt)] @@ -1373,9 +1432,15 @@ pub fn split_sst( mod tests { use std::collections::HashMap; + use bytes::Bytes; + use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType}; + use crate::compaction_group::group_split; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; + use crate::key::{gen_key_from_str, FullKey}; + use crate::key_range::KeyRange; use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::version::{ @@ -1533,4 +1598,404 @@ mod tests { version }); } + + fn gen_sst_info(object_id: u64, table_ids: Vec, left: Bytes, right: Bytes) -> SstableInfo { + SstableInfo { + object_id, + sst_id: object_id, + key_range: KeyRange { + left, + right, + right_exclusive: false, + }, + table_ids, + file_size: 100, + sst_size: 100, + uncompressed_file_size: 100, + ..Default::default() + } + } + + #[test] + fn test_merge_levels() { + let mut left_levels = build_initial_compaction_group_levels( + 1, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + let mut right_levels = build_initial_compaction_group_levels( + 2, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + left_levels.levels[0] = Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping, + table_infos: vec![ + gen_sst_info( + 1, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 10, + vec![3, 4], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(201), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + 
TableId::new(4), + gen_key_from_str(VirtualNode::from_index(10), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 11, + vec![4], + FullKey::for_test( + TableId::new(4), + gen_key_from_str(VirtualNode::from_index(11), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(4), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + ], + total_file_size: 300, + ..Default::default() + }; + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 101, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 103, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + left_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![3], + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(3), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 105, + level_type: LevelType::Nonoverlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.levels[0] = Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping, + table_infos: vec![ + gen_sst_info( + 1, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 10, + vec![5, 6], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(201), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(10), "1"), + 0, + ) + .encode() + .into(), + ), + gen_sst_info( + 11, + vec![6], + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(11), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(6), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + ), + ], + total_file_size: 300, + ..Default::default() + }; + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 101, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 5, + vec![5], + FullKey::for_test( + 
TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 102, + level_type: LevelType::Overlapping, + total_file_size: 100, + ..Default::default() + }); + + right_levels.l0.sub_levels.push(Level { + level_idx: 0, + table_infos: vec![gen_sst_info( + 3, + vec![5], + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(1), "1"), + 0, + ) + .encode() + .into(), + FullKey::for_test( + TableId::new(5), + gen_key_from_str(VirtualNode::from_index(200), "1"), + 0, + ) + .encode() + .into(), + )], + sub_level_id: 103, + level_type: LevelType::Nonoverlapping, + total_file_size: 100, + ..Default::default() + }); + + { + // test empty + let mut left_levels = Levels::default(); + let right_levels = Levels::default(); + + group_split::merge_levels(&mut left_levels, right_levels); + } + + { + // test empty left + let mut left_levels = build_initial_compaction_group_levels( + 1, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + let right_levels = right_levels.clone(); + + group_split::merge_levels(&mut left_levels, right_levels); + + assert!(left_levels.l0.sub_levels.len() == 3); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 102); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 103); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(300, left_levels.levels[0].total_file_size); + } + + { + // test empty right + let mut left_levels = left_levels.clone(); + let right_levels = build_initial_compaction_group_levels( + 2, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ); + + group_split::merge_levels(&mut left_levels, right_levels); + + assert!(left_levels.l0.sub_levels.len() == 3); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 103); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 105); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(300, left_levels.levels[0].total_file_size); + } + + { + let mut left_levels = left_levels.clone(); + let right_levels = right_levels.clone(); + + group_split::merge_levels(&mut left_levels, right_levels); + + assert!(left_levels.l0.sub_levels.len() == 6); + assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); + assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size); + assert!(left_levels.l0.sub_levels[1].sub_level_id == 103); + assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size); + assert!(left_levels.l0.sub_levels[2].sub_level_id == 105); + assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size); + assert!(left_levels.l0.sub_levels[3].sub_level_id == 106); + assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size); + assert!(left_levels.l0.sub_levels[4].sub_level_id == 107); + assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size); + assert!(left_levels.l0.sub_levels[5].sub_level_id == 108); + assert_eq!(100, 
left_levels.l0.sub_levels[5].total_file_size); + + assert!(left_levels.levels[0].level_idx == 1); + assert_eq!(600, left_levels.levels[0].total_file_size); + } + } } diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 973cc3e3c614..94ef89b8046e 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -43,3 +43,115 @@ impl From for CompactionGroupId { cg as CompactionGroupId } } + +pub mod group_split { + use std::cmp::Ordering; + + use super::hummock_version_ext::insert_new_sub_level; + use crate::can_concat; + use crate::level::{Level, Levels}; + + pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) { + let right_l0 = right_levels.l0; + + let mut max_left_sub_level_id = left_levels + .l0 + .sub_levels + .iter() + .map(|sub_level| sub_level.sub_level_id + 1) + .max() + .unwrap_or(0); // If there are no sub levels, the max sub level id is 0. + let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0; + + for mut right_sub_level in right_l0.sub_levels { + // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type) + // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5] + if need_rewrite_right_sub_level_id { + right_sub_level.sub_level_id = max_left_sub_level_id; + max_left_sub_level_id += 1; + } + + insert_new_sub_level( + &mut left_levels.l0, + right_sub_level.sub_level_id, + right_sub_level.level_type, + right_sub_level.table_infos, + None, + ); + } + + assert!( + left_levels + .l0 + .sub_levels + .is_sorted_by_key(|sub_level| sub_level.sub_level_id), + "{}", + format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels) + ); + + // Reinitialise `vnode_partition_count` to avoid misaligned hierarchies + // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact) + left_levels + .l0 + .sub_levels + .iter_mut() + .for_each(|sub_level| sub_level.vnode_partition_count = 0); + + for (idx, level) in right_levels.levels.into_iter().enumerate() { + if level.table_infos.is_empty() { + continue; + } + + let insert_table_infos = level.table_infos; + left_levels.levels[idx].total_file_size += insert_table_infos + .iter() + .map(|sst| sst.sst_size) + .sum::(); + left_levels.levels[idx].uncompressed_file_size += insert_table_infos + .iter() + .map(|sst| sst.uncompressed_file_size) + .sum::(); + + left_levels.levels[idx] + .table_infos + .extend(insert_table_infos); + left_levels.levels[idx] + .table_infos + .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); + assert!( + can_concat(&left_levels.levels[idx].table_infos), + "{}", + format!( + "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}", + left_levels.group_id, + right_levels.group_id, + idx, + left_levels.levels[idx].table_infos, + left_levels.levels[idx].level_idx + ) + ); + } + } + + // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` + // will extend these SSTs. When `insert_hint` is `Err(idx)`, it + // means that we will add a new sub level `idx` into `target_l0`. 
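+    // The return convention mirrors `slice::binary_search`: `Ok(idx)` points at the sub level whose
+    // `sub_level_id` matches, while `Err(idx)` is the position at which a new sub level should be inserted.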
+ pub fn get_sub_level_insert_hint( + target_levels: &Vec, + sub_level: &Level, + ) -> Result { + for (idx, other) in target_levels.iter().enumerate() { + match other.sub_level_id.cmp(&sub_level.sub_level_id) { + Ordering::Less => {} + Ordering::Equal => { + return Ok(idx); + } + Ordering::Greater => { + return Err(idx); + } + } + } + + Err(target_levels.len()) + } +} diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index e418250f0b6b..1c8cfd1e310b 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -24,9 +24,9 @@ use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_pb::hummock::group_delta::PbDeltaType; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ - CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMetaChange, - PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, - PbStateTableInfo, StateTableInfo, StateTableInfoDelta, + CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, + PbGroupMetaChange, PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, + PbIntraLevelDelta, PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -501,12 +501,10 @@ impl HummockVersionDelta { .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter().map(|sst| sst.object_id) }) @@ -526,12 +524,10 @@ impl HummockVersionDelta { let ssts_from_group_deltas = self.group_deltas.values().flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter() }) @@ -564,12 +560,10 @@ impl HummockVersionDelta { .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = match group_delta { - GroupDelta::IntraLevel(level_delta) => &level_delta.inserted_table_infos, - GroupDelta::GroupConstruct(_) - | GroupDelta::GroupDestroy(_) - | GroupDelta::GroupMetaChange(_) - | GroupDelta::GroupTableChange(_) => &EMPTY_VEC, + let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { + &level_delta.inserted_table_infos + } else { + &EMPTY_VEC }; sst_slice.iter() }) @@ -881,6 +875,8 @@ pub enum GroupDelta { #[allow(dead_code)] GroupTableChange(PbGroupTableChange), + + GroupMerge(PbGroupMerge), } impl From for GroupDelta { @@ -901,6 +897,7 @@ impl From for GroupDelta { Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { GroupDelta::GroupTableChange(pb_group_table_change) } + Some(PbDeltaType::GroupMerge(pb_group_merge)) => 
GroupDelta::GroupMerge(pb_group_merge), None => panic!("delta_type is not set"), } } } @@ -924,6 +921,9 @@ impl From<GroupDelta> for PbGroupDelta { GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change)), }, + GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), + }, } } } @@ -946,6 +946,9 @@ impl From<&GroupDelta> for PbGroupDelta { GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change.clone())), }, + GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), + }, } } } @@ -968,6 +971,9 @@ impl From<&PbGroupDelta> for GroupDelta { Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { GroupDelta::GroupTableChange(pb_group_table_change.clone()) } + Some(PbDeltaType::GroupMerge(pb_group_merge)) => { + GroupDelta::GroupMerge(*pb_group_merge) + } None => panic!("delta_type is not set"), } } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f..92856fb5022c 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -29,7 +29,6 @@ pub(crate) mod tests { use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, Epoch, EpochExt}; use risingwave_common_service::NotificationClient; - use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ @@ -44,6 +43,7 @@ pub(crate) mod tests { ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::selector::{ default_compaction_selector, ManualCompactionOption, @@ -75,7 +75,7 @@ pub(crate) mod tests { use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder, - HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter, + HummockStorage as GlobalHummockStorage, HummockStorage, LocalHummockStorage, MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions, SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions, }; @@ -92,7 +92,7 @@ pub(crate) mod tests { hummock_meta_client: Arc<dyn HummockMetaClient>, notification_client: impl NotificationClient, hummock_manager_ref: &HummockManagerRef, - table_id: TableId, + table_ids: &[u32], ) -> HummockStorage { let remote_dir = "hummock_001_test".to_string(); let options = Arc::new(StorageOpts { @@ -117,7 +117,7 @@ pub(crate) mod tests { register_tables_with_id_for_test( hummock.filter_key_extractor_manager(), hummock_manager_ref, - &[table_id.table_id()], + table_ids, ) .await; @@ -189,7 +189,6 @@ pub(crate) mod tests { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } } @@ -236,7 +235,7 @@ pub(crate) mod tests { hummock_meta_client.clone(),
get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - Default::default(), + &[0], ) .await; let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() @@ -406,7 +405,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - Default::default(), + &[0], ) .await; @@ -604,7 +603,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -885,7 +884,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1090,7 +1089,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1290,7 +1289,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) = @@ -1505,7 +1504,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1680,7 +1679,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1798,7 +1797,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1980,4 +1979,504 @@ pub(crate) mod tests { count += 1; } } + + #[tokio::test] + async fn test_split_and_merge() { + let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = + setup_compute_env(8080).await; + let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new( + hummock_manager_ref.clone(), + worker_node.id, + )); + + let table_id_1 = TableId::from(1); + let table_id_2 = TableId::from(2); + + let storage = get_hummock_storage( + hummock_meta_client.clone(), + get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), + &hummock_manager_ref, + &[table_id_1.table_id(), table_id_2.table_id()], + ) + .await; + + // Initially, both tables live in compaction group 2: cg2 -> [1, 2]. + let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() + { + FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ) => rpc_filter_key_extractor_manager, + FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(), + }; + + let mut key = BytesMut::default(); + key.put_u16(1); +
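// Every key written by this test shares the fixed prefix assembled here (a u16 tag followed by b"key_prefix"); the fixed-length filter key extractors registered below are configured with `TABLE_PREFIX_LEN + key_prefix.len()`, so the random suffix appended in `write_data` is not part of the filter key. +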
key.put_slice(b"key_prefix"); + let key_prefix = key.freeze(); + + rpc_filter_key_extractor_manager.update( + table_id_1.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + rpc_filter_key_extractor_manager.update( + table_id_2.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + + let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ); + let compact_ctx = get_compactor_context(&storage); + let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( + hummock_meta_client.clone(), + storage + .storage_opts() + .clone() + .sstable_id_remote_fetch_number, + )); + + let base_epoch = Epoch::now(); + let mut epoch: u64 = base_epoch.0; + let millisec_interval_epoch: u64 = (1 << 16) * 100; + + let mut local_1 = storage + .new_local(NewLocalOptions::for_test(table_id_1)) + .await; + let mut local_2 = storage + .new_local(NewLocalOptions::for_test(table_id_2)) + .await; + + let val = Bytes::from(b"0"[..].to_vec()); + + async fn write_data( + storage: &HummockStorage, + local_1: (&mut LocalHummockStorage, bool), + local_2: (&mut LocalHummockStorage, bool), + epoch: &mut u64, + val: Bytes, + kv_count: u64, + millisec_interval_epoch: u64, + key_prefix: Bytes, + hummock_meta_client: Arc, + is_init: &mut bool, + ) { + let table_id_set = + HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter()); + + storage.start_epoch(*epoch, table_id_set.clone()); + for i in 0..kv_count { + if i == 0 && *is_init { + local_1.0.init_for_test(*epoch).await.unwrap(); + local_2.0.init_for_test(*epoch).await.unwrap(); + + *is_init = false; + } + let next_epoch = *epoch + millisec_interval_epoch; + storage.start_epoch(next_epoch, table_id_set.clone()); + + let ramdom_key = + [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat(); + + if local_1.1 { + local_1 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_1.0.flush().await.unwrap(); + local_1 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + if local_2.1 { + local_2 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_2.0.flush().await.unwrap(); + local_2 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + let res = storage.seal_and_sync_epoch(*epoch).await.unwrap(); + hummock_meta_client.commit_epoch(*epoch, res).await.unwrap(); + *epoch += millisec_interval_epoch; + } + } + + let mut is_init = true; + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 1, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + epoch += millisec_interval_epoch; + + let parent_group_id = 2; + let split_table_ids = vec![table_id_2.table_id()]; + + async fn compact_once( + group_id: CompactionGroupId, + level: usize, + hummock_manager_ref: HummockManagerRef, + compact_ctx: CompactorContext, + filter_key_extractor_manager: FilterKeyExtractorManager, + sstable_object_id_manager: Arc, + ) { + // compact left group + let manual_compcation_option = ManualCompactionOption { + level, + ..Default::default() + }; + // 2. 
get compact task + let compact_task = hummock_manager_ref + .manual_get_compact_task(group_id, manual_compcation_option) + .await + .unwrap(); + + if compact_task.is_none() { + return; + } + + let mut compact_task = compact_task.unwrap(); + + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx, + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // compact + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + assert_ne!(parent_group_id, new_cg_id); + assert!(hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .is_err()); + + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 100, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + epoch += millisec_interval_epoch; + + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // write left + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, false), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + // compact + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // try split + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + // write right + write_data( + &storage, + (&mut local_1, false), + (&mut local_2, true), + &mut epoch, + 
val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + // write left and right + + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 1, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + compact_once( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_once( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + async fn compact_all( + group_id: CompactionGroupId, + level: usize, + hummock_manager_ref: HummockManagerRef, + compact_ctx: CompactorContext, + filter_key_extractor_manager: FilterKeyExtractorManager, + sstable_object_id_manager: Arc, + ) { + loop { + let manual_compcation_option = ManualCompactionOption { + level, + ..Default::default() + }; + let compact_task = hummock_manager_ref + .manual_get_compact_task(group_id, manual_compcation_option) + .await + .unwrap(); + + if compact_task.is_none() { + break; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. 
compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + } + + // try split + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + // try merge + assert!(hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .is_err()); + + // write left and right + + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 200, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + compact_all( + parent_group_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + compact_all( + new_cg_id, + 0, + hummock_manager_ref.clone(), + compact_ctx.clone(), + filter_key_extractor_manager.clone(), + sstable_object_id_manager.clone(), + ) + .await; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + } } diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index f5ee41783813..008c667ccedf 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -242,7 +242,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) =