Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compactor): collect metrics for fast compact runer #12939

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,13 +1454,8 @@ pub(crate) mod tests {
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let fast_ret = fast_compact_runner
.run()
.await
.unwrap()
.into_iter()
.map(|sst| sst.sst_info)
.collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
println!("ssts: {} vs {}", fast_ret.len(), ret.len());
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ pub async fn compact(
task_progress_guard.progress.clone(),
);
match runner.run().await {
Ok(ssts) => {
output_ssts.push((0, ssts, CompactionStatistics::default()));
Ok((ssts, statistics)) => {
output_ssts.push((0, ssts, statistics));
}
Err(e) => {
task_status = TaskStatus::ExecuteFailed;
Expand Down
83 changes: 75 additions & 8 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ use bytes::Bytes;
use itertools::Itertools;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{can_concat, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::{CompactTask, SstableInfo};

use crate::filter_key_extractor::FilterKeyExtractorImpl;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig};
use crate::hummock::compactor::{
CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::{BlockStream, SstableStoreRef};
use crate::hummock::value::HummockValue;
Expand Down Expand Up @@ -280,7 +283,6 @@ pub struct CompactorRunner {
>,
compression_algorithm: CompressionAlgorithm,
metrics: Arc<CompactorMetrics>,
task_progress: Arc<TaskProgress>,
}

impl CompactorRunner {
Expand Down Expand Up @@ -343,17 +345,16 @@ impl CompactorRunner {
));

Self {
executor: CompactTaskExecutor::new(sst_builder, task_config),
executor: CompactTaskExecutor::new(sst_builder, task_config, task_progress),
left,
right,
task_id: task.task_id,
metrics: context.compactor_metrics.clone(),
compression_algorithm,
task_progress,
}
}

pub async fn run(mut self) -> HummockResult<Vec<LocalSstableInfo>> {
pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
self.left.rewind().await?;
self.right.rewind().await?;
let mut skip_raw_block_count = 0;
Expand Down Expand Up @@ -409,6 +410,7 @@ impl CompactorRunner {

let largest_key = first.current_sstable().current_block_largest();
let block_len = block.len() as u64;
let block_key_count = meta.total_key_count;

if self
.executor
Expand All @@ -419,6 +421,7 @@ impl CompactorRunner {
skip_raw_block_size += block_len;
skip_raw_block_count += 1;
}
self.executor.may_report_process_key(block_key_count);
self.executor.clear();
}
if !first.current_sstable().is_valid() {
Expand Down Expand Up @@ -462,6 +465,7 @@ impl CompactorRunner {
sstable_iter.download_next_block().await?.unwrap();
let largest_key = sstable_iter.current_block_largest();
let block_len = block.len() as u64;
let block_key_count = block_meta.total_key_count;
if self
.executor
.builder
Expand All @@ -471,6 +475,7 @@ impl CompactorRunner {
skip_raw_block_count += 1;
skip_raw_block_size += block_len;
}
self.executor.may_report_process_key(block_key_count);
}
rest_data.next_sstable().await?;
}
Expand All @@ -491,37 +496,61 @@ impl CompactorRunner {
skip_raw_block_size * 100 / total_read_bytes,
);

let statistic = self.executor.take_statistics();
let outputs = self.executor.builder.finish().await?;
let ssts = Compactor::report_progress(
self.metrics.clone(),
Some(self.task_progress.clone()),
Some(self.executor.task_progress.clone()),
outputs,
false,
)
.await?;
let sst_infos = ssts.iter().map(|sst| sst.sst_info.clone()).collect_vec();
assert!(can_concat(&sst_infos));
Ok(ssts)
Ok((ssts, statistic))
}
}

pub struct CompactTaskExecutor<F: TableBuilderFactory> {
last_key: FullKey<Vec<u8>>,
compaction_statistics: CompactionStatistics,
last_table_id: Option<u32>,
last_table_stats: TableStats,
watermark_can_see_last_key: bool,
builder: CapacitySplitTableBuilder<F>,
task_config: TaskConfig,
task_progress: Arc<TaskProgress>,
last_key_is_delete: bool,
progress_key_num: u32,
}

impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
pub fn new(builder: CapacitySplitTableBuilder<F>, task_config: TaskConfig) -> Self {
pub fn new(
builder: CapacitySplitTableBuilder<F>,
task_config: TaskConfig,
task_progress: Arc<TaskProgress>,
) -> Self {
Self {
builder,
task_config,
last_key: FullKey::default(),
watermark_can_see_last_key: false,
last_key_is_delete: false,
compaction_statistics: CompactionStatistics::default(),
last_table_id: None,
last_table_stats: TableStats::default(),
progress_key_num: 0,
task_progress,
}
}

fn take_statistics(&mut self) -> CompactionStatistics {
if let Some(last_table_id) = self.last_table_id.take() {
self.compaction_statistics
.delta_drop_stat
.insert(last_table_id, std::mem::take(&mut self.last_table_stats));
}
std::mem::take(&mut self.compaction_statistics)
}

fn clear(&mut self) {
Expand All @@ -532,6 +561,17 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
self.last_key_is_delete = false;
}

#[inline(always)]
fn may_report_process_key(&mut self, key_count: u32) {
const PROGRESS_KEY_INTERVAL: u32 = 100;
self.progress_key_num += key_count;
if self.progress_key_num > PROGRESS_KEY_INTERVAL {
self.task_progress
.inc_progress_key(self.progress_key_num as u64);
self.progress_key_num = 0;
}
}

pub async fn run(
&mut self,
iter: &mut BlockIterator,
Expand All @@ -540,6 +580,9 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
while iter.is_valid() && iter.key().le(&target_key) {
let is_new_user_key =
!self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
self.compaction_statistics.iter_total_key_counts += 1;
self.may_report_process_key(1);

let mut drop = false;
let epoch = iter.key().epoch;
let value = HummockValue::from_slice(iter.value()).unwrap();
Expand All @@ -562,7 +605,31 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
self.watermark_can_see_last_key = true;
}

if self.last_table_id.map_or(true, |last_table_id| {
last_table_id != self.last_key.user_key.table_id.table_id
}) {
if let Some(last_table_id) = self.last_table_id.take() {
self.compaction_statistics
.delta_drop_stat
.insert(last_table_id, std::mem::take(&mut self.last_table_stats));
}
self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
}

if drop {
self.compaction_statistics.iter_drop_key_counts += 1;

let should_count = match self.task_config.stats_target_table_ids.as_ref() {
Some(target_table_ids) => {
target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
}
None => true,
};
if should_count {
self.last_table_stats.total_key_count -= 1;
self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
self.last_table_stats.total_value_size -= value.encoded_len() as i64;
}
iter.next();
continue;
}
Expand Down
3 changes: 0 additions & 3 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,6 @@ mod tests {

use crate::hummock::utils::MemoryLimiter;

// This is a clippy bug, see https://github.com/rust-lang/rust-clippy/issues/11380.
// TODO: remove `allow` here after the issued is closed.
#[expect(clippy::needless_pass_by_ref_mut)]
async fn assert_pending(future: &mut (impl Future + Unpin)) {
for _ in 0..10 {
assert!(poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
Expand Down
Loading