Skip to content

Commit

Permalink
collect metrics for fast compact
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Oct 18, 2023
1 parent 9c7a50b commit 77b0ae1
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 17 deletions.
9 changes: 2 additions & 7 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,13 +1454,8 @@ pub(crate) mod tests {
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let fast_ret = fast_compact_runner
.run()
.await
.unwrap()
.into_iter()
.map(|sst| sst.sst_info)
.collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
println!("ssts: {} vs {}", fast_ret.len(), ret.len());
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ pub async fn compact(
task_progress_guard.progress.clone(),
);
match runner.run().await {
Ok(ssts) => {
output_ssts.push((0, ssts, CompactionStatistics::default()));
Ok((ssts, statistics)) => {
output_ssts.push((0, ssts, statistics));
}
Err(e) => {
task_status = TaskStatus::ExecuteFailed;
Expand Down
83 changes: 75 additions & 8 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ use bytes::Bytes;
use itertools::Itertools;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{can_concat, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::{CompactTask, SstableInfo};

use crate::filter_key_extractor::FilterKeyExtractorImpl;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig};
use crate::hummock::compactor::{
CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::{BlockStream, SstableStoreRef};
use crate::hummock::value::HummockValue;
Expand Down Expand Up @@ -280,7 +283,6 @@ pub struct CompactorRunner {
>,
compression_algorithm: CompressionAlgorithm,
metrics: Arc<CompactorMetrics>,
task_progress: Arc<TaskProgress>,
}

impl CompactorRunner {
Expand Down Expand Up @@ -343,17 +345,16 @@ impl CompactorRunner {
));

Self {
executor: CompactTaskExecutor::new(sst_builder, task_config),
executor: CompactTaskExecutor::new(sst_builder, task_config, task_progress),
left,
right,
task_id: task.task_id,
metrics: context.compactor_metrics.clone(),
compression_algorithm,
task_progress,
}
}

pub async fn run(mut self) -> HummockResult<Vec<LocalSstableInfo>> {
pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
self.left.rewind().await?;
self.right.rewind().await?;
let mut skip_raw_block_count = 0;
Expand Down Expand Up @@ -409,6 +410,7 @@ impl CompactorRunner {

let largest_key = first.current_sstable().current_block_largest();
let block_len = block.len() as u64;
let block_key_count = meta.total_key_count;

if self
.executor
Expand All @@ -419,6 +421,7 @@ impl CompactorRunner {
skip_raw_block_size += block_len;
skip_raw_block_count += 1;
}
self.executor.may_report_process_key(block_key_count);
self.executor.clear();
}
if !first.current_sstable().is_valid() {
Expand Down Expand Up @@ -462,6 +465,7 @@ impl CompactorRunner {
sstable_iter.download_next_block().await?.unwrap();
let largest_key = sstable_iter.current_block_largest();
let block_len = block.len() as u64;
let block_key_count = block_meta.total_key_count;
if self
.executor
.builder
Expand All @@ -471,6 +475,7 @@ impl CompactorRunner {
skip_raw_block_count += 1;
skip_raw_block_size += block_len;
}
self.executor.may_report_process_key(block_key_count);
}
rest_data.next_sstable().await?;
}
Expand All @@ -491,37 +496,61 @@ impl CompactorRunner {
skip_raw_block_size * 100 / total_read_bytes,
);

let statistic = self.executor.take_statistics();
let outputs = self.executor.builder.finish().await?;
let ssts = Compactor::report_progress(
self.metrics.clone(),
Some(self.task_progress.clone()),
Some(self.executor.task_progress.clone()),
outputs,
false,
)
.await?;
let sst_infos = ssts.iter().map(|sst| sst.sst_info.clone()).collect_vec();
assert!(can_concat(&sst_infos));
Ok(ssts)
Ok((ssts, statistic))
}
}

/// Drives key-by-key compaction over decoded blocks and accumulates
/// per-task / per-table statistics while reporting progress.
pub struct CompactTaskExecutor<F: TableBuilderFactory> {
    // Most recently emitted full key; compared against the iterator's key to
    // detect new user keys (see `run`).
    last_key: FullKey<Vec<u8>>,
    // Task-wide counters (e.g. iter_total_key_counts / iter_drop_key_counts)
    // returned to the caller via `take_statistics`.
    compaction_statistics: CompactionStatistics,
    // Table id the in-flight `last_table_stats` belongs to; flushed into
    // `compaction_statistics.delta_drop_stat` when the table changes or on
    // `take_statistics`.
    last_table_id: Option<u32>,
    // Pending stat deltas (negative counts/sizes for dropped keys) for
    // `last_table_id`.
    last_table_stats: TableStats,
    // Whether a version of `last_key` at or below the watermark has already
    // been seen; reset in `clear`. NOTE(review): exact watermark semantics
    // live in the collapsed portion of `run` — confirm there.
    watermark_can_see_last_key: bool,
    // Sink that splits output into appropriately-sized SSTs.
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    // Shared progress handle; receives buffered key counts via
    // `may_report_process_key`.
    task_progress: Arc<TaskProgress>,
    // Whether the latest version of `last_key` was a delete tombstone;
    // reset in `clear`.
    last_key_is_delete: bool,
    // Keys processed since the last progress report; flushed once it
    // exceeds the reporting interval.
    progress_key_num: u32,
}

impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
pub fn new(builder: CapacitySplitTableBuilder<F>, task_config: TaskConfig) -> Self {
/// Creates an executor over the given SST `builder`, using `task_config`
/// to decide which keys are dropped and `task_progress` to publish
/// processed-key counts. All bookkeeping state starts out empty/zeroed.
pub fn new(
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
) -> Self {
    Self {
        // Key-tracking state begins empty; the first iterated key always
        // counts as a new user key.
        last_key: FullKey::default(),
        watermark_can_see_last_key: false,
        last_key_is_delete: false,
        // Statistics accumulators start from zero.
        last_table_id: None,
        last_table_stats: TableStats::default(),
        compaction_statistics: CompactionStatistics::default(),
        progress_key_num: 0,
        builder,
        task_config,
        task_progress,
    }
}

/// Flushes any pending per-table deltas into the task statistics and
/// returns the accumulated `CompactionStatistics`, leaving the executor's
/// counters reset to their defaults.
fn take_statistics(&mut self) -> CompactionStatistics {
    // The stats for the last table seen were never flushed by the
    // table-change check in `run`, so fold them in here.
    if let Some(table_id) = self.last_table_id.take() {
        let pending = std::mem::take(&mut self.last_table_stats);
        self.compaction_statistics
            .delta_drop_stat
            .insert(table_id, pending);
    }
    std::mem::take(&mut self.compaction_statistics)
}

fn clear(&mut self) {
Expand All @@ -532,6 +561,17 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
self.last_key_is_delete = false;
}

/// Buffers `key_count` processed keys and forwards them to the shared
/// task-progress counter once more than `PROGRESS_KEY_INTERVAL` keys have
/// accumulated, keeping atomic-counter traffic off the per-key hot path.
#[inline(always)]
fn may_report_process_key(&mut self, key_count: u32) {
    const PROGRESS_KEY_INTERVAL: u32 = 100;
    self.progress_key_num += key_count;
    // Below the reporting threshold: keep buffering locally.
    if self.progress_key_num <= PROGRESS_KEY_INTERVAL {
        return;
    }
    self.task_progress
        .inc_progress_key(self.progress_key_num as u64);
    self.progress_key_num = 0;
}

pub async fn run(
&mut self,
iter: &mut BlockIterator,
Expand All @@ -540,6 +580,9 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
while iter.is_valid() && iter.key().le(&target_key) {
let is_new_user_key =
!self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
self.compaction_statistics.iter_total_key_counts += 1;
self.may_report_process_key(1);

let mut drop = false;
let epoch = iter.key().epoch;
let value = HummockValue::from_slice(iter.value()).unwrap();
Expand All @@ -562,7 +605,31 @@ impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
self.watermark_can_see_last_key = true;
}

if self.last_table_id.map_or(true, |last_table_id| {
last_table_id != self.last_key.user_key.table_id.table_id
}) {
if let Some(last_table_id) = self.last_table_id.take() {
self.compaction_statistics
.delta_drop_stat
.insert(last_table_id, std::mem::take(&mut self.last_table_stats));
}
self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
}

if drop {
self.compaction_statistics.iter_drop_key_counts += 1;

let should_count = match self.task_config.stats_target_table_ids.as_ref() {
Some(target_table_ids) => {
target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
}
None => true,
};
if should_count {
self.last_table_stats.total_key_count -= 1;
self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
self.last_table_stats.total_value_size -= value.encoded_len() as i64;
}
iter.next();
continue;
}
Expand Down

0 comments on commit 77b0ae1

Please sign in to comment.