diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 724443d1e317..9eaff6fa48e3 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -23,6 +23,8 @@ pub const TYPE_LABEL: &str = "type"; pub const FLUSH_REASON: &str = "reason"; /// File type label. pub const FILE_TYPE_LABEL: &str = "file_type"; +/// Region worker id label. +pub const WORKER_LABEL: &str = "worker"; lazy_static! { /// Global write buffer size in bytes. @@ -70,9 +72,12 @@ lazy_static! { // ------ Write related metrics - /// Counter of stalled write requests. - pub static ref WRITE_STALL_TOTAL: IntCounter = - register_int_counter!("greptime_mito_write_stall_total", "mito write stall total").unwrap(); + /// Number of stalled write requests in each worker. + pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!( + "greptime_mito_write_stall_total", + "mito stalled write request in each worker", + &[WORKER_LABEL] + ).unwrap(); /// Counter of rejected write requests. pub static ref WRITE_REJECT_TOTAL: IntCounter = register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap(); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index f5a50db30072..25df5a7ec45a 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -34,6 +34,7 @@ use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; +use prometheus::IntGauge; use rand::{thread_rng, Rng}; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; @@ -49,6 +50,7 @@ use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; +use crate::metrics::WRITE_STALL_TOTAL; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -398,6 +400,7 @@ impl WorkerStarter { last_periodical_check_millis: now, flush_sender: self.flush_sender, flush_receiver: self.flush_receiver, + stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]), }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -564,6 +567,8 @@ struct RegionWorkerLoop { flush_sender: watch::Sender<()>, /// Watch channel receiver to wait for background flush job. flush_receiver: watch::Receiver<()>, + /// Gauge of stalled request count. + stalled_count: IntGauge, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 4fbd4cc3913e..3614d1be5de2 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -24,9 +24,7 @@ use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result}; -use crate::metrics::{ - WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL, -}; +use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::{SenderWriteRequest, WriteRequest}; use crate::worker::RegionWorkerLoop; @@ -50,13 +48,13 @@ impl RegionWorkerLoop { reject_write_requests(write_requests); // Also reject all stalled requests. let stalled = std::mem::take(&mut self.stalled_requests); + self.stalled_count.sub(stalled.requests.len() as i64); reject_write_requests(stalled.requests); return; } if self.write_buffer_manager.should_stall() && allow_stall { - WRITE_STALL_TOTAL.inc_by(write_requests.len() as u64); - + self.stalled_count.add(write_requests.len() as i64); self.stalled_requests.append(&mut write_requests); self.listener.on_write_stall(); return; @@ -125,6 +123,7 @@ impl RegionWorkerLoop { pub(crate) async fn handle_stalled_requests(&mut self) { // Handle stalled requests. let stalled = std::mem::take(&mut self.stalled_requests); + self.stalled_count.sub(stalled.requests.len() as i64); // We already stalled these requests, don't stall them again. self.handle_write_requests(stalled.requests, false).await; }