Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fix compact_fast_runner_bytes unused #12655

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 63 additions & 25 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ pub async fn compact(
Err(e) => {
tracing::error!("Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error {:?}", compact_task.existing_table_ids, e);
let task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return compact_done(compact_task, context.clone(), vec![], task_status, false);
}
Ok(extractor) => extractor,
};
Expand All @@ -338,7 +338,7 @@ pub async fn compact(
if !removed_tables.is_empty() {
tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
let task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return compact_done(compact_task, context.clone(), vec![], task_status, false);
}
}

Expand Down Expand Up @@ -391,7 +391,13 @@ pub async fn compact(
Err(e) => {
tracing::warn!("Failed to generate_splits {:#?}", e);
task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return compact_done(
compact_task,
context.clone(),
vec![],
task_status,
optimize_by_copy_block,
);
}
}
}
Expand All @@ -415,7 +421,13 @@ pub async fn compact(
Err(err) => {
tracing::warn!("Failed to build delete range aggregator {:#?}", err);
task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return compact_done(
compact_task,
context.clone(),
vec![],
task_status,
optimize_by_copy_block,
);
}
};

Expand All @@ -433,17 +445,17 @@ pub async fn compact(
) * compact_task.splits.len() as u64;

tracing::info!(
"Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}",
compact_task.compaction_group_id,
compact_task.task_id,
compact_task_statistics,
compact_task.target_level,
compact_task.compression_algorithm,
compact_task.existing_table_ids,
parallelism,
task_memory_capacity_with_parallelism,
optimize_by_copy_block
);
"Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}",
compact_task.compaction_group_id,
compact_task.task_id,
compact_task_statistics,
compact_task.target_level,
compact_task.compression_algorithm,
compact_task.existing_table_ids,
parallelism,
task_memory_capacity_with_parallelism,
optimize_by_copy_block
);

// If the task does not have enough memory, it should cancel the task and let the meta
// reschedule it, so that it does not occupy the compactor's resources.
Expand All @@ -459,7 +471,13 @@ pub async fn compact(
context.memory_limiter.quota()
);
task_status = TaskStatus::NoAvailResourceCanceled;
return compact_done(compact_task, context.clone(), output_ssts, task_status);
return compact_done(
compact_task,
context.clone(),
output_ssts,
task_status,
optimize_by_copy_block,
);
}

context.compactor_metrics.compact_task_pending_num.inc();
Expand Down Expand Up @@ -487,8 +505,13 @@ pub async fn compact(

context.compactor_metrics.compact_task_pending_num.dec();
// After a compaction is done, mutate the compaction task.
let (compact_task, table_stats) =
compact_done(compact_task, context.clone(), output_ssts, task_status);
let (compact_task, table_stats) = compact_done(
compact_task,
context.clone(),
output_ssts,
task_status,
optimize_by_copy_block,
);
let cost_time = timer.stop_and_record() * 1000.0;
tracing::info!(
"Finished compaction task in {:?}ms: {}",
Expand Down Expand Up @@ -583,8 +606,13 @@ pub async fn compact(
}

// After a compaction is done, mutate the compaction task.
let (compact_task, table_stats) =
compact_done(compact_task, context.clone(), output_ssts, task_status);
let (compact_task, table_stats) = compact_done(
compact_task,
context.clone(),
output_ssts,
task_status,
optimize_by_copy_block,
);
let cost_time = timer.stop_and_record() * 1000.0;
tracing::info!(
"Finished compaction task in {:?}ms: {}",
Expand All @@ -606,6 +634,7 @@ fn compact_done(
context: CompactorContext,
output_ssts: Vec<CompactOutput>,
task_status: TaskStatus,
optimize_by_copy_block: bool,
) -> (CompactTask, HashMap<u32, TableStats>) {
let mut table_stats_map = TableStatsMap::default();
compact_task.set_task_status(task_status);
Expand All @@ -630,11 +659,20 @@ fn compact_done(

let group_label = compact_task.compaction_group_id.to_string();
let level_label = compact_task.target_level.to_string();
context
.compactor_metrics
.compact_write_bytes
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compaction_write_bytes);

if optimize_by_copy_block {
context
.compactor_metrics
.compact_fast_runner_bytes
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compaction_write_bytes);
} else {
context
.compactor_metrics
.compact_write_bytes
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compaction_write_bytes);
}
context
.compactor_metrics
.compact_write_sstn
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/monitor/compactor_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
#[derive(Debug, Clone)]
pub struct CompactorMetrics {
pub compaction_upload_sst_counts: GenericCounter<AtomicU64>,
pub compact_fast_runner_bytes: GenericCounter<AtomicU64>,
pub compact_fast_runner_bytes: GenericCounterVec<AtomicU64>,
pub compact_write_bytes: GenericCounterVec<AtomicU64>,
pub compact_read_current_level: GenericCounterVec<AtomicU64>,
pub compact_read_next_level: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -212,9 +212,10 @@ impl CompactorMetrics {
"Total size of compaction files size that have been written to object store from shared buffer",
registry
).unwrap();
let compact_fast_runner_bytes = register_int_counter_with_registry!(
let compact_fast_runner_bytes = register_int_counter_vec_with_registry!(
"compactor_fast_compact_bytes",
"Total size of compaction files size of fast compactor runner",
&["group", "level_index"],
registry
)
.unwrap();
Expand Down