diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index 6ac7e41b45959..3520d750bbc5e 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -201,12 +201,15 @@ impl GenericExchangeExec // create the collector let source_id = source.get_task_id(); let counter = metrics.as_ref().map(|metrics| { - metrics.create_collector_for_exchange_recv_row_number(vec![ - identity, - source_id.query_id, - source_id.stage_id.to_string(), - source_id.task_id.to_string(), - ]) + metrics + .executor_metrics() + .exchange_recv_row_number + .with_label_values(&[ + source_id.query_id.as_str(), + format!("{}", source_id.stage_id).as_str(), + format!("{}", source_id.task_id).as_str(), + identity.as_str(), + ]) }); loop { diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 3c46682c139de..cfbc311523fb3 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::ops::{Bound, RangeBounds}; +use std::process::id; use std::sync::Arc; use futures::{pin_mut, StreamExt}; @@ -310,9 +311,12 @@ impl RowSeqScanExecutor { let table = Arc::new(table); // Create collector. - let histogram = metrics - .as_ref() - .map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity])); + let histogram = metrics.as_ref().map(|metrics| { + metrics + .executor_metrics() + .row_seq_scan_next_duration + .with_label_values(&metrics.executor_labels(&identity)) + }); if ordered { // Currently we execute range-scans concurrently so the order is not guaranteed if diff --git a/src/batch/src/monitor/stats.rs b/src/batch/src/monitor/stats.rs index b0d58b32ecec0..8f5df61e28112 100644 --- a/src/batch/src/monitor/stats.rs +++ b/src/batch/src/monitor/stats.rs @@ -25,7 +25,9 @@ use prometheus::{ exponential_buckets, opts, proto, GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; -use risingwave_common::metrics::TrAdderGauge; +use risingwave_common::metrics::{ + LabelGuardedGaugeVec, LabelGuardedIntGaugeVec, TrAdderGauge, LabelGuardedIntCounterVec, LabelGuardedHistogramVec, +}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use crate::task::TaskId; @@ -33,13 +35,13 @@ use crate::task::TaskId; macro_rules! for_all_task_metrics { ($macro:ident) => { $macro! { - { task_first_poll_delay, GenericGaugeVec }, - { task_fast_poll_duration, GenericGaugeVec }, - { task_idle_duration, GenericGaugeVec }, - { task_poll_duration, GenericGaugeVec }, - { task_scheduled_duration, GenericGaugeVec }, - { task_slow_poll_duration, GenericGaugeVec }, - { task_mem_usage, IntGaugeVec }, + { task_first_poll_delay, LabelGuardedGaugeVec<3> }, + { task_fast_poll_duration, LabelGuardedGaugeVec<3> }, + { task_idle_duration, LabelGuardedGaugeVec<3> }, + { task_poll_duration, LabelGuardedGaugeVec<3> }, + { task_scheduled_duration, LabelGuardedGaugeVec<3> }, + { task_slow_poll_duration, LabelGuardedGaugeVec<3> }, + { task_mem_usage, LabelGuardedIntGaugeVec<3> }, } }; } @@ -48,8 +50,6 @@ macro_rules! def_task_metrics { ($( { $metric:ident, $type:ty }, )*) => { #[derive(Clone)] pub struct BatchTaskMetrics { - descs: Vec, - delete_task: Arc>>, $( pub $metric: $type, )* } }; @@ -64,76 +64,62 @@ impl BatchTaskMetrics { /// The created [`BatchTaskMetrics`] is already registered to the `registry`. fn new(registry: &Registry) -> Self { let task_labels = ["query_id", "stage_id", "task_id"]; - let mut descs = Vec::with_capacity(8); - let task_first_poll_delay = GaugeVec::new(opts!( + let task_first_poll_delay = register_guarded_gauge_vec_with_registry!( "batch_task_first_poll_delay", "The total duration (s) elapsed between the instant tasks are instrumented, and the instant they are first polled.", - ), &task_labels[..]).unwrap(); - descs.extend(task_first_poll_delay.desc().into_iter().cloned()); + &task_labels, + registry).unwrap(); - let task_fast_poll_duration = GaugeVec::new( - opts!( + let task_fast_poll_duration = register_guarded_gauge_vec_with_registry!( "batch_task_fast_poll_duration", "The total duration (s) of fast polls.", - ), - &task_labels[..], + &task_labels, + registry ) .unwrap(); - descs.extend(task_fast_poll_duration.desc().into_iter().cloned()); - let task_idle_duration = GaugeVec::new( - opts!( + let task_idle_duration = register_guarded_gauge_vec_with_registry!( "batch_task_idle_duration", "The total duration (s) that tasks idled.", - ), - &task_labels[..], + &task_labels, + registry ) .unwrap(); - descs.extend(task_idle_duration.desc().into_iter().cloned()); - let task_poll_duration = GaugeVec::new( - opts!( + let task_poll_duration = register_guarded_gauge_vec_with_registry!( "batch_task_poll_duration", "The total duration (s) elapsed during polls.", - ), - &task_labels[..], + &task_labels, + registry, ) .unwrap(); - descs.extend(task_poll_duration.desc().into_iter().cloned()); - let task_scheduled_duration = GaugeVec::new( - opts!( + let task_scheduled_duration = register_guarded_gauge_vec_with_registry!( "batch_task_scheduled_duration", "The total duration (s) that tasks spent waiting to be polled after awakening.", - ), - &task_labels[..], + &task_labels, + registry, ) .unwrap(); - descs.extend(task_scheduled_duration.desc().into_iter().cloned()); - let task_slow_poll_duration = GaugeVec::new( - opts!( + let task_slow_poll_duration = register_guarded_gauge_vec_with_registry!( "batch_task_slow_poll_duration", "The total duration (s) of slow polls.", - ), - &task_labels[..], + &task_labels, + registry, ) .unwrap(); - descs.extend(task_slow_poll_duration.desc().into_iter().cloned()); - let task_mem_usage = IntGaugeVec::new( - opts!( + let task_mem_usage = register_guarded_int_gauge_vec_with_registry!( "batch_task_mem_usage", - "Memory usage of batch tasks in bytes." - ), - &task_labels[..], + "Memory usage of batch tasks in bytes.", + &task_labels, + registry, ) .unwrap(); - descs.extend(task_mem_usage.desc().into_iter().cloned()); let metrics = Self { - descs, task_first_poll_delay, task_fast_poll_duration, task_idle_duration, @@ -141,9 +127,7 @@ impl BatchTaskMetrics { task_scheduled_duration, task_slow_poll_duration, task_mem_usage, - delete_task: Arc::new(Mutex::new(Vec::new())), }; - registry.register(Box::new(metrics.clone())).unwrap(); metrics } @@ -151,136 +135,52 @@ impl BatchTaskMetrics { pub fn for_test() -> Self { GLOBAL_BATCH_TASK_METRICS.clone() } - - fn clean_metrics(&self) { - let delete_task: Vec = { - let mut delete_task = self.delete_task.lock(); - if delete_task.is_empty() { - return; - } - std::mem::take(delete_task.as_mut()) - }; - for id in &delete_task { - let stage_id = id.stage_id.to_string(); - let task_id = id.task_id.to_string(); - let labels = vec![id.query_id.as_str(), stage_id.as_str(), task_id.as_str()]; - - macro_rules! remove { - ($({ $metric:ident, $type:ty},)*) => { - $( - if let Err(err) = self.$metric.remove_label_values(&labels) { - warn!("Failed to remove label values: {:?}", err); - } - )* - }; - } - for_all_task_metrics!(remove); - } - } - - pub fn add_delete_task(&self, id: TaskId) { - self.delete_task.lock().push(id); - } } -impl Collector for BatchTaskMetrics { - fn desc(&self) -> Vec<&Desc> { - self.descs.iter().collect() - } - - fn collect(&self) -> Vec { - let mut mfs = Vec::with_capacity(8); - - macro_rules! collect { - ($({ $metric:ident, $type:ty },)*) => { - $( - mfs.extend(self.$metric.collect()); - )* - }; - } - for_all_task_metrics!(collect); - - // TODO: Every time we execute it involving get the lock, here maybe a bottleneck. - self.clean_metrics(); - mfs - } -} - -macro_rules! for_all_executor_metrics { - ($macro:ident) => { - $macro! { - { exchange_recv_row_number, GenericCounterVec, GenericCounter}, - { row_seq_scan_next_duration, HistogramVec , Histogram}, - { mem_usage, IntGaugeVec, IntGauge }, - } - }; -} - -macro_rules! def_executor_metrics { - ($( { $metric:ident, $type:ty, $_t:ty }, )*) => { #[derive(Clone)] - pub struct BatchExecutorMetrics { - descs: Vec, - delete_task: Arc>>, - register_labels: Arc>>>>, - $( pub $metric: $type, )* + pub struct BatchExecutorMetrics { + pub exchange_recv_row_number: LabelGuardedIntCounterVec<4>, + pub row_seq_scan_next_duration: LabelGuardedHistogramVec<4>, + pub mem_usage: LabelGuardedIntGaugeVec<4>, } - }; -} - -for_all_executor_metrics!(def_executor_metrics); pub static GLOBAL_BATCH_EXECUTOR_METRICS: LazyLock = LazyLock::new(|| BatchExecutorMetrics::new(&GLOBAL_METRICS_REGISTRY)); impl BatchExecutorMetrics { fn new(register: &Registry) -> Self { - let executor_labels = vec!["query_id", "stage_id", "task_id", "executor_id"]; - let mut descs = Vec::with_capacity(2); + let executor_labels = ["query_id", "stage_id", "task_id", "executor_id"]; - let mut custom_labels = executor_labels.clone(); - custom_labels.extend_from_slice(&["source_query_id", "source_stage_id", "source_task_id"]); - let exchange_recv_row_number = IntCounterVec::new( - opts!( + let exchange_recv_row_number = register_guarded_int_counter_vec_with_registry!( "batch_exchange_recv_row_number", "Total number of row that have been received from upstream source", - ), - &custom_labels, + &executor_labels, + register, ) .unwrap(); - descs.extend(exchange_recv_row_number.desc().into_iter().cloned()); - let row_seq_scan_next_duration = HistogramVec::new( - HistogramOpts::new( + let row_seq_scan_next_duration = register_guarded_histogram_vec_with_registry!( "batch_row_seq_scan_next_duration", "Time spent deserializing into a row in cell based table.", - ) - .buckets(exponential_buckets(0.0001, 2.0, 20).unwrap()), &executor_labels, + register, ) .unwrap(); - descs.extend(row_seq_scan_next_duration.desc().into_iter().cloned()); - let mem_usage = IntGaugeVec::new( - opts!( + let mem_usage = register_guarded_int_gauge_vec_with_registry!( "batch_executor_mem_usage", - "Batch executor memory usage in bytes." - ), + "Batch executor memory usage in bytes.", &executor_labels, + register, ) .unwrap(); - descs.extend(mem_usage.desc().into_iter().cloned()); let metrics = Self { - descs, - delete_task: Arc::new(Mutex::new(Vec::new())), exchange_recv_row_number, row_seq_scan_next_duration, mem_usage, - register_labels: Arc::new(Mutex::new(HashMap::new())), }; - register.register(Box::new(metrics.clone())).unwrap(); metrics } @@ -289,50 +189,6 @@ impl BatchExecutorMetrics { GLOBAL_BATCH_EXECUTOR_METRICS.clone() } - fn clean_metrics(&self) { - let delete_task: Vec = { - let mut delete_task = self.delete_task.lock(); - if delete_task.is_empty() { - return; - } - std::mem::take(delete_task.as_mut()) - }; - let delete_labels = { - let mut register_labels = self.register_labels.lock(); - let mut delete_labels = Vec::with_capacity(delete_task.len()); - for id in delete_task { - if let Some(callback) = register_labels.remove(&id) { - delete_labels.push(callback); - } - } - delete_labels - }; - delete_labels - .into_iter() - .for_each(|delete_labels| delete_labels.into_iter().for_each(|callback| callback())); - } - - pub fn add_delete_task(&self, task_id: TaskId) { - self.delete_task.lock().push(task_id); - } -} - -impl Collector for BatchExecutorMetrics { - fn desc(&self) -> Vec<&Desc> { - self.descs.iter().collect() - } - - fn collect(&self) -> Vec { - let mut mfs = Vec::with_capacity(2); - - mfs.extend(self.exchange_recv_row_number.collect()); - mfs.extend(self.row_seq_scan_next_duration.collect()); - mfs.extend(self.mem_usage.collect()); - - self.clean_metrics(); - - mfs - } } pub type BatchMetricsWithTaskLabels = Arc; @@ -344,48 +200,46 @@ pub struct BatchMetricsWithTaskLabelsInner { task_metrics: Arc, executor_metrics: Arc, task_id: TaskId, - task_labels: Vec, + task_labels: [String; 3], } -macro_rules! def_create_executor_collector { - ($( { $metric:ident, $type:ty, $collector_type:ty }, )*) => { - paste! { - $( - pub fn [](&self,executor_label: Vec) -> $collector_type { - let mut owned_task_labels = self.task_labels.clone(); - owned_task_labels.extend(executor_label); - let task_labels = owned_task_labels.iter().map(|s| s.as_str()).collect_vec(); - - let collecter = self - .executor_metrics - .$metric - .with_label_values(&task_labels); - - let metrics = self.executor_metrics.$metric.clone(); - - self.executor_metrics - .register_labels - .lock() - .entry(self.task_id.clone()) - .or_default() - .push(Box::new(move || { - if let Err(err) = metrics.remove_label_values( - &owned_task_labels.iter().map(|s| s.as_str()).collect::>(), - ) { - warn!("Failed to remove label values for executor: {:?}", err); - }; - })); - - collecter - } - )* - } - }; -} +// macro_rules! def_create_executor_collector { +// ($( { $metric:ident, $type:ty, $collector_type:ty }, )*) => { +// paste! { +// $( +// pub fn [](&self,executor_label: Vec) -> $collector_type { +// let mut owned_task_labels = self.task_labels.clone(); +// owned_task_labels.extend(executor_label); +// let task_labels = owned_task_labels.iter().map(|s| s.as_str()).collect_vec(); + +// let collecter = self +// .executor_metrics +// .$metric +// .with_label_values(&task_labels); + +// let metrics = self.executor_metrics.$metric.clone(); + +// self.executor_metrics +// .register_labels +// .lock() +// .entry(self.task_id.clone()) +// .or_default() +// .push(Box::new(move || { +// if let Err(err) = metrics.remove_label_values( +// &owned_task_labels.iter().map(|s| s.as_str()).collect::>(), +// ) { +// warn!("Failed to remove label values for executor: {:?}", err); +// }; +// })); + +// collecter +// } +// )* +// } +// }; +// } impl BatchMetricsWithTaskLabelsInner { - for_all_executor_metrics! {def_create_executor_collector} - pub fn new( task_metrics: Arc, executor_metrics: Arc, @@ -395,7 +249,7 @@ impl BatchMetricsWithTaskLabelsInner { task_metrics, executor_metrics, task_id: id.clone(), - task_labels: vec![id.query_id, id.stage_id.to_string(), id.task_id.to_string()], + task_labels: [id.query_id, id.stage_id.to_string(), id.task_id.to_string()], } } @@ -410,12 +264,13 @@ impl BatchMetricsWithTaskLabelsInner { pub fn get_task_metrics(&self) -> &Arc { &self.task_metrics } -} -impl Drop for BatchMetricsWithTaskLabelsInner { - fn drop(&mut self) { - self.task_metrics.add_delete_task(self.task_id()); - self.executor_metrics.add_delete_task(self.task_id()); + pub fn executor_metrics(&self) -> &BatchExecutorMetrics { + &self.executor_metrics + } + + pub fn executor_labels(&self, executor_id: impl AsRef) -> [&str; 4] { + [self.task_labels[0].as_str(), self.task_labels[1].as_str(), self.task_labels[2].as_str(), executor_id.as_ref()] } } diff --git a/src/common/src/metrics/guarded_metrics.rs b/src/common/src/metrics/guarded_metrics.rs index 7cc166aa5998b..04eb136481f30 100644 --- a/src/common/src/metrics/guarded_metrics.rs +++ b/src/common/src/metrics/guarded_metrics.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::Mutex; use prometheus::core::{ - Atomic, AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, - GenericGaugeVec, MetricVec, MetricVecBuilder, + Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, + GenericGauge, GenericGaugeVec, MetricVec, MetricVecBuilder, }; use prometheus::{Histogram, HistogramVec}; use tracing::warn; @@ -77,6 +77,21 @@ macro_rules! register_guarded_int_gauge_vec_with_registry { }}; } +#[macro_export] +macro_rules! register_guarded_gauge_vec_with_registry { + ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ + let result = prometheus::register_gauge_vec_with_registry!( + prometheus::opts!($NAME, $HELP), + $LABELS_NAMES, + $REGISTRY + ); + result.map(|inner| { + let inner = $crate::metrics::__extract_gauge_builder(inner); + $crate::metrics::LabelGuardedGaugeVec::new(inner, { $LABELS_NAMES }) + }) + }}; +} + #[macro_export] macro_rules! register_guarded_int_counter_vec_with_registry { ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ @@ -101,6 +116,8 @@ pub type LabelGuardedIntCounterVec = LabelGuardedMetricVec, N>; pub type LabelGuardedIntGaugeVec = LabelGuardedMetricVec, N>; +pub type LabelGuardedGaugeVec = + LabelGuardedMetricVec, N>; pub type LabelGuardedHistogram = LabelGuardedMetric; pub type LabelGuardedIntCounter =