diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 50d739c5d1eb9..34b23a9b79774 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1454,13 +1454,8 @@ pub(crate) mod tests { .await .unwrap(); let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec(); - let fast_ret = fast_compact_runner - .run() - .await - .unwrap() - .into_iter() - .map(|sst| sst.sst_info) - .collect_vec(); + let (ssts, _) = fast_compact_runner.run().await.unwrap(); + let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec(); println!("ssts: {} vs {}", fast_ret.len(), ret.len()); let mut fast_tables = Vec::with_capacity(fast_ret.len()); let mut normal_tables = Vec::with_capacity(ret.len()); diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index a21016014d247..1925acbce7534 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -472,8 +472,8 @@ pub async fn compact( task_progress_guard.progress.clone(), ); match runner.run().await { - Ok(ssts) => { - output_ssts.push((0, ssts, CompactionStatistics::default())); + Ok((ssts, statistics)) => { + output_ssts.push((0, ssts, statistics)); } Err(e) => { task_status = TaskStatus::ExecuteFailed; diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 6dcfb0e2392cf..c3184fc3e5f76 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -24,12 +24,15 @@ use bytes::Bytes; use itertools::Itertools; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; +use risingwave_hummock_sdk::table_stats::TableStats; use risingwave_hummock_sdk::{can_concat, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::{CompactTask, SstableInfo}; use crate::filter_key_extractor::FilterKeyExtractorImpl; use crate::hummock::compactor::task_progress::TaskProgress; -use crate::hummock::compactor::{Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig}; +use crate::hummock::compactor::{ + CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig, +}; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::sstable_store::{BlockStream, SstableStoreRef}; use crate::hummock::value::HummockValue; @@ -280,7 +283,6 @@ pub struct CompactorRunner { >, compression_algorithm: CompressionAlgorithm, metrics: Arc, - task_progress: Arc, } impl CompactorRunner { @@ -343,17 +345,16 @@ impl CompactorRunner { )); Self { - executor: CompactTaskExecutor::new(sst_builder, task_config), + executor: CompactTaskExecutor::new(sst_builder, task_config, task_progress), left, right, task_id: task.task_id, metrics: context.compactor_metrics.clone(), compression_algorithm, - task_progress, } } - pub async fn run(mut self) -> HummockResult> { + pub async fn run(mut self) -> HummockResult<(Vec, CompactionStatistics)> { self.left.rewind().await?; self.right.rewind().await?; let mut skip_raw_block_count = 0; @@ -409,6 +410,7 @@ impl CompactorRunner { let largest_key = first.current_sstable().current_block_largest(); let block_len = block.len() as u64; + let block_key_count = meta.total_key_count; if self .executor @@ -419,6 +421,7 @@ impl CompactorRunner { skip_raw_block_size += block_len; skip_raw_block_count += 1; } + self.executor.may_report_process_key(block_key_count); self.executor.clear(); } if !first.current_sstable().is_valid() { @@ -462,6 +465,7 @@ impl CompactorRunner { sstable_iter.download_next_block().await?.unwrap(); let largest_key = sstable_iter.current_block_largest(); let block_len = block.len() as u64; + let block_key_count = block_meta.total_key_count; if self .executor .builder @@ -471,6 +475,7 @@ impl CompactorRunner { skip_raw_block_count += 1; skip_raw_block_size += block_len; } + self.executor.may_report_process_key(block_key_count); } rest_data.next_sstable().await?; } @@ -491,37 +496,61 @@ impl CompactorRunner { skip_raw_block_size * 100 / total_read_bytes, ); + let statistic = self.executor.take_statistics(); let outputs = self.executor.builder.finish().await?; let ssts = Compactor::report_progress( self.metrics.clone(), - Some(self.task_progress.clone()), + Some(self.executor.task_progress.clone()), outputs, false, ) .await?; let sst_infos = ssts.iter().map(|sst| sst.sst_info.clone()).collect_vec(); assert!(can_concat(&sst_infos)); - Ok(ssts) + Ok((ssts, statistic)) } } pub struct CompactTaskExecutor { last_key: FullKey>, + compaction_statistics: CompactionStatistics, + last_table_id: Option, + last_table_stats: TableStats, watermark_can_see_last_key: bool, builder: CapacitySplitTableBuilder, task_config: TaskConfig, + task_progress: Arc, last_key_is_delete: bool, + progress_key_num: u32, } impl CompactTaskExecutor { - pub fn new(builder: CapacitySplitTableBuilder, task_config: TaskConfig) -> Self { + pub fn new( + builder: CapacitySplitTableBuilder, + task_config: TaskConfig, + task_progress: Arc, + ) -> Self { Self { builder, task_config, last_key: FullKey::default(), watermark_can_see_last_key: false, last_key_is_delete: false, + compaction_statistics: CompactionStatistics::default(), + last_table_id: None, + last_table_stats: TableStats::default(), + progress_key_num: 0, + task_progress, + } + } + + fn take_statistics(&mut self) -> CompactionStatistics { + if let Some(last_table_id) = self.last_table_id.take() { + self.compaction_statistics + .delta_drop_stat + .insert(last_table_id, std::mem::take(&mut self.last_table_stats)); } + std::mem::take(&mut self.compaction_statistics) } fn clear(&mut self) { @@ -532,6 +561,17 @@ impl CompactTaskExecutor { self.last_key_is_delete = false; } + #[inline(always)] + fn may_report_process_key(&mut self, key_count: u32) { + const PROGRESS_KEY_INTERVAL: u32 = 100; + self.progress_key_num += key_count; + if self.progress_key_num > PROGRESS_KEY_INTERVAL { + self.task_progress + .inc_progress_key(self.progress_key_num as u64); + self.progress_key_num = 0; + } + } + pub async fn run( &mut self, iter: &mut BlockIterator, @@ -540,6 +580,9 @@ impl CompactTaskExecutor { while iter.is_valid() && iter.key().le(&target_key) { let is_new_user_key = !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref(); + self.compaction_statistics.iter_total_key_counts += 1; + self.may_report_process_key(1); + let mut drop = false; let epoch = iter.key().epoch; let value = HummockValue::from_slice(iter.value()).unwrap(); @@ -562,7 +605,31 @@ impl CompactTaskExecutor { self.watermark_can_see_last_key = true; } + if self.last_table_id.map_or(true, |last_table_id| { + last_table_id != self.last_key.user_key.table_id.table_id + }) { + if let Some(last_table_id) = self.last_table_id.take() { + self.compaction_statistics + .delta_drop_stat + .insert(last_table_id, std::mem::take(&mut self.last_table_stats)); + } + self.last_table_id = Some(self.last_key.user_key.table_id.table_id); + } + if drop { + self.compaction_statistics.iter_drop_key_counts += 1; + + let should_count = match self.task_config.stats_target_table_ids.as_ref() { + Some(target_table_ids) => { + target_table_ids.contains(&self.last_key.user_key.table_id.table_id) + } + None => true, + }; + if should_count { + self.last_table_stats.total_key_count -= 1; + self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64; + self.last_table_stats.total_value_size -= value.encoded_len() as i64; + } iter.next(); continue; } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 7ccb3fbf04790..6404d80bb265f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -604,9 +604,6 @@ mod tests { use crate::hummock::utils::MemoryLimiter; - // This is a clippy bug, see https://github.com/rust-lang/rust-clippy/issues/11380. - // TODO: remove `allow` here after the issued is closed. - #[expect(clippy::needless_pass_by_ref_mut)] async fn assert_pending(future: &mut (impl Future + Unpin)) { for _ in 0..10 { assert!(poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))