Skip to content

Commit

Permalink
chore: change stalled count to gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed May 22, 2024
1 parent b2ec133 commit e27b7c9
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub const TYPE_LABEL: &str = "type";
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";
/// Region worker id label.
pub const WORKER_LABEL: &str = "worker";

lazy_static! {
/// Global write buffer size in bytes.
Expand Down Expand Up @@ -70,9 +72,12 @@ lazy_static! {


// ------ Write related metrics
/// Counter of stalled write requests.
pub static ref WRITE_STALL_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_stall_total", "mito write stall total").unwrap();
/// Number of stalled write requests in each worker.
pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stall_total",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
/// Counter of rejected write requests.
pub static ref WRITE_REJECT_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{thread_rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
Expand All @@ -49,6 +50,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
Expand Down Expand Up @@ -398,6 +400,7 @@ impl<S: LogStore> WorkerStarter<S> {
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
Expand Down Expand Up @@ -564,6 +567,8 @@ struct RegionWorkerLoop<S> {
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
}

impl<S: LogStore> RegionWorkerLoop<S> {
Expand Down
9 changes: 4 additions & 5 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
Expand All @@ -50,13 +48,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
reject_write_requests(stalled.requests);
return;
}

if self.write_buffer_manager.should_stall() && allow_stall {
WRITE_STALL_TOTAL.inc_by(write_requests.len() as u64);

self.stalled_count.add(write_requests.len() as i64);
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
Expand Down Expand Up @@ -125,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Drains the worker's stalled write requests and handles them again.
///
/// All requests are moved out of `self.stalled_requests`, the `stalled_count`
/// gauge is decreased by the number of requests moved out, and the requests
/// are then passed to `handle_write_requests` with `false` as the second
/// argument so that they are not stalled a second time.
pub(crate) async fn handle_stalled_requests(&mut self) {
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// These requests are no longer held in `stalled_requests`, so subtract
// them from the gauge before handling them.
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
}
Expand Down

0 comments on commit e27b7c9

Please sign in to comment.