diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index a81281f567ce8..989a25aab028b 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -141,11 +141,15 @@ impl HummockManager { .await; match compact_ret { - Ok(compact_tasks) => { + Ok((compact_tasks, trivial_tasks)) => { if compact_tasks.is_empty() { break; } generated_task_count += compact_tasks.len(); + for task in trivial_tasks { + existed_groups.push(task.compaction_group_id); + wait_compact_groups.insert(task.compaction_group_id); + } for task in compact_tasks { let task_id = task.task_id; existed_groups.push(task.compaction_group_id); @@ -164,6 +168,8 @@ impl HummockManager { break; } } + existed_groups.sort(); + existed_groups.dedup(); } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 778e0a30721e2..dd50b800253e1 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -859,7 +859,7 @@ impl HummockManager { &self, compaction_groups: Vec, selector: &mut Box, - ) -> crate::hummock::error::Result> { + ) -> crate::hummock::error::Result<(Vec, Vec)> { // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the // lock in compaction_guard, take out all table_options in advance there may be a // waste of resources here, need to add a more efficient filter in catalog_manager @@ -913,6 +913,7 @@ impl HummockManager { let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts( &self.env.opts, )); + const MAX_TRIVIAL_MOVE_TASK_COUNT: usize = 256; for compaction_group_id in compaction_groups { if current_version.levels.get(&compaction_group_id).is_none() { continue; @@ -960,6 +961,7 @@ impl HummockManager { } } + let mut trivial_move_count = 0; while let Some(compact_task) = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, @@ -1056,6 +1058,10 @@ impl HummockManager { ); current_version.apply_version_delta(&version_delta); trivial_tasks.push(compact_task); + trivial_move_count += 1; + if trivial_move_count >= MAX_TRIVIAL_MOVE_TASK_COUNT { + break; + } } else { let mut table_to_vnode_partition = match self .group_to_table_vnode_partition @@ -1217,7 +1223,7 @@ impl HummockManager { drop(compaction_guard); self.check_state_consistency().await; } - Ok(pick_tasks) + Ok((pick_tasks, trivial_tasks)) } /// Cancels a compaction task no matter it's assigned or unassigned. @@ -1253,7 +1259,7 @@ impl HummockManager { &self, compaction_groups: Vec, selector: &mut Box, - ) -> Result> { + ) -> Result<(Vec, Vec)> { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); @@ -1270,7 +1276,7 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - let mut normal_tasks = self + let (mut normal_tasks, _) = self .get_compact_tasks_impl(vec![compaction_group_id], selector) .await?; Ok(normal_tasks.pop()) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index ed4605d9842ae..7395cb41130a2 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -46,6 +46,7 @@ use crate::hummock::compaction::selector::{ use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef}; +use crate::hummock::compaction::CompactStatus; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::model::MetadataModel; use crate::rpc::metrics::MetaMetrics; @@ -1694,13 +1695,13 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); let mut selector: Box = Box::::default(); - let normal_tasks = hummock_manager + let (normal_tasks, mut trivial_task) = hummock_manager .get_compact_tasks_impl(vec![2], &mut selector) .await .unwrap(); assert!(normal_tasks.is_empty()); - // let reclaim_task = normal_tasks.pop().unwrap(); - // assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); + let reclaim_task = trivial_task.pop().unwrap(); + assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap();