diff --git a/src/batch/src/monitor/stats.rs b/src/batch/src/monitor/stats.rs
index f286515aafbe5..b4d70e1651627 100644
--- a/src/batch/src/monitor/stats.rs
+++ b/src/batch/src/monitor/stats.rs
@@ -170,6 +170,7 @@ pub type BatchMetricsWithTaskLabels = Arc<BatchMetricsWithTaskLabelsInner>;
 /// a `TaskId` so that we don't have to pass `task_id` around and repeatedly generate the same
 /// labels.
 pub struct BatchMetricsWithTaskLabelsInner {
+    batch_manager_metrics: Arc<BatchManagerMetrics>,
     task_metrics: Arc<BatchTaskMetrics>,
     executor_metrics: Arc<BatchExecutorMetrics>,
     task_id: TaskId,
@@ -178,11 +179,13 @@ pub struct BatchMetricsWithTaskLabelsInner {
 
 impl BatchMetricsWithTaskLabelsInner {
     pub fn new(
+        batch_manager_metrics: Arc<BatchManagerMetrics>,
         task_metrics: Arc<BatchTaskMetrics>,
         executor_metrics: Arc<BatchExecutorMetrics>,
         id: TaskId,
     ) -> Self {
         Self {
+            batch_manager_metrics,
             task_metrics,
             executor_metrics,
             task_id: id.clone(),
@@ -217,6 +220,10 @@ impl BatchMetricsWithTaskLabelsInner {
             executor_id.as_ref(),
         ]
     }
+
+    pub fn batch_manager_metrics(&self) -> &BatchManagerMetrics {
+        &self.batch_manager_metrics
+    }
 }
 
 #[derive(Clone)]
@@ -258,7 +265,7 @@ impl BatchManagerMetrics {
     }
 
     #[cfg(test)]
-    pub fn for_test() -> Self {
-        GLOBAL_BATCH_MANAGER_METRICS.clone()
+    pub fn for_test() -> Arc<Self> {
+        Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone())
     }
 }
diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs
index 64e026bea0be4..a90cb1cef15b2 100644
--- a/src/batch/src/task/context.rs
+++ b/src/batch/src/task/context.rs
@@ -166,7 +166,9 @@ impl ComputeNodeContext {
 
     pub fn new(env: BatchEnvironment, task_id: TaskId) -> Self {
         let batch_mem_context = env.task_manager().memory_context_ref();
+
         let batch_metrics = Arc::new(BatchMetricsWithTaskLabelsInner::new(
+            env.task_manager().metrics(),
             env.task_metrics(),
             env.executor_metrics(),
             task_id,
diff --git a/src/batch/src/task/env.rs b/src/batch/src/task/env.rs
index 27be9362b1e8b..14c01b6133ce4 100644
--- a/src/batch/src/task/env.rs
+++ b/src/batch/src/task/env.rs
@@ -22,7 +22,7 @@ use risingwave_rpc_client::ComputeClientPoolRef;
 use risingwave_source::dml_manager::DmlManagerRef;
 use risingwave_storage::StateStoreImpl;
 
-use crate::monitor::{BatchExecutorMetrics, BatchTaskMetrics};
+use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchTaskMetrics};
 use crate::task::BatchManager;
 
 /// The global environment for task execution.
@@ -95,8 +95,6 @@ impl BatchEnvironment {
         use risingwave_source::dml_manager::DmlManager;
         use risingwave_storage::monitor::MonitoredStorageMetrics;
 
-        use crate::monitor::BatchManagerMetrics;
-
         BatchEnvironment {
             task_manager: Arc::new(BatchManager::new(
                 BatchConfig::default(),
@@ -136,6 +134,10 @@ impl BatchEnvironment {
         self.state_store.clone()
     }
 
+    pub fn manager_metrics(&self) -> Arc<BatchManagerMetrics> {
+        self.task_manager.metrics()
+    }
+
     pub fn task_metrics(&self) -> Arc<BatchTaskMetrics> {
         self.task_metrics.clone()
     }
diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs
index 15dbc68ebc1d5..4fa26b4d5d69e 100644
--- a/src/batch/src/task/task_execution.rs
+++ b/src/batch/src/task/task_execution.rs
@@ -581,6 +581,10 @@ impl BatchTaskExecution {
         mut sender: ChanSenderImpl,
         state_tx: Option<&mut StateReporter>,
     ) {
+        self.context
+            .batch_metrics()
+            .as_ref()
+            .inspect(|m| m.batch_manager_metrics().task_num.inc());
         let mut data_chunk_stream = root.execute();
         let mut state;
         let mut error = None;
@@ -681,6 +685,11 @@
                 e
             );
         }
+
+        self.context
+            .batch_metrics()
+            .as_ref()
+            .inspect(|m| m.batch_manager_metrics().task_num.dec());
     }
 
     pub fn abort(&self, err_msg: String) {
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index 44268795812b5..8a297cd659640 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -58,11 +58,11 @@ pub struct BatchManager {
     mem_context: MemoryContext,
 
     /// Metrics for batch manager.
-    metrics: BatchManagerMetrics,
+    metrics: Arc<BatchManagerMetrics>,
 }
 
 impl BatchManager {
-    pub fn new(config: BatchConfig, metrics: BatchManagerMetrics) -> Self {
+    pub fn new(config: BatchConfig, metrics: Arc<BatchManagerMetrics>) -> Self {
         let runtime = {
             let mut builder = tokio::runtime::Builder::new_multi_thread();
             if let Some(worker_threads_num) = config.worker_threads_num {
@@ -86,6 +86,10 @@ impl BatchManager {
         }
     }
 
+    pub(crate) fn metrics(&self) -> Arc<BatchManagerMetrics> {
+        self.metrics.clone()
+    }
+
     pub fn memory_context_ref(&self) -> MemoryContext {
         self.mem_context.clone()
     }
@@ -108,7 +112,6 @@
         // it's possible do not found parent task id in theory.
         let ret = if let hash_map::Entry::Vacant(e) = self.tasks.lock().entry(task_id.clone()) {
             e.insert(task.clone());
-            self.metrics.task_num.inc();
 
             let this = self.clone();
             let task_id = task_id.clone();
@@ -229,7 +232,6 @@
         // Use `cancel` rather than `abort` here since this is not an error which should be
         // propagated to upstream.
         task.cancel();
-        self.metrics.task_num.dec();
         if let Some(heartbeat_join_handle) = task.heartbeat_join_handle() {
             heartbeat_join_handle.abort();
         }
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 5ce4139ec3327..48a9be89c952d 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -164,7 +164,7 @@ pub async fn compute_node_serve(
     let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
     let batch_task_metrics = Arc::new(GLOBAL_BATCH_TASK_METRICS.clone());
     let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
-    let batch_manager_metrics = GLOBAL_BATCH_MANAGER_METRICS.clone();
+    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
     let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
 
     // Initialize state store.
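Taken together, the change moves the `task_num` accounting out of `BatchManager`, where the gauge was bumped when a task was registered and when it was cancelled, and into the task execution path: `BatchMetricsWithTaskLabelsInner` now carries an `Arc<BatchManagerMetrics>`, and `BatchTaskExecution::try_execute` increments the gauge when it starts and decrements it when it finishes, so the metric reflects tasks that are actually running. The sketch below is a minimal, self-contained illustration of that inc-on-start / dec-on-finish pattern using the `prometheus` crate; `TaskMetrics` and `Task` are hypothetical stand-ins for illustration only, not RisingWave's actual types.

```rust
use std::sync::Arc;

use prometheus::IntGauge;

/// Stand-in for the metrics handle a task receives (the real
/// `BatchMetricsWithTaskLabelsInner` also carries task/executor metrics).
struct TaskMetrics {
    task_num: IntGauge,
}

/// Hypothetical task: `metrics` is an `Option` because some contexts (e.g.
/// tests) run without metrics, mirroring the `Option`-returning
/// `batch_metrics()` accessor used in the diff.
struct Task {
    metrics: Option<Arc<TaskMetrics>>,
}

impl Task {
    /// The gauge brackets the body of `try_execute`, so it counts tasks that
    /// are actually executing rather than tasks merely registered somewhere.
    fn try_execute(&self) {
        if let Some(m) = self.metrics.as_ref() {
            m.task_num.inc();
        }

        // ... drive the executor tree and forward data chunks here ...

        if let Some(m) = self.metrics.as_ref() {
            m.task_num.dec();
        }
    }
}

fn main() {
    let metrics = Arc::new(TaskMetrics {
        task_num: IntGauge::new("batch_task_num", "number of running batch tasks").unwrap(),
    });
    let task = Task {
        metrics: Some(metrics.clone()),
    };

    task.try_execute();
    assert_eq!(metrics.task_num.get(), 0); // back to zero once the task finishes
}
```

One caveat with this shape: since the decrement runs at the end of the function rather than from a `Drop` guard, a panic or early return in between would leave the gauge elevated; a scope guard is a straightforward hardening if that ever matters.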