diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8984aff7af042..b33365868e681 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -82,9 +82,10 @@ use crate::hummock::compaction::selector::{ use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; use crate::hummock::error::{Error, Result}; use crate::hummock::metrics_utils::{ - build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_lsm_stat, - trigger_mv_stat, trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, - trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats, + build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_local_table_stat, + trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state, + trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_version_stat, + trigger_write_stop_stats, }; use crate::hummock::sequence::next_compaction_task_id; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; @@ -1430,13 +1431,23 @@ impl HummockManager { VarTransactionWrapper, VarTransaction::new(&mut versioning.version_stats,) ); - if let Some(table_stats_change) = &table_stats_change { - add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change); - } // apply version delta before we persist this change. If it causes panic we can // recover to a correct state after restarting meta-node. current_version.apply_version_delta(&version_delta); + if purge_prost_table_stats(&mut version_stats.table_stats, ¤t_version) { + self.metrics.version_stats.reset(); + versioning.local_metrics.clear(); + } + if let Some(table_stats_change) = &table_stats_change { + add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change); + trigger_local_table_stat( + &self.metrics, + &mut versioning.local_metrics, + &version_stats, + table_stats_change, + ); + } commit_multi_var!( self.env.meta_store(), self.sql_meta_store(), @@ -1447,7 +1458,7 @@ impl HummockManager { )?; branched_ssts.commit_memory(); - trigger_version_stat(&self.metrics, ¤t_version, &versioning.version_stats); + trigger_version_stat(&self.metrics, ¤t_version); trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); self.notify_stats(&versioning.version_stats); versioning.current_version = current_version; @@ -1708,16 +1719,17 @@ impl HummockManager { VarTransaction::new(&mut versioning.version_stats) ); add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change); - purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version); - for (table_id, stats) in &table_stats_change { - let table_id_str = table_id.to_string(); - let stats_value = - std::cmp::max(0, stats.total_key_size + stats.total_value_size) / 1024 / 1024; - self.metrics - .table_write_throughput - .with_label_values(&[table_id_str.as_str()]) - .inc_by(stats_value as u64); + if purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version) { + self.metrics.version_stats.reset(); + versioning.local_metrics.clear(); } + + trigger_local_table_stat( + &self.metrics, + &mut versioning.local_metrics, + &version_stats, + &table_stats_change, + ); commit_multi_var!( self.env.meta_store(), self.sql_meta_store(), @@ -1735,11 +1747,7 @@ impl HummockManager { assert!(prev_snapshot.committed_epoch < epoch); assert!(prev_snapshot.current_epoch < epoch); - trigger_version_stat( - &self.metrics, - &versioning.current_version, - &versioning.version_stats, - ); + trigger_version_stat(&self.metrics, &versioning.current_version); for compaction_group_id in &modified_compaction_groups { trigger_sst_stat( &self.metrics, diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 877be8919262e..33eeeb54ba62a 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1828,7 +1828,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ..Default::default() }, ], - None + None, ) .await .unwrap()); @@ -2034,7 +2034,7 @@ async fn test_move_tables_between_compaction_group() { gen_sstable_info(12, 2, vec![100, 101]), gen_sstable_info(13, 3, vec![101, 102]), ], - None + None, ) .await .unwrap()); diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 5cf270076e6c3..18e6950585c72 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -39,7 +39,9 @@ use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock}; -use crate::hummock::metrics_utils::{trigger_safepoint_stat, trigger_write_stop_stats}; +use crate::hummock::metrics_utils::{ + trigger_safepoint_stat, trigger_write_stop_stats, LocalTableMetrics, +}; use crate::hummock::model::CompactionGroup; use crate::hummock::HummockManager; use crate::model::{VarTransaction, VarTransactionWrapper}; @@ -94,6 +96,7 @@ pub struct Versioning { /// Stats for latest hummock version. pub version_stats: HummockVersionStats, pub checkpoint: HummockVersionCheckpoint, + pub local_metrics: HashMap, } impl Versioning { diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index cd0fd437c379c..0ede18e7c559a 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; +use prometheus::IntGauge; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, BranchedSstInfo, }; +use risingwave_hummock_sdk::table_stats::PbTableStatsMap; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, @@ -37,11 +39,49 @@ use crate::hummock::checkpoint::HummockVersionCheckpoint; use crate::hummock::compaction::CompactStatus; use crate::rpc::metrics::MetaMetrics; -pub fn trigger_version_stat( +pub struct LocalTableMetrics { + total_key_count: IntGauge, + total_key_size: IntGauge, + total_value_size: IntGauge, +} + +pub fn trigger_local_table_stat( metrics: &MetaMetrics, - current_version: &HummockVersion, + local_metrics: &mut HashMap, version_stats: &HummockVersionStats, + table_stats_change: &PbTableStatsMap, ) { + for (table_id, stats) in table_stats_change { + if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 { + continue; + } + let table_metrics = local_metrics.entry(*table_id).or_insert_with(|| { + let table_label = format!("{}", table_id); + LocalTableMetrics { + total_key_count: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_count"]), + total_key_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_size"]), + total_value_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_value_size"]), + } + }); + if let Some(table_stats) = version_stats.table_stats.get(table_id) { + table_metrics + .total_key_count + .set(table_stats.total_key_count); + table_metrics + .total_value_size + .set(table_stats.total_value_size); + table_metrics.total_key_size.set(table_stats.total_key_size); + } + } +} + +pub fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) { metrics .max_committed_epoch .set(current_version.max_committed_epoch as i64); @@ -50,22 +90,6 @@ pub fn trigger_version_stat( .set(current_version.estimated_encode_len() as i64); metrics.safe_epoch.set(current_version.safe_epoch as i64); metrics.current_version_id.set(current_version.id as i64); - metrics.version_stats.reset(); - for (table_id, stats) in &version_stats.table_stats { - let table_id = format!("{}", table_id); - metrics - .version_stats - .with_label_values(&[&table_id, "total_key_count"]) - .set(stats.total_key_count); - metrics - .version_stats - .with_label_values(&[&table_id, "total_key_size"]) - .set(stats.total_key_size); - metrics - .version_stats - .with_label_values(&[&table_id, "total_value_size"]) - .set(stats.total_value_size); - } } pub fn trigger_mv_stat( diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index bdc674830458b..65f115e08d332 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -308,7 +308,6 @@ impl HummockMetaClient for MockHummockMetaClient { let hummock_manager_compact = self.hummock_manager.clone(); let report_handle = tokio::spawn(async move { tracing::info!("report_handle start"); - loop { if let Some(item) = request_receiver.recv().await { if let Event::ReportTask(ReportTask { diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 083fe9fe3e252..6f6bb9b9e3d40 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -105,14 +105,12 @@ pub fn from_prost_table_stats_map( pub fn purge_prost_table_stats( table_stats: &mut PbTableStatsMap, hummock_version: &HummockVersion, -) { +) -> bool { let mut all_tables_in_version: HashSet = HashSet::default(); - for group in hummock_version.levels.keys() { - hummock_version.level_iter(*group, |level| { - all_tables_in_version - .extend(level.table_infos.iter().flat_map(|s| s.table_ids.clone())); - true - }) + let prev_count = table_stats.len(); + for group in hummock_version.levels.values() { + all_tables_in_version.extend(group.member_table_ids.clone()); } table_stats.retain(|k, _| all_tables_in_version.contains(k)); + prev_count != table_stats.len() }