Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): optimize table stats report performance #15401

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ impl CompactorManager {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::Duration;

use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
Expand Down Expand Up @@ -499,6 +500,7 @@ mod tests {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
&mut HashMap::default(),
)
.await
.unwrap_or(false)
Expand Down
64 changes: 39 additions & 25 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ use crate::hummock::compaction::selector::{
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
use crate::hummock::error::{Error, Result};
use crate::hummock::metrics_utils::{
build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_lsm_stat,
trigger_mv_stat, trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state,
trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
build_compact_task_level_type_metrics_label, get_local_table_stats, trigger_delta_log_stats,
trigger_local_table_stat, trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state,
trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_table_stat,
trigger_version_stat, trigger_write_stop_stats, LocalTableMetrics,
};
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
Expand Down Expand Up @@ -856,6 +857,7 @@ impl HummockManager {
&self,
compaction_group_id: CompactionGroupId,
selector: &mut Box<dyn CompactionSelector>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<Option<CompactTask>> {
// TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the
// lock in compaction_guard, take out all table_options in advance there may be a
Expand Down Expand Up @@ -1017,6 +1019,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
local_metrics,
)
.await?;
tracing::debug!(
Expand All @@ -1040,6 +1043,7 @@ impl HummockManager {
compact_task.input_ssts[0].table_infos.clone(),
&mut compaction_guard,
None,
local_metrics,
)
.await?;

Expand Down Expand Up @@ -1181,6 +1185,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
&mut HashMap::default(),
)
.await?;
#[cfg(test)]
Expand Down Expand Up @@ -1222,13 +1227,14 @@ impl HummockManager {
&self,
compaction_group_id: CompactionGroupId,
selector: &mut Box<dyn CompactionSelector>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<Option<CompactTask>> {
fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
anyhow::anyhow!("failpoint metastore error")
)));

while let Some(task) = self
.get_compact_task_impl(compaction_group_id, selector)
.get_compact_task_impl(compaction_group_id, selector, local_metrics)
.await?
{
if let TaskStatus::Pending = task.task_status() {
Expand All @@ -1250,7 +1256,7 @@ impl HummockManager {
) -> Result<Option<CompactTask>> {
let mut selector: Box<dyn CompactionSelector> =
Box::new(ManualCompactionSelector::new(manual_compaction_option));
self.get_compact_task(compaction_group_id, &mut selector)
self.get_compact_task(compaction_group_id, &mut selector, &mut HashMap::default())
.await
}

Expand Down Expand Up @@ -1280,6 +1286,7 @@ impl HummockManager {
task_status: TaskStatus,
sorted_output_ssts: Vec<SstableInfo>,
table_stats_change: Option<PbTableStatsMap>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<bool> {
let mut guard = write_lock!(self, compaction).await;
self.report_compact_task_impl(
Expand All @@ -1289,6 +1296,7 @@ impl HummockManager {
sorted_output_ssts,
&mut guard,
table_stats_change,
local_metrics,
)
.await
}
Expand All @@ -1309,6 +1317,7 @@ impl HummockManager {
sorted_output_ssts: Vec<SstableInfo>,
compaction_guard: &mut RwLockWriteGuard<'_, Compaction>,
table_stats_change: Option<PbTableStatsMap>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<bool> {
let deterministic_mode = self.env.opts.compaction_deterministic_test;
let compaction = compaction_guard.deref_mut();
Expand Down Expand Up @@ -1430,13 +1439,23 @@ impl HummockManager {
VarTransactionWrapper,
VarTransaction::new(&mut versioning.version_stats,)
);
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
}

// apply version delta before we persist this change. If it causes panic we can
// recover to a correct state after restarting meta-node.
current_version.apply_version_delta(&version_delta);
if purge_prost_table_stats(&mut version_stats.table_stats, &current_version) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this metric is just an approximation, so we don't need to update it every time we report a compact task. We can reduce the overhead by reporting it less frequently. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR only reports the tables that have updates.

self.metrics.version_stats.reset();
local_metrics.clear();
}
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
trigger_local_table_stat(
&self.metrics,
local_metrics,
&version_stats,
table_stats_change,
);
}
commit_multi_var!(
self.env.meta_store(),
self.sql_meta_store(),
Expand All @@ -1447,7 +1466,7 @@ impl HummockManager {
)?;
branched_ssts.commit_memory();

trigger_version_stat(&self.metrics, &current_version, &versioning.version_stats);
trigger_version_stat(&self.metrics, &current_version);
trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
self.notify_stats(&versioning.version_stats);
versioning.current_version = current_version;
Expand Down Expand Up @@ -1708,16 +1727,7 @@ impl HummockManager {
VarTransaction::new(&mut versioning.version_stats)
);
add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version);
for (table_id, stats) in &table_stats_change {
let table_id_str = table_id.to_string();
let stats_value =
std::cmp::max(0, stats.total_key_size + stats.total_value_size) / 1024 / 1024;
self.metrics
.table_write_throughput
.with_label_values(&[table_id_str.as_str()])
.inc_by(stats_value as u64);
}
trigger_table_stat(&self.metrics, &version_stats, &table_stats_change);
commit_multi_var!(
self.env.meta_store(),
self.sql_meta_store(),
Expand All @@ -1735,11 +1745,7 @@ impl HummockManager {
assert!(prev_snapshot.committed_epoch < epoch);
assert!(prev_snapshot.current_epoch < epoch);

trigger_version_stat(
&self.metrics,
&versioning.current_version,
&versioning.version_stats,
);
trigger_version_stat(&self.metrics, &versioning.current_version);
for compaction_group_id in &modified_compaction_groups {
trigger_sst_stat(
&self.metrics,
Expand Down Expand Up @@ -2143,6 +2149,7 @@ impl HummockManager {
sorted_output_ssts,
&mut guard,
table_stats_change,
&mut HashMap::default(),
)
.await
}
Expand Down Expand Up @@ -3090,6 +3097,11 @@ impl HummockManager {
Ok(())
}

/// Builds the initial `LocalTableMetrics` map from the currently stored version stats.
///
/// Awaits `self.versioning.read(..)` (tagged with this function's name) and passes the
/// resulting `version_stats` together with the manager's metrics to
/// `get_local_table_stats`.
///
/// NOTE(review): the `u32` keys are presumably table ids — confirm against
/// `get_local_table_stats`.
async fn local_metrics(&self) -> HashMap<u32, LocalTableMetrics> {
    // Keep the guard bound to a local so it stays alive while the stats are read.
    let versioning_guard = self.versioning.read(&["local_metrics"]).await;
    let stats = &versioning_guard.version_stats;
    get_local_table_stats(self.metrics.as_ref(), stats)
}

/// dedicated event runtime for CPU/IO bound event
#[named]
async fn compact_task_dedicated_event_handler(
Expand All @@ -3098,6 +3110,7 @@ impl HummockManager {
shutdown_rx_shared: Shared<OneShotReceiver<()>>,
) {
let mut compaction_selectors = init_selectors();
let mut local_metrics = hummock_manager.local_metrics().await;

tokio::select! {
_ = shutdown_rx_shared => {}
Expand Down Expand Up @@ -3146,7 +3159,7 @@ impl HummockManager {
};
for _ in 0..pull_task_count {
let compact_task =
hummock_manager.get_compact_task(group, selector).await;
hummock_manager.get_compact_task(group, selector, &mut local_metrics).await;

match compact_task {
Ok(Some(compact_task)) => {
Expand Down Expand Up @@ -3206,6 +3219,7 @@ impl HummockManager {
TaskStatus::try_from(task_status).unwrap(),
sorted_output_ssts,
Some(table_stats_change),
&mut local_metrics,
)
.await
{
Expand Down
Loading
Loading