diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index 6ac7e41b45959..3520d750bbc5e 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -201,12 +201,15 @@ impl GenericExchangeExec // create the collector let source_id = source.get_task_id(); let counter = metrics.as_ref().map(|metrics| { - metrics.create_collector_for_exchange_recv_row_number(vec![ - identity, - source_id.query_id, - source_id.stage_id.to_string(), - source_id.task_id.to_string(), - ]) + metrics + .executor_metrics() + .exchange_recv_row_number + .with_label_values(&[ + source_id.query_id.as_str(), + format!("{}", source_id.stage_id).as_str(), + format!("{}", source_id.task_id).as_str(), + identity.as_str(), + ]) }); loop { diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index 1b76bb6e6e997..3fb335398d89b 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -240,9 +240,9 @@ impl GroupTopNExecutor { #[cfg(test)] mod tests { use futures::stream::StreamExt; - use prometheus::IntGauge; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; @@ -254,7 +254,7 @@ mod tests { #[tokio::test] async fn test_group_top_n_executor() { - let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap()); + let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let schema = Schema { fields: vec![ @@ -290,7 +290,7 @@ mod tests { ]; let mem_ctx = MemoryContext::new( Some(parent_mem.clone()), - IntGauge::new("memory_usage", " ").unwrap(), + LabelGuardedIntGauge::<4>::test_int_gauge(), ); let top_n_executor = (GroupTopNExecutorBuilder { child: Box::new(mock_executor), diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 03ce86d475620..7cc408485444d 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -308,8 +308,8 @@ mod tests { use std::sync::Arc; use futures_async_stream::for_await; - use prometheus::IntGauge; use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::PbDataType; @@ -323,7 +323,7 @@ mod tests { #[tokio::test] async fn execute_int32_grouped() { - let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap()); + let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let src_exec = Box::new(MockExecutor::with_chunk( DataChunk::from_pretty( @@ -370,7 +370,7 @@ mod tests { let mem_context = MemoryContext::new( Some(parent_mem.clone()), - IntGauge::new("memory_usage", " ").unwrap(), + LabelGuardedIntGauge::<4>::test_int_gauge(), ); let actual_exec = HashAggExecutorBuilder::deserialize( &agg_prost, diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 0f5a0788b23ec..cd9caa54e4b2a 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -1933,12 +1933,12 @@ impl HashJoinExecutor { mod tests { use futures::StreamExt; use futures_async_stream::for_await; - use prometheus::IntGauge; use risingwave_common::array::{ArrayBuilderImpl, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::hash::Key32; use risingwave_common::memory::MemoryContext; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; @@ -2157,7 +2157,7 @@ mod tests { }; let mem_ctx = - MemoryContext::new(parent_mem_ctx, IntGauge::new("memory_usage", " ").unwrap()); + MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge()); Box::new(HashJoinExecutor::::new( join_type, output_indices, @@ -2198,7 +2198,7 @@ mod tests { right_executor: BoxedExecutor, ) { let parent_mem_context = - MemoryContext::root(IntGauge::new("total_memory_usage", " ").unwrap()); + MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let join_executor = self.create_join_executor_with_chunk_size_and_executors( diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 3c46682c139de..1a94e9ef25b4c 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::{Bound, RangeBounds}; +use std::ops::{Bound, Deref, RangeBounds}; use std::sync::Arc; use futures::{pin_mut, StreamExt}; @@ -310,9 +310,12 @@ impl RowSeqScanExecutor { let table = Arc::new(table); // Create collector. - let histogram = metrics - .as_ref() - .map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity])); + let histogram = metrics.as_ref().map(|metrics| { + metrics + .executor_metrics() + .row_seq_scan_next_duration + .with_label_values(&metrics.executor_labels(&identity)) + }); if ordered { // Currently we execute range-scans concurrently so the order is not guaranteed if @@ -329,9 +332,8 @@ impl RowSeqScanExecutor { // Point Get for point_get in point_gets { let table = table.clone(); - let histogram = histogram.clone(); if let Some(row) = - Self::execute_point_get(table, point_get, epoch.clone(), histogram).await? + Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await? { if let Some(chunk) = data_chunk_builder.append_one_row(row) { yield chunk; @@ -365,7 +367,7 @@ impl RowSeqScanExecutor { table: Arc>, scan_range: ScanRange, epoch: BatchQueryEpoch, - histogram: Option, + histogram: Option>, ) -> Result> { let pk_prefix = scan_range.pk_prefix; assert!(pk_prefix.len() == table.pk_indices().len()); @@ -389,7 +391,7 @@ impl RowSeqScanExecutor { ordered: bool, epoch: BatchQueryEpoch, chunk_size: usize, - histogram: Option, + histogram: Option>, ) { let ScanRange { pk_prefix, diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 809c096eb49df..6793b779ac754 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -32,6 +32,7 @@ #![feature(result_option_inspect)] #![feature(assert_matches)] #![feature(lazy_cell)] +#![feature(array_methods)] mod error; pub mod exchange_source; diff --git a/src/batch/src/monitor/stats.rs b/src/batch/src/monitor/stats.rs index f59e0217cd7a0..f286515aafbe5 100644 --- a/src/batch/src/monitor/stats.rs +++ b/src/batch/src/monitor/stats.rs @@ -12,51 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::{Arc, LazyLock}; -use itertools::Itertools; -use parking_lot::Mutex; -use paste::paste; -use prometheus::core::{ - AtomicF64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec, GenericGaugeVec, +use prometheus::{IntGauge, Registry}; +use risingwave_common::metrics::{ + LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounterVec, + LabelGuardedIntGaugeVec, TrAdderGauge, }; -use prometheus::{ - exponential_buckets, opts, proto, GaugeVec, Histogram, HistogramOpts, HistogramVec, - IntCounterVec, IntGauge, IntGaugeVec, Registry, -}; -use risingwave_common::metrics::TrAdderGauge; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use crate::task::TaskId; -macro_rules! for_all_task_metrics { - ($macro:ident) => { - $macro! { - { task_first_poll_delay, GenericGaugeVec }, - { task_fast_poll_duration, GenericGaugeVec }, - { task_idle_duration, GenericGaugeVec }, - { task_poll_duration, GenericGaugeVec }, - { task_scheduled_duration, GenericGaugeVec }, - { task_slow_poll_duration, GenericGaugeVec }, - { task_mem_usage, IntGaugeVec }, - } - }; -} - -macro_rules! def_task_metrics { - ($( { $metric:ident, $type:ty }, )*) => { - #[derive(Clone)] - pub struct BatchTaskMetrics { - descs: Vec, - delete_task: Arc>>, - $( pub $metric: $type, )* - } - }; +#[derive(Clone)] +pub struct BatchTaskMetrics { + pub task_first_poll_delay: LabelGuardedGaugeVec<3>, + pub task_fast_poll_duration: LabelGuardedGaugeVec<3>, + pub task_idle_duration: LabelGuardedGaugeVec<3>, + pub task_poll_duration: LabelGuardedGaugeVec<3>, + pub task_scheduled_duration: LabelGuardedGaugeVec<3>, + pub task_slow_poll_duration: LabelGuardedGaugeVec<3>, + pub task_mem_usage: LabelGuardedIntGaugeVec<3>, } -for_all_task_metrics!(def_task_metrics); - pub static GLOBAL_BATCH_TASK_METRICS: LazyLock = LazyLock::new(|| BatchTaskMetrics::new(&GLOBAL_METRICS_REGISTRY)); @@ -64,76 +41,62 @@ impl BatchTaskMetrics { /// The created [`BatchTaskMetrics`] is already registered to the `registry`. fn new(registry: &Registry) -> Self { let task_labels = ["query_id", "stage_id", "task_id"]; - let mut descs = Vec::with_capacity(8); - let task_first_poll_delay = GaugeVec::new(opts!( + let task_first_poll_delay = register_guarded_gauge_vec_with_registry!( "batch_task_first_poll_delay", "The total duration (s) elapsed between the instant tasks are instrumented, and the instant they are first polled.", - ), &task_labels[..]).unwrap(); - descs.extend(task_first_poll_delay.desc().into_iter().cloned()); - - let task_fast_poll_duration = GaugeVec::new( - opts!( - "batch_task_fast_poll_duration", - "The total duration (s) of fast polls.", - ), - &task_labels[..], + &task_labels, + registry).unwrap(); + + let task_fast_poll_duration = register_guarded_gauge_vec_with_registry!( + "batch_task_fast_poll_duration", + "The total duration (s) of fast polls.", + &task_labels, + registry ) .unwrap(); - descs.extend(task_fast_poll_duration.desc().into_iter().cloned()); - - let task_idle_duration = GaugeVec::new( - opts!( - "batch_task_idle_duration", - "The total duration (s) that tasks idled.", - ), - &task_labels[..], + + let task_idle_duration = register_guarded_gauge_vec_with_registry!( + "batch_task_idle_duration", + "The total duration (s) that tasks idled.", + &task_labels, + registry ) .unwrap(); - descs.extend(task_idle_duration.desc().into_iter().cloned()); - - let task_poll_duration = GaugeVec::new( - opts!( - "batch_task_poll_duration", - "The total duration (s) elapsed during polls.", - ), - &task_labels[..], + + let task_poll_duration = register_guarded_gauge_vec_with_registry!( + "batch_task_poll_duration", + "The total duration (s) elapsed during polls.", + &task_labels, + registry, ) .unwrap(); - descs.extend(task_poll_duration.desc().into_iter().cloned()); - - let task_scheduled_duration = GaugeVec::new( - opts!( - "batch_task_scheduled_duration", - "The total duration (s) that tasks spent waiting to be polled after awakening.", - ), - &task_labels[..], + + let task_scheduled_duration = register_guarded_gauge_vec_with_registry!( + "batch_task_scheduled_duration", + "The total duration (s) that tasks spent waiting to be polled after awakening.", + &task_labels, + registry, ) .unwrap(); - descs.extend(task_scheduled_duration.desc().into_iter().cloned()); - - let task_slow_poll_duration = GaugeVec::new( - opts!( - "batch_task_slow_poll_duration", - "The total duration (s) of slow polls.", - ), - &task_labels[..], + + let task_slow_poll_duration = register_guarded_gauge_vec_with_registry!( + "batch_task_slow_poll_duration", + "The total duration (s) of slow polls.", + &task_labels, + registry, ) .unwrap(); - descs.extend(task_slow_poll_duration.desc().into_iter().cloned()); - - let task_mem_usage = IntGaugeVec::new( - opts!( - "batch_task_mem_usage", - "Memory usage of batch tasks in bytes." - ), - &task_labels[..], + + let task_mem_usage = register_guarded_int_gauge_vec_with_registry!( + "batch_task_mem_usage", + "Memory usage of batch tasks in bytes.", + &task_labels, + registry, ) .unwrap(); - descs.extend(task_mem_usage.desc().into_iter().cloned()); - let metrics = Self { - descs, + Self { task_first_poll_delay, task_fast_poll_duration, task_idle_duration, @@ -141,197 +104,64 @@ impl BatchTaskMetrics { task_scheduled_duration, task_slow_poll_duration, task_mem_usage, - delete_task: Arc::new(Mutex::new(Vec::new())), - }; - registry.register(Box::new(metrics.clone())).unwrap(); - metrics + } } /// Create a new `BatchTaskMetrics` instance used in tests or other places. pub fn for_test() -> Self { GLOBAL_BATCH_TASK_METRICS.clone() } - - fn clean_metrics(&self) { - let delete_task: Vec = { - let mut delete_task = self.delete_task.lock(); - if delete_task.is_empty() { - return; - } - std::mem::take(delete_task.as_mut()) - }; - for id in &delete_task { - let stage_id = id.stage_id.to_string(); - let task_id = id.task_id.to_string(); - let labels = vec![id.query_id.as_str(), stage_id.as_str(), task_id.as_str()]; - - macro_rules! remove { - ($({ $metric:ident, $type:ty},)*) => { - $( - if let Err(err) = self.$metric.remove_label_values(&labels) { - warn!("Failed to remove label values: {:?}", err); - } - )* - }; - } - for_all_task_metrics!(remove); - } - } - - pub fn add_delete_task(&self, id: TaskId) { - self.delete_task.lock().push(id); - } -} - -impl Collector for BatchTaskMetrics { - fn desc(&self) -> Vec<&Desc> { - self.descs.iter().collect() - } - - fn collect(&self) -> Vec { - let mut mfs = Vec::with_capacity(8); - - macro_rules! collect { - ($({ $metric:ident, $type:ty },)*) => { - $( - mfs.extend(self.$metric.collect()); - )* - }; - } - for_all_task_metrics!(collect); - - // TODO: Every time we execute it involving get the lock, here maybe a bottleneck. - self.clean_metrics(); - - mfs - } -} - -macro_rules! for_all_executor_metrics { - ($macro:ident) => { - $macro! { - { exchange_recv_row_number, GenericCounterVec, GenericCounter}, - { row_seq_scan_next_duration, HistogramVec , Histogram}, - { mem_usage, IntGaugeVec, IntGauge }, - } - }; } -macro_rules! def_executor_metrics { - ($( { $metric:ident, $type:ty, $_t:ty }, )*) => { - #[derive(Clone)] - pub struct BatchExecutorMetrics { - descs: Vec, - delete_task: Arc>>, - register_labels: Arc>>>>, - $( pub $metric: $type, )* - } - }; +#[derive(Clone)] +pub struct BatchExecutorMetrics { + pub exchange_recv_row_number: LabelGuardedIntCounterVec<4>, + pub row_seq_scan_next_duration: LabelGuardedHistogramVec<4>, + pub mem_usage: LabelGuardedIntGaugeVec<4>, } -for_all_executor_metrics!(def_executor_metrics); - pub static GLOBAL_BATCH_EXECUTOR_METRICS: LazyLock = LazyLock::new(|| BatchExecutorMetrics::new(&GLOBAL_METRICS_REGISTRY)); impl BatchExecutorMetrics { fn new(register: &Registry) -> Self { - let executor_labels = vec!["query_id", "stage_id", "task_id", "executor_id"]; - let mut descs = Vec::with_capacity(2); - - let mut custom_labels = executor_labels.clone(); - custom_labels.extend_from_slice(&["source_query_id", "source_stage_id", "source_task_id"]); - let exchange_recv_row_number = IntCounterVec::new( - opts!( - "batch_exchange_recv_row_number", - "Total number of row that have been received from upstream source", - ), - &custom_labels, + let executor_labels = ["query_id", "stage_id", "task_id", "executor_id"]; + + let exchange_recv_row_number = register_guarded_int_counter_vec_with_registry!( + "batch_exchange_recv_row_number", + "Total number of row that have been received from upstream source", + &executor_labels, + register, ) .unwrap(); - descs.extend(exchange_recv_row_number.desc().into_iter().cloned()); - - let row_seq_scan_next_duration = HistogramVec::new( - HistogramOpts::new( - "batch_row_seq_scan_next_duration", - "Time spent deserializing into a row in cell based table.", - ) - .buckets(exponential_buckets(0.0001, 2.0, 20).unwrap()), + + let row_seq_scan_next_duration = register_guarded_histogram_vec_with_registry!( + "batch_row_seq_scan_next_duration", + "Time spent deserializing into a row in cell based table.", &executor_labels, + register, ) .unwrap(); - descs.extend(row_seq_scan_next_duration.desc().into_iter().cloned()); - let mem_usage = IntGaugeVec::new( - opts!( - "batch_executor_mem_usage", - "Batch executor memory usage in bytes." - ), + let mem_usage = register_guarded_int_gauge_vec_with_registry!( + "batch_executor_mem_usage", + "Batch executor memory usage in bytes.", &executor_labels, + register, ) .unwrap(); - descs.extend(mem_usage.desc().into_iter().cloned()); - let metrics = Self { - descs, - delete_task: Arc::new(Mutex::new(Vec::new())), + Self { exchange_recv_row_number, row_seq_scan_next_duration, mem_usage, - register_labels: Arc::new(Mutex::new(HashMap::new())), - }; - register.register(Box::new(metrics.clone())).unwrap(); - metrics + } } /// Create a new `BatchTaskMetrics` instance used in tests or other places. pub fn for_test() -> Self { GLOBAL_BATCH_EXECUTOR_METRICS.clone() } - - fn clean_metrics(&self) { - let delete_task: Vec = { - let mut delete_task = self.delete_task.lock(); - if delete_task.is_empty() { - return; - } - std::mem::take(delete_task.as_mut()) - }; - let delete_labels = { - let mut register_labels = self.register_labels.lock(); - let mut delete_labels = Vec::with_capacity(delete_task.len()); - for id in delete_task { - if let Some(callback) = register_labels.remove(&id) { - delete_labels.push(callback); - } - } - delete_labels - }; - delete_labels - .into_iter() - .for_each(|delete_labels| delete_labels.into_iter().for_each(|callback| callback())); - } - - pub fn add_delete_task(&self, task_id: TaskId) { - self.delete_task.lock().push(task_id); - } -} - -impl Collector for BatchExecutorMetrics { - fn desc(&self) -> Vec<&Desc> { - self.descs.iter().collect() - } - - fn collect(&self) -> Vec { - let mut mfs = Vec::with_capacity(2); - - mfs.extend(self.exchange_recv_row_number.collect()); - mfs.extend(self.row_seq_scan_next_duration.collect()); - - self.clean_metrics(); - - mfs - } } pub type BatchMetricsWithTaskLabels = Arc; @@ -343,46 +173,10 @@ pub struct BatchMetricsWithTaskLabelsInner { task_metrics: Arc, executor_metrics: Arc, task_id: TaskId, - task_labels: Vec, -} - -macro_rules! def_create_executor_collector { - ($( { $metric:ident, $type:ty, $collector_type:ty }, )*) => { - paste! { - $( - pub fn [](&self,executor_label: Vec) -> $collector_type { - let mut owned_task_labels = self.task_labels.clone(); - owned_task_labels.extend(executor_label); - let task_labels = owned_task_labels.iter().map(|s| s.as_str()).collect_vec(); - - let collecter = self - .executor_metrics - .$metric - .with_label_values(&task_labels); - - let metrics = self.executor_metrics.$metric.clone(); - - self.executor_metrics - .register_labels - .lock() - .entry(self.task_id.clone()) - .or_default() - .push(Box::new(move || { - metrics.remove_label_values( - &owned_task_labels.iter().map(|s| s.as_str()).collect::>(), - ).expect("Collector with same label only can be created once. It should never have case of duplicate remove"); - })); - - collecter - } - )* - } - }; + task_labels: [String; 3], } impl BatchMetricsWithTaskLabelsInner { - for_all_executor_metrics! {def_create_executor_collector} - pub fn new( task_metrics: Arc, executor_metrics: Arc, @@ -392,12 +186,12 @@ impl BatchMetricsWithTaskLabelsInner { task_metrics, executor_metrics, task_id: id.clone(), - task_labels: vec![id.query_id, id.stage_id.to_string(), id.task_id.to_string()], + task_labels: [id.query_id, id.stage_id.to_string(), id.task_id.to_string()], } } - pub fn task_labels(&self) -> Vec<&str> { - self.task_labels.iter().map(AsRef::as_ref).collect() + pub fn task_labels(&self) -> [&str; 3] { + self.task_labels.each_ref().map(String::as_str) } pub fn task_id(&self) -> TaskId { @@ -407,12 +201,21 @@ impl BatchMetricsWithTaskLabelsInner { pub fn get_task_metrics(&self) -> &Arc { &self.task_metrics } -} -impl Drop for BatchMetricsWithTaskLabelsInner { - fn drop(&mut self) { - self.task_metrics.add_delete_task(self.task_id()); - self.executor_metrics.add_delete_task(self.task_id()); + pub fn executor_metrics(&self) -> &BatchExecutorMetrics { + &self.executor_metrics + } + + pub fn executor_labels<'a>( + &'a self, + executor_id: &'a (impl AsRef + ?Sized), + ) -> [&'a str; 4] { + [ + self.task_labels[0].as_str(), + self.task_labels[1].as_str(), + self.task_labels[2].as_str(), + executor_id.as_ref(), + ] } } diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs index f9648c58f7a50..70fa48b20d2e9 100644 --- a/src/batch/src/task/context.rs +++ b/src/batch/src/task/context.rs @@ -11,15 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use prometheus::IntGauge; use risingwave_common::catalog::SysCatalogReaderRef; use risingwave_common::config::BatchConfig; use risingwave_common::error::Result; use risingwave_common::memory::MemoryContext; +use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::util::addr::{is_local_address, HostAddr}; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_rpc_client::ComputeClientPoolRef; @@ -142,10 +141,10 @@ impl BatchTaskContext for ComputeNodeContext { fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext { if let Some(metrics) = &self.batch_metrics { - let mut labels = metrics.task_labels(); - labels.push(executor_id); - let executor_mem_usage = - metrics.create_collector_for_mem_usage(vec![executor_id.to_string()]); + let executor_mem_usage = metrics + .executor_metrics() + .mem_usage + .with_label_values(&metrics.executor_labels(executor_id)); MemoryContext::new(Some(self.mem_context.clone()), executor_mem_usage) } else { MemoryContext::none() @@ -195,7 +194,7 @@ impl ComputeNodeContext { cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), // Leave it for now, it should be None - mem_context: MemoryContext::root(IntGauge::new("test", "test").unwrap()), + mem_context: MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()), } } diff --git a/src/common/src/estimate_size/collections/heap.rs b/src/common/src/estimate_size/collections/heap.rs index beb5430844c32..96bd942aa02b1 100644 --- a/src/common/src/estimate_size/collections/heap.rs +++ b/src/common/src/estimate_size/collections/heap.rs @@ -105,14 +105,13 @@ where #[cfg(test)] mod tests { - use prometheus::IntGauge; - use crate::estimate_size::collections::MemMonitoredHeap; use crate::memory::MemoryContext; + use crate::metrics::LabelGuardedIntGauge; #[test] fn test_heap() { - let gauge = IntGauge::new("test", "test").unwrap(); + let gauge = LabelGuardedIntGauge::<4>::test_int_gauge(); let mem_ctx = MemoryContext::root(gauge.clone()); let mut heap = MemMonitoredHeap::::new_with(mem_ctx); @@ -130,7 +129,7 @@ mod tests { #[test] fn test_heap_drop() { - let gauge = IntGauge::new("test", "test").unwrap(); + let gauge = LabelGuardedIntGauge::<4>::test_int_gauge(); let mem_ctx = MemoryContext::root(gauge.clone()); let vec = { diff --git a/src/common/src/memory/mem_context.rs b/src/common/src/memory/mem_context.rs index 70aa20d9e331d..c835a46732a73 100644 --- a/src/common/src/memory/mem_context.rs +++ b/src/common/src/memory/mem_context.rs @@ -12,64 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Deref; use std::sync::Arc; -use prometheus::IntGauge; - use super::MonitoredGlobalAlloc; -use crate::metrics::TrAdderGauge; +use crate::metrics::{LabelGuardedIntGauge, TrAdderGauge}; -struct MemoryContextInner { - counter: MemCounter, - parent: Option, +pub trait MemCounter: Send + Sync + 'static { + fn add(&self, bytes: i64); + fn get_bytes_used(&self) -> i64; } -#[derive(Clone)] -pub struct MemoryContext { - /// Add None op mem context, so that we don't need to return [`Option`] in - /// `BatchTaskContext`. This helps with later `Allocator` implementation. - inner: Option>, -} - -#[derive(Debug)] -pub enum MemCounter { - /// Used when the add/sub operation don't have much conflicts. - Atomic(IntGauge), - /// Used when the add/sub operation may cause a lot of conflicts. - TrAdder(TrAdderGauge), -} +impl MemCounter for TrAdderGauge { + fn add(&self, bytes: i64) { + self.add(bytes) + } -impl From for MemCounter { - fn from(value: IntGauge) -> Self { - MemCounter::Atomic(value) + fn get_bytes_used(&self) -> i64 { + self.get() } } -impl MemCounter { +impl MemCounter for LabelGuardedIntGauge { fn add(&self, bytes: i64) { - match &self { - MemCounter::TrAdder(c) => c.add(bytes), - MemCounter::Atomic(c) => c.add(bytes), - } + self.deref().add(bytes) } fn get_bytes_used(&self) -> i64 { - match &self { - MemCounter::TrAdder(c) => c.get(), - MemCounter::Atomic(c) => c.get(), - } + self.get() } } -impl From for MemCounter { - fn from(value: TrAdderGauge) -> Self { - MemCounter::TrAdder(value) - } +struct MemoryContextInner { + counter: Box, + parent: Option, +} + +#[derive(Clone)] +pub struct MemoryContext { + /// Add None op mem context, so that we don't need to return [`Option`] in + /// `BatchTaskContext`. This helps with later `Allocator` implementation. + inner: Option>, } impl MemoryContext { - pub fn new(parent: Option, counter: impl Into) -> Self { - let c = counter.into(); + pub fn new(parent: Option, counter: impl MemCounter) -> Self { + let c = Box::new(counter); Self { inner: Some(Arc::new(MemoryContextInner { counter: c, parent })), } @@ -80,7 +68,7 @@ impl MemoryContext { Self { inner: None } } - pub fn root(counter: impl Into) -> Self { + pub fn root(counter: impl MemCounter) -> Self { Self::new(None, counter) }