Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): optimize table stats report performance #15401

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
&mut HashMap::default(),
)
.await
.unwrap_or(false)
Expand Down
50 changes: 34 additions & 16 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,7 @@ use crate::hummock::compaction::selector::{
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
use crate::hummock::error::{Error, Result};
use crate::hummock::metrics_utils::{
build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_lsm_stat,
trigger_mv_stat, trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state,
trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
};
use crate::hummock::metrics_utils::{build_compact_task_level_type_metrics_label, get_local_table_stats, LocalTableMetrics, trigger_delta_log_stats, trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_table_stat, trigger_version_stat, trigger_write_stop_stats};
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
#[cfg(any(test, feature = "test"))]
Expand Down Expand Up @@ -189,9 +185,7 @@ macro_rules! read_lock {
}
pub(crate) use read_lock;
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
};
use risingwave_hummock_sdk::table_stats::{add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap};
use risingwave_object_store::object::{build_remote_object_store, ObjectError, ObjectStoreRef};
use risingwave_pb::catalog::Table;
use risingwave_pb::hummock::level_handler::RunningCompactTask;
Expand Down Expand Up @@ -856,6 +850,7 @@ impl HummockManager {
&self,
compaction_group_id: CompactionGroupId,
selector: &mut Box<dyn CompactionSelector>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<Option<CompactTask>> {
// TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the
// lock in compaction_guard, take out all table_options in advance there may be a
Expand Down Expand Up @@ -1017,6 +1012,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
local_metrics,
)
.await?;
tracing::debug!(
Expand All @@ -1040,6 +1036,7 @@ impl HummockManager {
compact_task.input_ssts[0].table_infos.clone(),
&mut compaction_guard,
None,
local_metrics,
)
.await?;

Expand Down Expand Up @@ -1181,6 +1178,7 @@ impl HummockManager {
vec![],
&mut compaction_guard,
None,
&mut HashMap::default(),
)
.await?;
#[cfg(test)]
Expand Down Expand Up @@ -1222,13 +1220,14 @@ impl HummockManager {
&self,
compaction_group_id: CompactionGroupId,
selector: &mut Box<dyn CompactionSelector>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<Option<CompactTask>> {
fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
anyhow::anyhow!("failpoint metastore error")
)));

while let Some(task) = self
.get_compact_task_impl(compaction_group_id, selector)
.get_compact_task_impl(compaction_group_id, selector, local_metrics)
.await?
{
if let TaskStatus::Pending = task.task_status() {
Expand All @@ -1250,7 +1249,7 @@ impl HummockManager {
) -> Result<Option<CompactTask>> {
let mut selector: Box<dyn CompactionSelector> =
Box::new(ManualCompactionSelector::new(manual_compaction_option));
self.get_compact_task(compaction_group_id, &mut selector)
self.get_compact_task(compaction_group_id, &mut selector, &mut HashMap::default())
.await
}

Expand Down Expand Up @@ -1280,6 +1279,7 @@ impl HummockManager {
task_status: TaskStatus,
sorted_output_ssts: Vec<SstableInfo>,
table_stats_change: Option<PbTableStatsMap>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<bool> {
let mut guard = write_lock!(self, compaction).await;
self.report_compact_task_impl(
Expand All @@ -1289,6 +1289,7 @@ impl HummockManager {
sorted_output_ssts,
&mut guard,
table_stats_change,
local_metrics,
)
.await
}
Expand All @@ -1309,6 +1310,7 @@ impl HummockManager {
sorted_output_ssts: Vec<SstableInfo>,
compaction_guard: &mut RwLockWriteGuard<'_, Compaction>,
table_stats_change: Option<PbTableStatsMap>,
local_metrics: &mut HashMap<u32, LocalTableMetrics>,
) -> Result<bool> {
let deterministic_mode = self.env.opts.compaction_deterministic_test;
let compaction = compaction_guard.deref_mut();
Expand Down Expand Up @@ -1430,13 +1432,18 @@ impl HummockManager {
VarTransactionWrapper,
VarTransaction::new(&mut versioning.version_stats,)
);
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
}

// apply version delta before we persist this change. If it causes panic we can
// recover to a correct state after restarting meta-node.
current_version.apply_version_delta(&version_delta);
if purge_prost_table_stats(&mut version_stats.table_stats, &current_version) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this metric is just an approximation, so we don't need to update it every time we report a compact task. We could reduce the overhead by reporting it less frequently. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR only reports the tables that have been updated.

self.metrics.version_stats.reset();
local_metrics.clear();
}
if let Some(table_stats_change) = &table_stats_change {
add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
trigger_table_stat(&self.metrics, local_metrics, &version_stats, table_stats_change);
}
commit_multi_var!(
self.env.meta_store(),
self.sql_meta_store(),
Expand All @@ -1447,7 +1454,11 @@ impl HummockManager {
)?;
branched_ssts.commit_memory();

trigger_version_stat(&self.metrics, &current_version, &versioning.version_stats);
self.metrics
.version_size
.set(current_version.estimated_encode_len() as i64);
self.metrics.safe_epoch.set(current_version.safe_epoch as i64);
self.metrics.current_version_id.set(current_version.id as i64);
trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
self.notify_stats(&versioning.version_stats);
versioning.current_version = current_version;
Expand Down Expand Up @@ -1708,7 +1719,6 @@ impl HummockManager {
VarTransaction::new(&mut versioning.version_stats)
);
add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
purge_prost_table_stats(&mut version_stats.table_stats, &new_hummock_version);
for (table_id, stats) in &table_stats_change {
let table_id_str = table_id.to_string();
let stats_value =
Expand Down Expand Up @@ -2143,6 +2153,7 @@ impl HummockManager {
sorted_output_ssts,
&mut guard,
table_stats_change,
&mut HashMap::default(),
)
.await
}
Expand Down Expand Up @@ -3090,6 +3101,11 @@ impl HummockManager {
Ok(())
}

/// Builds the per-table metrics map from the current version stats.
///
/// Acquires read access to `self.versioning` (tagged with the
/// `"local_metrics"` label) and hands its `version_stats`, together with the
/// manager's metrics, to `get_local_table_stats`. The guard stays alive until
/// the map has been produced.
///
/// NOTE(review): the `u32` keys are presumably table ids — confirm against
/// `get_local_table_stats`.
async fn local_metrics(&self) -> HashMap<u32, LocalTableMetrics> {
    let guard = self.versioning.read(&["local_metrics"]).await;
    let stats = &guard.version_stats;
    get_local_table_stats(self.metrics.as_ref(), stats)
}

/// dedicated event runtime for CPU/IO bound event
#[named]
async fn compact_task_dedicated_event_handler(
Expand All @@ -3098,6 +3114,7 @@ impl HummockManager {
shutdown_rx_shared: Shared<OneShotReceiver<()>>,
) {
let mut compaction_selectors = init_selectors();
let mut local_metrics = hummock_manager.local_metrics().await;

tokio::select! {
_ = shutdown_rx_shared => {}
Expand Down Expand Up @@ -3146,7 +3163,7 @@ impl HummockManager {
};
for _ in 0..pull_task_count {
let compact_task =
hummock_manager.get_compact_task(group, selector).await;
hummock_manager.get_compact_task(group, selector, &mut local_metrics).await;

match compact_task {
Ok(Some(compact_task)) => {
Expand Down Expand Up @@ -3206,6 +3223,7 @@ impl HummockManager {
TaskStatus::try_from(task_status).unwrap(),
sorted_output_ssts,
Some(table_stats_change),
&mut local_metrics,
)
.await
{
Expand Down
25 changes: 21 additions & 4 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ async fn test_hummock_compaction_task() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -232,6 +233,7 @@ async fn test_hummock_compaction_task() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand All @@ -257,6 +259,7 @@ async fn test_hummock_compaction_task() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -756,6 +759,7 @@ async fn test_print_compact_task() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -906,6 +910,7 @@ async fn test_hummock_compaction_task_heartbeat() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -937,6 +942,7 @@ async fn test_hummock_compaction_task_heartbeat() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -979,6 +985,7 @@ async fn test_hummock_compaction_task_heartbeat() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -1026,6 +1033,7 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -1057,6 +1065,7 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -1223,6 +1232,7 @@ async fn test_version_stats() {
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
Expand Down Expand Up @@ -1671,7 +1681,8 @@ async fn test_split_compaction_group_trivial_expired() {
.await
.unwrap();
let task = hummock_manager
.get_compact_task(2, &mut default_compaction_selector())
.get_compact_task(2, &mut default_compaction_selector(),
&mut HashMap::default())
.await
.unwrap()
.unwrap();
Expand All @@ -1683,7 +1694,9 @@ async fn test_split_compaction_group_trivial_expired() {
let mut selector: Box<dyn CompactionSelector> =
Box::<SpaceReclaimCompactionSelector>::default();
let reclaim_task = hummock_manager
.get_compact_task_impl(2, &mut selector)
.get_compact_task_impl(2, &mut selector,
&mut HashMap::default(),
)
.await
.unwrap()
.unwrap();
Expand All @@ -1707,7 +1720,9 @@ async fn test_split_compaction_group_trivial_expired() {
);

let task2 = hummock_manager
.get_compact_task(new_group_id, &mut default_compaction_selector())
.get_compact_task(new_group_id, &mut default_compaction_selector(),
&mut HashMap::default(),
)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -2084,7 +2099,9 @@ async fn test_move_tables_between_compaction_group() {
Box::<SpaceReclaimCompactionSelector>::default();

let compaction_task = hummock_manager
.get_compact_task(2, &mut selector)
.get_compact_task(2, &mut selector,
&mut HashMap::default(),
)
.await
.unwrap()
.unwrap();
Expand Down
Loading
Loading