diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index 1b76bb6e6e997..3fb335398d89b 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -240,9 +240,9 @@ impl GroupTopNExecutor { #[cfg(test)] mod tests { use futures::stream::StreamExt; - use prometheus::IntGauge; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; @@ -254,7 +254,7 @@ mod tests { #[tokio::test] async fn test_group_top_n_executor() { - let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap()); + let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let schema = Schema { fields: vec![ @@ -290,7 +290,7 @@ mod tests { ]; let mem_ctx = MemoryContext::new( Some(parent_mem.clone()), - IntGauge::new("memory_usage", " ").unwrap(), + LabelGuardedIntGauge::<4>::test_int_gauge(), ); let top_n_executor = (GroupTopNExecutorBuilder { child: Box::new(mock_executor), diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 03ce86d475620..7cc408485444d 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -308,8 +308,8 @@ mod tests { use std::sync::Arc; use futures_async_stream::for_await; - use prometheus::IntGauge; use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::PbDataType; @@ -323,7 +323,7 @@ mod tests { #[tokio::test] async fn execute_int32_grouped() { - let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap()); + let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let src_exec = Box::new(MockExecutor::with_chunk( DataChunk::from_pretty( @@ -370,7 +370,7 @@ mod tests { let mem_context = MemoryContext::new( Some(parent_mem.clone()), - IntGauge::new("memory_usage", " ").unwrap(), + LabelGuardedIntGauge::<4>::test_int_gauge(), ); let actual_exec = HashAggExecutorBuilder::deserialize( &agg_prost, diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 0f5a0788b23ec..cd9caa54e4b2a 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -1933,12 +1933,12 @@ impl HashJoinExecutor { mod tests { use futures::StreamExt; use futures_async_stream::for_await; - use prometheus::IntGauge; use risingwave_common::array::{ArrayBuilderImpl, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::hash::Key32; use risingwave_common::memory::MemoryContext; + use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::test_prelude::DataChunkTestExt; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; @@ -2157,7 +2157,7 @@ mod tests { }; let mem_ctx = - MemoryContext::new(parent_mem_ctx, IntGauge::new("memory_usage", " ").unwrap()); + MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge()); Box::new(HashJoinExecutor::::new( join_type, output_indices, @@ -2198,7 +2198,7 @@ mod tests { right_executor: BoxedExecutor, ) { let parent_mem_context = - MemoryContext::root(IntGauge::new("total_memory_usage", " ").unwrap()); + MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()); { let join_executor = self.create_join_executor_with_chunk_size_and_executors( diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs index afe4e10186c75..70fa48b20d2e9 100644 --- a/src/batch/src/task/context.rs +++ b/src/batch/src/task/context.rs @@ -11,16 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use prometheus::IntGauge; use risingwave_common::catalog::SysCatalogReaderRef; use risingwave_common::config::BatchConfig; use risingwave_common::error::Result; use risingwave_common::memory::MemoryContext; +use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::util::addr::{is_local_address, HostAddr}; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_rpc_client::ComputeClientPoolRef; @@ -147,10 +145,7 @@ impl BatchTaskContext for ComputeNodeContext { .executor_metrics() .mem_usage .with_label_values(&metrics.executor_labels(executor_id)); - MemoryContext::new( - Some(self.mem_context.clone()), - executor_mem_usage.deref().clone(), - ) + MemoryContext::new(Some(self.mem_context.clone()), executor_mem_usage) } else { MemoryContext::none() } @@ -181,9 +176,7 @@ impl ComputeNodeContext { batch_metrics .get_task_metrics() .task_mem_usage - .with_label_values(&batch_metrics.task_labels()) - .deref() - .clone(), + .with_label_values(&batch_metrics.task_labels()), ); Self { env, @@ -201,7 +194,7 @@ impl ComputeNodeContext { cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), // Leave it for now, it should be None - mem_context: MemoryContext::root(IntGauge::new("test", "test").unwrap()), + mem_context: MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()), } } diff --git a/src/common/src/estimate_size/collections/heap.rs b/src/common/src/estimate_size/collections/heap.rs index beb5430844c32..96bd942aa02b1 100644 --- a/src/common/src/estimate_size/collections/heap.rs +++ b/src/common/src/estimate_size/collections/heap.rs @@ -105,14 +105,13 @@ where #[cfg(test)] mod tests { - use prometheus::IntGauge; - use crate::estimate_size::collections::MemMonitoredHeap; use crate::memory::MemoryContext; + use crate::metrics::LabelGuardedIntGauge; #[test] fn test_heap() { - let gauge = IntGauge::new("test", "test").unwrap(); + let gauge = LabelGuardedIntGauge::<4>::test_int_gauge(); let mem_ctx = MemoryContext::root(gauge.clone()); let mut heap = MemMonitoredHeap::::new_with(mem_ctx); @@ -130,7 +129,7 @@ mod tests { #[test] fn test_heap_drop() { - let gauge = IntGauge::new("test", "test").unwrap(); + let gauge = LabelGuardedIntGauge::<4>::test_int_gauge(); let mem_ctx = MemoryContext::root(gauge.clone()); let vec = { diff --git a/src/common/src/memory/mem_context.rs b/src/common/src/memory/mem_context.rs index 70aa20d9e331d..fa53e5cd62b97 100644 --- a/src/common/src/memory/mem_context.rs +++ b/src/common/src/memory/mem_context.rs @@ -12,64 +12,88 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Deref; use std::sync::Arc; -use prometheus::IntGauge; - use super::MonitoredGlobalAlloc; -use crate::metrics::TrAdderGauge; +use crate::metrics::{LabelGuardedIntGauge, TrAdderGauge}; -struct MemoryContextInner { - counter: MemCounter, - parent: Option, +pub trait MemCounter: Send + Sync + 'static { + fn add(&self, bytes: i64); + fn get_bytes_used(&self) -> i64; } -#[derive(Clone)] -pub struct MemoryContext { - /// Add None op mem context, so that we don't need to return [`Option`] in - /// `BatchTaskContext`. This helps with later `Allocator` implementation. - inner: Option>, -} - -#[derive(Debug)] -pub enum MemCounter { - /// Used when the add/sub operation don't have much conflicts. - Atomic(IntGauge), - /// Used when the add/sub operation may cause a lot of conflicts. - TrAdder(TrAdderGauge), -} +impl MemCounter for TrAdderGauge { + fn add(&self, bytes: i64) { + self.add(bytes) + } -impl From for MemCounter { - fn from(value: IntGauge) -> Self { - MemCounter::Atomic(value) + fn get_bytes_used(&self) -> i64 { + self.get() } } -impl MemCounter { +impl MemCounter for LabelGuardedIntGauge { fn add(&self, bytes: i64) { - match &self { - MemCounter::TrAdder(c) => c.add(bytes), - MemCounter::Atomic(c) => c.add(bytes), - } + self.deref().add(bytes) } fn get_bytes_used(&self) -> i64 { - match &self { - MemCounter::TrAdder(c) => c.get(), - MemCounter::Atomic(c) => c.get(), - } + self.get() } } -impl From for MemCounter { - fn from(value: TrAdderGauge) -> Self { - MemCounter::TrAdder(value) - } +struct MemoryContextInner { + counter: Box, + parent: Option, } +#[derive(Clone)] +pub struct MemoryContext { + /// Add None op mem context, so that we don't need to return [`Option`] in + /// `BatchTaskContext`. This helps with later `Allocator` implementation. + inner: Option>, +} + +// #[derive(Debug)] +// pub enum MemCounter { +// /// Used when the add/sub operation don't have much conflicts. +// Atomic(LabelGuardedIntGauge), +// /// Used when the add/sub operation may cause a lot of conflicts. +// TrAdder(TrAdderGauge), +// } + +// impl From> for MemCounter { +// fn from(value: LabelGuardedIntGauge) -> Self { +// MemCounter::Atomic(value) +// } +// } +// +// impl MemCounter { +// fn add(&self, bytes: i64) { +// match &self { +// MemCounter::TrAdder(c) => c.add(bytes), +// MemCounter::Atomic(c) => c.add(bytes), +// } +// } +// +// fn get_bytes_used(&self) -> i64 { +// match &self { +// MemCounter::TrAdder(c) => c.get(), +// MemCounter::Atomic(c) => c.get(), +// } +// } +// } +// +// impl From for MemCounter { +// fn from(value: TrAdderGauge) -> Self { +// MemCounter::TrAdder(value) +// } +// } + impl MemoryContext { - pub fn new(parent: Option, counter: impl Into) -> Self { - let c = counter.into(); + pub fn new(parent: Option, counter: impl MemCounter) -> Self { + let c = Box::new(counter); Self { inner: Some(Arc::new(MemoryContextInner { counter: c, parent })), } @@ -80,7 +104,7 @@ impl MemoryContext { Self { inner: None } } - pub fn root(counter: impl Into) -> Self { + pub fn root(counter: impl MemCounter) -> Self { Self::new(None, counter) }