diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 0dc5cbb19bf2d..778e0a30721e2 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -80,10 +80,10 @@ use crate::hummock::compaction::selector::{ use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; use crate::hummock::error::{Error, Result}; use crate::hummock::metrics_utils::{ - build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_local_table_stat, - trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state, - trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_version_stat, - trigger_write_stop_stats, + build_compact_task_level_type_metrics_label, get_or_create_local_table_stat, + trigger_delta_log_stats, trigger_local_table_stat, trigger_lsm_stat, trigger_mv_stat, + trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, trigger_split_stat, + trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats, }; use crate::hummock::sequence::next_compaction_task_id; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; @@ -1007,7 +1007,7 @@ impl HummockManager { compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, target_sub_level_id: compact_task.input.target_sub_level_id, task_type: compact_task.compaction_task_type as i32, - split_weight_by_vnode, + split_weight_by_vnode: vnode_partition_count, ..Default::default() }; @@ -1751,6 +1751,21 @@ impl HummockManager { &version_stats, &table_stats_change, ); + for (table_id, stats) in &table_stats_change { + if stats.total_key_size == 0 + && stats.total_value_size == 0 + && stats.total_key_count == 0 + { + continue; + } + let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size); + let table_metrics = get_or_create_local_table_stat( + &self.metrics, + *table_id, + &mut versioning.local_metrics, + ); + table_metrics.inc_write_throughput(stats_value as u64); + } commit_multi_var!( self.env.meta_store(), self.sql_meta_store(), diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 0ede18e7c559a..be25c5cf452b2 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; +use prometheus::core::{AtomicU64, GenericCounter}; use prometheus::IntGauge; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, BranchedSstInfo, @@ -43,6 +44,50 @@ pub struct LocalTableMetrics { total_key_count: IntGauge, total_key_size: IntGauge, total_value_size: IntGauge, + write_throughput: GenericCounter, + cal_count: usize, + write_size: u64, +} + +const MIN_FLUSH_COUNT: usize = 16; +const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024; + +impl LocalTableMetrics { + pub fn inc_write_throughput(&mut self, val: u64) { + self.write_size += val; + self.cal_count += 1; + if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT { + self.write_throughput.inc_by(self.write_size / 1024 / 1024); + self.write_size = 0; + self.cal_count = 0; + } + } +} + +pub fn get_or_create_local_table_stat<'a>( + metrics: &MetaMetrics, + table_id: u32, + local_metrics: &'a mut HashMap, +) -> &'a mut LocalTableMetrics { + local_metrics.entry(table_id).or_insert_with(|| { + let table_label = format!("{}", table_id); + LocalTableMetrics { + total_key_count: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_count"]), + total_key_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_size"]), + total_value_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_value_size"]), + write_throughput: metrics + .table_write_throughput + .with_label_values(&[&table_label]), + cal_count: 0, + write_size: 0, + } + }) } pub fn trigger_local_table_stat( @@ -55,20 +100,7 @@ pub fn trigger_local_table_stat( if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 { continue; } - let table_metrics = local_metrics.entry(*table_id).or_insert_with(|| { - let table_label = format!("{}", table_id); - LocalTableMetrics { - total_key_count: metrics - .version_stats - .with_label_values(&[&table_label, "total_key_count"]), - total_key_size: metrics - .version_stats - .with_label_values(&[&table_label, "total_key_size"]), - total_value_size: metrics - .version_stats - .with_label_values(&[&table_label, "total_value_size"]), - } - }); + let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics); if let Some(table_stats) = version_stats.table_stats.get(table_id) { table_metrics .total_key_count