Skip to content

Commit

Permalink
chore: Make batch mpp task num metrics more accurate. (#13589)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Nov 22, 2023
1 parent 32717b4 commit b3eecb3
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 10 deletions.
11 changes: 9 additions & 2 deletions src/batch/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub type BatchMetricsWithTaskLabels = Arc<BatchMetricsWithTaskLabelsInner>;
/// a `TaskId` so that we don't have to pass `task_id` around and repeatedly generate the same
/// labels.
pub struct BatchMetricsWithTaskLabelsInner {
batch_manager_metrics: Arc<BatchManagerMetrics>,
task_metrics: Arc<BatchTaskMetrics>,
executor_metrics: Arc<BatchExecutorMetrics>,
task_id: TaskId,
Expand All @@ -178,11 +179,13 @@ pub struct BatchMetricsWithTaskLabelsInner {

impl BatchMetricsWithTaskLabelsInner {
pub fn new(
batch_manager_metrics: Arc<BatchManagerMetrics>,
task_metrics: Arc<BatchTaskMetrics>,
executor_metrics: Arc<BatchExecutorMetrics>,
id: TaskId,
) -> Self {
Self {
batch_manager_metrics,
task_metrics,
executor_metrics,
task_id: id.clone(),
Expand Down Expand Up @@ -217,6 +220,10 @@ impl BatchMetricsWithTaskLabelsInner {
executor_id.as_ref(),
]
}

pub fn batch_manager_metrics(&self) -> &BatchManagerMetrics {
&self.batch_manager_metrics
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -258,7 +265,7 @@ impl BatchManagerMetrics {
}

#[cfg(test)]
pub fn for_test() -> Self {
GLOBAL_BATCH_MANAGER_METRICS.clone()
pub fn for_test() -> Arc<Self> {
Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone())
}
}
2 changes: 2 additions & 0 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ impl ComputeNodeContext {

pub fn new(env: BatchEnvironment, task_id: TaskId) -> Self {
let batch_mem_context = env.task_manager().memory_context_ref();

let batch_metrics = Arc::new(BatchMetricsWithTaskLabelsInner::new(
env.task_manager().metrics(),
env.task_metrics(),
env.executor_metrics(),
task_id,
Expand Down
8 changes: 5 additions & 3 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::dml_manager::DmlManagerRef;
use risingwave_storage::StateStoreImpl;

use crate::monitor::{BatchExecutorMetrics, BatchTaskMetrics};
use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchTaskMetrics};
use crate::task::BatchManager;

/// The global environment for task execution.
Expand Down Expand Up @@ -95,8 +95,6 @@ impl BatchEnvironment {
use risingwave_source::dml_manager::DmlManager;
use risingwave_storage::monitor::MonitoredStorageMetrics;

use crate::monitor::BatchManagerMetrics;

BatchEnvironment {
task_manager: Arc::new(BatchManager::new(
BatchConfig::default(),
Expand Down Expand Up @@ -136,6 +134,10 @@ impl BatchEnvironment {
self.state_store.clone()
}

pub fn manager_metrics(&self) -> Arc<BatchManagerMetrics> {
self.task_manager.metrics()
}

pub fn task_metrics(&self) -> Arc<BatchTaskMetrics> {
self.task_metrics.clone()
}
Expand Down
9 changes: 9 additions & 0 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
mut sender: ChanSenderImpl,
state_tx: Option<&mut StateReporter>,
) {
self.context
.batch_metrics()
.as_ref()
.inspect(|m| m.batch_manager_metrics().task_num.inc());
let mut data_chunk_stream = root.execute();
let mut state;
let mut error = None;
Expand Down Expand Up @@ -681,6 +685,11 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
e
);
}

self.context
.batch_metrics()
.as_ref()
.inspect(|m| m.batch_manager_metrics().task_num.dec());
}

pub fn abort(&self, err_msg: String) {
Expand Down
10 changes: 6 additions & 4 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ pub struct BatchManager {
mem_context: MemoryContext,

/// Metrics for batch manager.
metrics: BatchManagerMetrics,
metrics: Arc<BatchManagerMetrics>,
}

impl BatchManager {
pub fn new(config: BatchConfig, metrics: BatchManagerMetrics) -> Self {
pub fn new(config: BatchConfig, metrics: Arc<BatchManagerMetrics>) -> Self {
let runtime = {
let mut builder = tokio::runtime::Builder::new_multi_thread();
if let Some(worker_threads_num) = config.worker_threads_num {
Expand All @@ -86,6 +86,10 @@ impl BatchManager {
}
}

pub(crate) fn metrics(&self) -> Arc<BatchManagerMetrics> {
self.metrics.clone()
}

pub fn memory_context_ref(&self) -> MemoryContext {
self.mem_context.clone()
}
Expand All @@ -108,7 +112,6 @@ impl BatchManager {
// it's possible do not found parent task id in theory.
let ret = if let hash_map::Entry::Vacant(e) = self.tasks.lock().entry(task_id.clone()) {
e.insert(task.clone());
self.metrics.task_num.inc();

let this = self.clone();
let task_id = task_id.clone();
Expand Down Expand Up @@ -229,7 +232,6 @@ impl BatchManager {
// Use `cancel` rather than `abort` here since this is not an error which should be
// propagated to upstream.
task.cancel();
self.metrics.task_num.dec();
if let Some(heartbeat_join_handle) = task.heartbeat_join_handle() {
heartbeat_join_handle.abort();
}
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub async fn compute_node_serve(
let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
let batch_task_metrics = Arc::new(GLOBAL_BATCH_TASK_METRICS.clone());
let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
let batch_manager_metrics = GLOBAL_BATCH_MANAGER_METRICS.clone();
let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());

// Initialize state store.
Expand Down

0 comments on commit b3eecb3

Please sign in to comment.