diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index f0cacf863fcc9..1094558c93305 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -655,7 +655,7 @@ impl SstableStore { _ => (), } stats.cache_meta_block_total += 1; - lookup_response.verbose_instrument_await("sstable") + lookup_response } pub async fn list_object_metadata_from_object_store( diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 1be6893144a27..80e0e94130d10 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -18,8 +18,8 @@ use std::collections::HashSet; use std::iter::once; use std::ops::Bound::Included; use std::sync::Arc; +use std::time::Instant; -use await_tree::InstrumentAwait; use bytes::Bytes; use itertools::Itertools; use parking_lot::RwLock; @@ -38,7 +38,6 @@ use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::{LevelType, SstableInfo}; use sync_point::sync_point; -use tracing::Instrument; use super::StagingDataIterator; use crate::error::StorageResult; @@ -777,8 +776,6 @@ impl HummockVersionReader { read_version_tuple: ReadVersionTuple, mem_table: Option>, ) -> StorageResult>> { - let table_id_string = read_options.table_id.to_string(); - let table_id_label = table_id_string.as_str(); let (imms, uncommitted_ssts, committed) = read_version_tuple; let mut local_stats = StoreLocalStatistic::default(); @@ -814,7 +811,6 @@ impl HummockVersionReader { let table_holder = self .sstable_store .sstable(sstable_info, &mut local_stats) - .instrument(tracing::trace_span!("get_sstable")) .await?; if !table_holder.meta.monotonic_tombstone_events.is_empty() @@ -846,11 +842,7 @@ impl HummockVersionReader { let mut non_overlapping_iters = Vec::new(); let mut overlapping_iters = Vec::new(); - let timer = self - .state_store_metrics - .iter_fetch_meta_duration - .with_label_values(&[table_id_label]) - .start_timer(); + let timer = Instant::now(); for level in committed.levels(read_options.table_id) { if level.table_infos.is_empty() { @@ -893,7 +885,6 @@ impl HummockVersionReader { let sstable = self .sstable_store .sstable(&sstables[0], &mut local_stats) - .instrument(tracing::trace_span!("get_sstable")) .await?; if !sstable.meta.monotonic_tombstone_events.is_empty() && !read_options.ignore_range_tombstone @@ -938,7 +929,6 @@ impl HummockVersionReader { let sstable = self .sstable_store .sstable(sstable_info, &mut local_stats) - .instrument(tracing::trace_span!("get_sstable")) .await?; assert_eq!(sstable_info.get_object_id(), sstable.id); if !sstable.meta.monotonic_tombstone_events.is_empty() @@ -966,8 +956,9 @@ impl HummockVersionReader { } } } - let fetch_meta_duration_sec = timer.stop_and_record(); + let fetch_meta_duration_sec = timer.elapsed().as_secs_f64(); if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND { + let table_id_string = read_options.table_id.to_string(); tracing::warn!("Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.", table_id_string, epoch, fetch_meta_duration_sec, local_stats.cache_meta_block_miss); self.state_store_metrics @@ -1006,10 +997,7 @@ impl HummockVersionReader { Some(committed), delete_range_iter, ); - user_iter - .rewind() - .verbose_instrument_await("rewind") - .await?; + user_iter.rewind().await?; local_stats.found_key = user_iter.is_valid(); local_stats.sub_iter_count = local_stats.staging_imm_iter_count + local_stats.staging_sst_iter_count diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index 009cd7308df9e..15f58cb57feb3 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -12,15 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::OnceLock; +use std::cell::RefCell; +use std::collections::HashMap; +use std::sync::{Arc, OnceLock}; +use std::time::{Duration, Instant}; +use prometheus::core::{AtomicU64, GenericLocalCounter}; +use prometheus::local::LocalHistogram; use prometheus::{ exponential_buckets, histogram_opts, linear_buckets, register_histogram_vec_with_registry, register_histogram_with_registry, register_int_counter_vec_with_registry, Histogram, Registry, }; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::{ - RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledHistogramVec, + LabelGuardedMetric, RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledHistogramVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::register_guarded_histogram_vec_with_registry; @@ -218,4 +223,165 @@ impl MonitoredStorageMetrics { pub fn unused() -> Self { global_storage_metrics(MetricLevel::Disabled) } + + pub fn local_metrics(&self, table_label: &str) -> LocalStorageMetrics { + let iter_init_duration = self + .iter_init_duration + .with_label_values(&[table_label]) + .local(); + let iter_in_process_counts = self + .iter_in_process_counts + .with_label_values(&[table_label]) + .local(); + let iter_scan_duration = self + .iter_scan_duration + .with_label_values(&[table_label]) + .local(); + let iter_item = self.iter_item.with_label_values(&[table_label]).local(); + let iter_size = self.iter_size.with_label_values(&[table_label]).local(); + + let get_duration = self.get_duration.with_label_values(&[table_label]).local(); + let get_key_size = self.get_key_size.with_label_values(&[table_label]).local(); + let get_value_size = self + .get_value_size + .with_label_values(&[table_label]) + .local(); + + LocalStorageMetrics { + iter_init_duration, + iter_scan_duration, + iter_in_process_counts, + iter_item, + iter_size, + get_duration, + get_key_size, + get_value_size, + report_count: 0, + } + } +} + +pub struct LocalStorageMetrics { + iter_init_duration: LabelGuardedMetric, + iter_scan_duration: LabelGuardedMetric, + iter_in_process_counts: GenericLocalCounter, + iter_item: LocalHistogram, + iter_size: LocalHistogram, + + get_duration: LabelGuardedMetric, + get_key_size: LocalHistogram, + get_value_size: LocalHistogram, + report_count: usize, +} + +impl LocalStorageMetrics { + pub fn may_flush(&mut self) { + self.report_count += 1; + if self.report_count > MAX_FLUSH_TIMES { + self.iter_scan_duration.flush(); + self.iter_init_duration.flush(); + self.iter_in_process_counts.flush(); + self.iter_item.flush(); + self.iter_size.flush(); + self.get_duration.flush(); + self.get_key_size.flush(); + self.get_value_size.flush(); + self.report_count = 0; + } + } +} + +pub struct MonitoredStateStoreIterStats { + pub iter_init_duration: Duration, + pub iter_scan_time: Instant, + pub total_items: usize, + pub total_size: usize, + pub table_id: u32, + pub metrics: Arc, +} + +thread_local!(static LOCAL_METRICS: RefCell> = RefCell::new(HashMap::default())); +const MAX_FLUSH_TIMES: usize = 64; + +impl MonitoredStateStoreIterStats { + pub fn new( + table_id: u32, + iter_init_duration: Duration, + metrics: Arc, + ) -> Self { + Self { + iter_init_duration, + iter_scan_time: Instant::now(), + total_items: 0, + total_size: 0, + table_id, + metrics, + } + } + + pub fn report(&self) { + LOCAL_METRICS.with_borrow_mut(|local_metrics| { + let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| { + let table_label = self.table_id.to_string(); + self.metrics.local_metrics(&table_label) + }); + let iter_scan_duration = self.iter_scan_time.elapsed(); + table_metrics + .iter_scan_duration + .observe(iter_scan_duration.as_secs_f64()); + table_metrics + .iter_init_duration + .observe(self.iter_init_duration.as_secs_f64()); + table_metrics.iter_in_process_counts.inc(); + table_metrics.iter_item.observe(self.total_items as f64); + table_metrics.iter_size.observe(self.total_size as f64); + table_metrics.may_flush(); + }); + } +} + +impl Drop for MonitoredStateStoreIterStats { + fn drop(&mut self) { + self.report(); + } +} + +pub struct MonitoredStateStoreGetStats { + pub get_duration: Instant, + pub get_key_size: usize, + pub get_value_size: usize, + pub table_id: u32, + pub metrics: Arc, +} + +impl MonitoredStateStoreGetStats { + pub fn new(table_id: u32, metrics: Arc) -> Self { + Self { + get_duration: Instant::now(), + get_key_size: 0, + get_value_size: 0, + table_id, + metrics, + } + } + + pub fn report(&self) { + LOCAL_METRICS.with_borrow_mut(|local_metrics| { + let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| { + let table_label = self.table_id.to_string(); + self.metrics.local_metrics(&table_label) + }); + let get_duration = self.get_duration.elapsed(); + table_metrics + .get_duration + .observe(get_duration.as_secs_f64()); + table_metrics.get_key_size.observe(self.get_key_size as _); + if self.get_value_size > 0 { + table_metrics + .get_value_size + .observe(self.get_value_size as _); + } + table_metrics.may_flush(); + }); + } } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index e1110df630595..239f2ce9df7a0 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -28,7 +28,7 @@ use tracing::error; #[cfg(all(not(madsim), feature = "hm-trace"))] use super::traced_store::TracedStateStore; -use super::MonitoredStorageMetrics; +use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics}; use crate::error::{StorageError, StorageResult}; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{HummockStorage, SstableObjectIdManagerRef}; @@ -93,35 +93,21 @@ impl MonitoredStateStore { iter_stream_future: impl Future> + 'a, ) -> StorageResult> { // start time takes iterator build time into account - let start_time = Instant::now(); - let table_id_label = table_id.to_string(); - // wait for iterator creation (e.g. seek) + let start_time = Instant::now(); let iter_stream = iter_stream_future - .verbose_instrument_await("store_create_iter") .await .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?; - - self.storage_metrics - .iter_init_duration - .with_label_values(&[table_id_label.as_str()]) - .observe(start_time.elapsed().as_secs_f64()); - // statistics of iter in process count to estimate the read ops in the same time - self.storage_metrics - .iter_in_process_counts - .with_label_values(&[table_id_label.as_str()]) - .inc(); + let iter_init_duration = start_time.elapsed(); // create a monitored iterator to collect metrics let monitored = MonitoredStateStoreIter { inner: iter_stream, - stats: MonitoredStateStoreIterStats { - total_items: 0, - total_size: 0, - scan_time: Instant::now(), - storage_metrics: self.storage_metrics.clone(), - table_id, - }, + stats: MonitoredStateStoreIterStats::new( + table_id.table_id, + iter_init_duration, + self.storage_metrics.clone(), + ), }; Ok(monitored.into_stream()) } @@ -143,12 +129,8 @@ impl MonitoredStateStore { ) -> StorageResult> { use tracing::Instrument; - let table_id_label = table_id.to_string(); - let timer = self - .storage_metrics - .get_duration - .with_label_values(&[table_id_label.as_str()]) - .start_timer(); + let mut stats = + MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone()); let value = get_future .verbose_instrument_await("store_get") @@ -156,18 +138,11 @@ impl MonitoredStateStore { .await .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?; - timer.observe_duration(); - - self.storage_metrics - .get_key_size - .with_label_values(&[table_id_label.as_str()]) - .observe(key_len as _); + stats.get_key_size = key_len; if let Some(value) = value.as_ref() { - self.storage_metrics - .get_value_size - .with_label_values(&[table_id_label.as_str()]) - .observe(value.len() as _); + stats.get_value_size = value.len(); } + stats.report(); Ok(value) } @@ -373,15 +348,6 @@ pub struct MonitoredStateStoreIter { stats: MonitoredStateStoreIterStats, } -struct MonitoredStateStoreIterStats { - total_items: usize, - total_size: usize, - scan_time: Instant, - storage_metrics: Arc, - - table_id: TableId, -} - impl MonitoredStateStoreIter { #[try_stream(ok = StateStoreIterItem, error = StorageError)] async fn into_stream_inner(self) { @@ -402,27 +368,6 @@ impl MonitoredStateStoreIter { } fn into_stream(self) -> MonitoredStateStoreIterStream { - use risingwave_common::util::tracing::InstrumentStream; - - Self::into_stream_inner(self).instrument(tracing::trace_span!("store_iter")) - } -} - -impl Drop for MonitoredStateStoreIterStats { - fn drop(&mut self) { - let table_id_label = self.table_id.to_string(); - - self.storage_metrics - .iter_scan_duration - .with_label_values(&[table_id_label.as_str()]) - .observe(self.scan_time.elapsed().as_secs_f64()); - self.storage_metrics - .iter_item - .with_label_values(&[table_id_label.as_str()]) - .observe(self.total_items as f64); - self.storage_metrics - .iter_size - .with_label_values(&[table_id_label.as_str()]) - .observe(self.total_size as f64); + Self::into_stream_inner(self) } }