Skip to content

Commit

Permalink
feat(meta): optimize table stats report performance (#15401)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored and Li0k committed Mar 7, 2024
1 parent b9888f5 commit 9d50acb
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 55 deletions.
50 changes: 28 additions & 22 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ use crate::hummock::compaction::selector::{
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::{Error, Result};
use crate::hummock::metrics_utils::{
build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_lsm_stat,
trigger_mv_stat, trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state,
trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_local_table_stat,
trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state,
trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_version_stat,
trigger_write_stop_stats,
};
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
#[cfg(any(test, feature = "test"))]
Expand Down Expand Up @@ -1315,10 +1316,6 @@ impl HummockManager {
deterministic_mode,
);
let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
}

// apply version delta before we persist this change. If it causes panic we can
// recover to a correct state after restarting meta-node.

Expand All @@ -1329,6 +1326,19 @@ impl HummockManager {
.hummock_manager_latency
.with_label_values(&["commit_transaction"])
.start_timer();
if purge_prost_table_stats(&mut version_stats.table_stats, &current_version) {
self.metrics.version_stats.reset();
versioning.local_metrics.clear();
}
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
trigger_local_table_stat(
&self.metrics,
&mut versioning.local_metrics,
&version_stats,
table_stats_change,
);
}
commit_multi_var!(
self,
None,
Expand All @@ -1346,7 +1356,7 @@ impl HummockManager {
.start_timer();
branched_ssts.commit_memory();

trigger_version_stat(&self.metrics, &current_version, &versioning.version_stats);
trigger_version_stat(&self.metrics, &current_version);
trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
self.notify_stats(&versioning.version_stats);
versioning.current_version = current_version;
Expand Down Expand Up @@ -1613,22 +1623,22 @@ impl HummockManager {
// Apply stats changes.
let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version);
for (table_id, stats) in &table_stats_change {
let table_id_str = table_id.to_string();
let stats_value =
std::cmp::max(0, stats.total_key_size + stats.total_value_size) / 1024 / 1024;
self.metrics
.table_write_throughput
.with_label_values(&[table_id_str.as_str()])
.inc_by(stats_value as u64);
if purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version) {
self.metrics.version_stats.reset();
versioning.local_metrics.clear();
}

let commit_etcd_timer = self
.metrics
.hummock_manager_latency
.with_label_values(&["commit_etcd"])
.start_timer();
trigger_local_table_stat(
&self.metrics,
&mut versioning.local_metrics,
&version_stats,
&table_stats_change,
);
commit_multi_var!(
self,
None,
Expand All @@ -1649,11 +1659,7 @@ impl HummockManager {
assert!(prev_snapshot.committed_epoch < epoch);
assert!(prev_snapshot.current_epoch < epoch);

trigger_version_stat(
&self.metrics,
&versioning.current_version,
&versioning.version_stats,
);
trigger_version_stat(&self.metrics, &versioning.current_version);
for compaction_group_id in &modified_compaction_groups {
trigger_sst_stat(
&self.metrics,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
..Default::default()
},
],
None
None,
)
.await
.unwrap());
Expand Down Expand Up @@ -2002,7 +2002,7 @@ async fn test_move_tables_between_compaction_group() {
gen_sstable_info(12, 2, vec![100, 101]),
gen_sstable_info(13, 3, vec![101, 102]),
],
None
None,
)
.await
.unwrap());
Expand Down
5 changes: 4 additions & 1 deletion src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
use crate::hummock::manager::{commit_multi_var, read_lock, write_lock};
use crate::hummock::metrics_utils::{trigger_safepoint_stat, trigger_write_stop_stats};
use crate::hummock::metrics_utils::{
trigger_safepoint_stat, trigger_write_stop_stats, LocalTableMetrics,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::HummockManager;
use crate::model::{ValTransaction, VarTransaction};
Expand Down Expand Up @@ -97,6 +99,7 @@ pub struct Versioning {
/// Stats for latest hummock version.
pub version_stats: HummockVersionStats,
pub checkpoint: HummockVersionCheckpoint,
pub local_metrics: HashMap<u32, LocalTableMetrics>,
}

impl Versioning {
Expand Down
64 changes: 42 additions & 22 deletions src/meta/src/hummock/metrics_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use itertools::{enumerate, Itertools};
use prometheus::IntGauge;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, BranchedSstInfo,
};
use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
Expand All @@ -37,11 +39,49 @@ use crate::hummock::checkpoint::HummockVersionCheckpoint;
use crate::hummock::compaction::CompactStatus;
use crate::rpc::metrics::MetaMetrics;

pub fn trigger_version_stat(
pub struct LocalTableMetrics {
total_key_count: IntGauge,
total_key_size: IntGauge,
total_value_size: IntGauge,
}

pub fn trigger_local_table_stat(
metrics: &MetaMetrics,
current_version: &HummockVersion,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
version_stats: &HummockVersionStats,
table_stats_change: &PbTableStatsMap,
) {
for (table_id, stats) in table_stats_change {
if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
continue;
}
let table_metrics = local_metrics.entry(*table_id).or_insert_with(|| {
let table_label = format!("{}", table_id);
LocalTableMetrics {
total_key_count: metrics
.version_stats
.with_label_values(&[&table_label, "total_key_count"]),
total_key_size: metrics
.version_stats
.with_label_values(&[&table_label, "total_key_size"]),
total_value_size: metrics
.version_stats
.with_label_values(&[&table_label, "total_value_size"]),
}
});
if let Some(table_stats) = version_stats.table_stats.get(table_id) {
table_metrics
.total_key_count
.set(table_stats.total_key_count);
table_metrics
.total_value_size
.set(table_stats.total_value_size);
table_metrics.total_key_size.set(table_stats.total_key_size);
}
}
}

pub fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
metrics
.max_committed_epoch
.set(current_version.max_committed_epoch as i64);
Expand All @@ -50,26 +90,6 @@ pub fn trigger_version_stat(
.set(current_version.estimated_encode_len() as i64);
metrics.safe_epoch.set(current_version.safe_epoch as i64);
metrics.current_version_id.set(current_version.id as i64);
metrics.version_stats.reset();
metrics
.version_stats
.with_label_values(&["total", ""])
.set(version_stats.table_stats.len() as i64);
// for (table_id, stats) in &version_stats.table_stats {
// let table_id = format!("{}", table_id);
// metrics
// .version_stats
// .with_label_values(&[&table_id, "total_key_count"])
// .set(stats.total_key_count);
// metrics
// .version_stats
// .with_label_values(&[&table_id, "total_key_size"])
// .set(stats.total_key_size);
// metrics
// .version_stats
// .with_label_values(&[&table_id, "total_value_size"])
// .set(stats.total_value_size);
// }
}

pub fn trigger_mv_stat(
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ impl HummockMetaClient for MockHummockMetaClient {
let hummock_manager_compact = self.hummock_manager.clone();
let report_handle = tokio::spawn(async move {
tracing::info!("report_handle start");

loop {
if let Some(item) = request_receiver.recv().await {
if let Event::ReportTask(ReportTask {
Expand Down
12 changes: 5 additions & 7 deletions src/storage/hummock_sdk/src/table_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,12 @@ pub fn from_prost_table_stats_map(
pub fn purge_prost_table_stats(
table_stats: &mut PbTableStatsMap,
hummock_version: &HummockVersion,
) {
) -> bool {
let mut all_tables_in_version: HashSet<u32> = HashSet::default();
for group in hummock_version.levels.keys() {
hummock_version.level_iter(*group, |level| {
all_tables_in_version
.extend(level.table_infos.iter().flat_map(|s| s.table_ids.clone()));
true
})
let prev_count = table_stats.len();
for group in hummock_version.levels.values() {
all_tables_in_version.extend(group.member_table_ids.clone());
}
table_stats.retain(|k, _| all_tables_in_version.contains(k));
prev_count != table_stats.len()
}

0 comments on commit 9d50acb

Please sign in to comment.