Skip to content

Commit

Permalink
limit loop count
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Mar 27, 2024
1 parent 645f758 commit e185075
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
8 changes: 7 additions & 1 deletion src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,15 @@ impl HummockManager {
.await;

match compact_ret {
Ok(compact_tasks) => {
Ok((compact_tasks, trivial_tasks)) => {
if compact_tasks.is_empty() {
break;
}
generated_task_count += compact_tasks.len();
for task in trivial_tasks {
existed_groups.push(task.compaction_group_id);
wait_compact_groups.insert(task.compaction_group_id);
}
for task in compact_tasks {
let task_id = task.task_id;
existed_groups.push(task.compaction_group_id);
Expand All @@ -164,6 +168,8 @@ impl HummockManager {
break;
}
}
existed_groups.sort();
existed_groups.dedup();
}
Err(err) => {
tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
Expand Down
14 changes: 10 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ impl HummockManager {
&self,
compaction_groups: Vec<CompactionGroupId>,
selector: &mut Box<dyn CompactionSelector>,
) -> crate::hummock::error::Result<Vec<CompactTask>> {
) -> crate::hummock::error::Result<(Vec<CompactTask>, Vec<CompactTask>)> {
// TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the
// lock in compaction_guard, take out all table_options in advance there may be a
// waste of resources here, need to add a more efficient filter in catalog_manager
Expand Down Expand Up @@ -913,6 +913,7 @@ impl HummockManager {
let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
&self.env.opts,
));
const MAX_TRIVIAL_MOVE_TASK_COUNT: usize = 256;
for compaction_group_id in compaction_groups {
if current_version.levels.get(&compaction_group_id).is_none() {
continue;
Expand Down Expand Up @@ -960,6 +961,7 @@ impl HummockManager {
}
}

let mut trivial_move_count = 0;
while let Some(compact_task) = compact_status.get_compact_task(
current_version.get_compaction_group_levels(compaction_group_id),
task_id as HummockCompactionTaskId,
Expand Down Expand Up @@ -1056,6 +1058,10 @@ impl HummockManager {
);
current_version.apply_version_delta(&version_delta);
trivial_tasks.push(compact_task);
trivial_move_count += 1;
if trivial_move_count >= MAX_TRIVIAL_MOVE_TASK_COUNT {
break;
}
} else {
let mut table_to_vnode_partition = match self
.group_to_table_vnode_partition
Expand Down Expand Up @@ -1217,7 +1223,7 @@ impl HummockManager {
drop(compaction_guard);
self.check_state_consistency().await;
}
Ok(pick_tasks)
Ok((pick_tasks, trivial_tasks))
}

/// Cancels a compaction task no matter it's assigned or unassigned.
Expand Down Expand Up @@ -1253,7 +1259,7 @@ impl HummockManager {
&self,
compaction_groups: Vec<CompactionGroupId>,
selector: &mut Box<dyn CompactionSelector>,
) -> Result<Vec<CompactTask>> {
) -> Result<(Vec<CompactTask>, Vec<CompactTask>)> {
fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
anyhow::anyhow!("failpoint metastore error")
)));
Expand All @@ -1270,7 +1276,7 @@ impl HummockManager {
anyhow::anyhow!("failpoint metastore error")
)));

let mut normal_tasks = self
let (mut normal_tasks, _) = self
.get_compact_tasks_impl(vec![compaction_group_id], selector)
.await?;
Ok(normal_tasks.pop())
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::hummock::compaction::selector::{
use crate::hummock::error::Error;
use crate::hummock::test_utils::*;
use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef};
use crate::hummock::compaction::CompactStatus;
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::model::MetadataModel;
use crate::rpc::metrics::MetaMetrics;
Expand Down Expand Up @@ -1694,13 +1695,13 @@ async fn test_split_compaction_group_trivial_expired() {
.unwrap();
let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();
let normal_tasks = hummock_manager
let (normal_tasks, mut trivial_task) = hummock_manager
.get_compact_tasks_impl(vec![2], &mut selector)
.await
.unwrap();
assert!(normal_tasks.is_empty());
// let reclaim_task = normal_tasks.pop().unwrap();
// assert!(CompactStatus::is_trivial_reclaim(&reclaim_task));
let reclaim_task = trivial_task.pop().unwrap();
assert!(CompactStatus::is_trivial_reclaim(&reclaim_task));

let current_version = hummock_manager.get_current_version().await;
let new_group_id = current_version.levels.keys().max().cloned().unwrap();
Expand Down

0 comments on commit e185075

Please sign in to comment.