diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 724443d1e317..9eaff6fa48e3 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -23,6 +23,8 @@ pub const TYPE_LABEL: &str = "type";
 pub const FLUSH_REASON: &str = "reason";
 /// File type label.
 pub const FILE_TYPE_LABEL: &str = "file_type";
+/// Region worker id label.
+pub const WORKER_LABEL: &str = "worker";
 
 lazy_static! {
     /// Global write buffer size in bytes.
@@ -70,9 +72,12 @@ lazy_static! {
 
     // ------ Write related metrics
 
-    /// Counter of stalled write requests.
-    pub static ref WRITE_STALL_TOTAL: IntCounter =
-        register_int_counter!("greptime_mito_write_stall_total", "mito write stall total").unwrap();
+    /// Number of stalled write requests in each worker.
+    pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
+        "greptime_mito_write_stall_total",
+        "mito stalled write requests in each worker",
+        &[WORKER_LABEL]
+    ).unwrap();
     /// Counter of rejected write requests.
     pub static ref WRITE_REJECT_TOTAL: IntCounter =
         register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 7483c73bad31..b842b0d70716 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -34,13 +34,14 @@ use common_runtime::JoinHandle;
 use common_telemetry::{error, info, warn};
 use futures::future::try_join_all;
 use object_store::manager::ObjectStoreManagerRef;
+use prometheus::IntGauge;
 use rand::{thread_rng, Rng};
 use snafu::{ensure, ResultExt};
 use store_api::logstore::LogStore;
 use store_api::region_engine::SetReadonlyResponse;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::sync::{mpsc, oneshot, watch, Mutex};
 
 use crate::cache::write_cache::{WriteCache, WriteCacheRef};
 use crate::cache::{CacheManager, CacheManagerRef};
@@ -49,6 +50,7 @@ use crate::config::MitoConfig;
 use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::memtable::MemtableBuilderProvider;
+use crate::metrics::WRITE_STALL_TOTAL;
 use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
 use crate::request::{
     BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -151,6 +153,7 @@ impl WorkerGroup {
                 .build(),
         );
         let time_provider = Arc::new(StdTimeProvider);
+        let (flush_sender, flush_receiver) = watch::channel(());
 
         let workers = (0..config.num_workers)
             .map(|id| {
@@ -166,6 +169,8 @@ impl WorkerGroup {
                     cache_manager: cache_manager.clone(),
                     intermediate_manager: intermediate_manager.clone(),
                     time_provider: time_provider.clone(),
+                    flush_sender: flush_sender.clone(),
+                    flush_receiver: flush_receiver.clone(),
                 }
                 .start()
             })
@@ -266,6 +271,7 @@ impl WorkerGroup {
                 .write_cache(write_cache)
                 .build(),
         );
+        let (flush_sender, flush_receiver) = watch::channel(());
         let workers = (0..config.num_workers)
             .map(|id| {
                 WorkerStarter {
@@ -280,6 +286,8 @@ impl WorkerGroup {
                     cache_manager: cache_manager.clone(),
                     intermediate_manager: intermediate_manager.clone(),
                     time_provider: time_provider.clone(),
+                    flush_sender: flush_sender.clone(),
+                    flush_receiver: flush_receiver.clone(),
                 }
                 .start()
             })
@@ -346,6 +354,10 @@ struct WorkerStarter {
     cache_manager: CacheManagerRef,
     intermediate_manager: IntermediateManager,
     time_provider: TimeProviderRef,
+    /// Watch channel sender to notify workers to handle stalled requests.
+    flush_sender: watch::Sender<()>,
+    /// Watch channel receiver to wait for background flush jobs.
+    flush_receiver: watch::Receiver<()>,
 }
 
 impl WorkerStarter {
@@ -386,6 +398,9 @@ impl WorkerStarter {
             intermediate_manager: self.intermediate_manager,
             time_provider: self.time_provider,
             last_periodical_check_millis: now,
+            flush_sender: self.flush_sender,
+            flush_receiver: self.flush_receiver,
+            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
         };
         let handle = common_runtime::spawn_write(async move {
             worker_thread.run().await;
@@ -548,6 +563,12 @@ struct RegionWorkerLoop {
     time_provider: TimeProviderRef,
     /// Last time to check regions periodically.
     last_periodical_check_millis: i64,
+    /// Watch channel sender to notify workers to handle stalled requests.
+    flush_sender: watch::Sender<()>,
+    /// Watch channel receiver to wait for background flush jobs.
+    flush_receiver: watch::Receiver<()>,
+    /// Gauge of stalled request count.
+    stalled_count: IntGauge,
 }
 
 impl RegionWorkerLoop {
@@ -568,17 +589,41 @@ impl RegionWorkerLoop {
             buffer.clear();
 
             let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
-            match tokio::time::timeout(max_wait_time, self.receiver.recv()).await {
-                Ok(Some(request)) => buffer.push(request),
-                // The channel is disconnected.
-                Ok(None) => break,
-                Err(_) => {
+            let sleep = tokio::time::sleep(max_wait_time);
+            tokio::pin!(sleep);
+
+            tokio::select! {
+                request_opt = self.receiver.recv() => {
+                    match request_opt {
+                        Some(request) => buffer.push(request),
+                        // The channel is disconnected.
+                        None => break,
+                    }
+                }
+                recv_res = self.flush_receiver.changed() => {
+                    if recv_res.is_err() {
+                        // The channel is disconnected.
+                        break;
+                    } else {
+                        // A flush job is finished, handles stalled requests.
+                        self.handle_stalled_requests().await;
+                        continue;
+                    }
+                }
+                _ = &mut sleep => {
                     // Timeout. Checks periodical tasks.
                     self.handle_periodical_tasks();
                     continue;
                 }
             }
+
+            if self.flush_receiver.has_changed().unwrap_or(false) {
+                // Always checks whether we could process stalled requests to avoid a request
+                // hanging for too long.
+                // If the channel is closed, do nothing.
+                self.handle_stalled_requests().await;
+            }
 
             // Try to recv more requests from the channel.
             for _ in 1..buffer.capacity() {
                 // We have received one request so we start from 1.
@@ -735,7 +780,7 @@ impl RegionWorkerLoop {
 }
 
 impl RegionWorkerLoop {
-    // Clean up the worker.
+    /// Cleans up the worker.
     async fn clean(&self) {
         // Closes remaining regions.
         let regions = self.regions.list_regions();
@@ -745,6 +790,15 @@ impl RegionWorkerLoop {
 
         self.regions.clear();
     }
+
+    /// Notifies the whole group that a flush job is finished so other
+    /// workers can handle stalled requests.
+    fn notify_group(&mut self) {
+        // Notifies all receivers.
+        let _ = self.flush_sender.send(());
+        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
+        self.flush_receiver.borrow_and_update();
+    }
 }
 
 /// Wrapper that only calls event listener in tests.
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 9b1f86df751d..d99959de4cac 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -190,6 +190,10 @@ impl RegionWorkerLoop {
         region_id: RegionId,
         mut request: FlushFinished,
     ) {
+        // Notifies other workers. Even if the remaining steps of this method fail, we still
+        // wake up other workers as we have released some memory by the flush.
+        self.notify_group();
+
         let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
             warn!(
                 "Unable to finish the flush task for a read only region {}",
@@ -229,9 +233,7 @@ impl RegionWorkerLoop {
         }
 
         // Handle stalled requests.
-        let stalled = std::mem::take(&mut self.stalled_requests);
-        // We already stalled these requests, don't stall them again.
-        self.handle_write_requests(stalled.requests, false).await;
+        self.handle_stalled_requests().await;
 
         // Schedules compaction.
         if let Err(e) = self.compaction_scheduler.schedule_compaction(
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index ca9a1b31a391..3614d1be5de2 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -24,9 +24,7 @@ use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;
 
 use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
-use crate::metrics::{
-    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
-};
+use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
 use crate::region_write_ctx::RegionWriteCtx;
 use crate::request::{SenderWriteRequest, WriteRequest};
 use crate::worker::RegionWorkerLoop;
@@ -50,13 +48,13 @@ impl RegionWorkerLoop {
             reject_write_requests(write_requests);
             // Also reject all stalled requests.
             let stalled = std::mem::take(&mut self.stalled_requests);
+            self.stalled_count.sub(stalled.requests.len() as i64);
             reject_write_requests(stalled.requests);
             return;
         }
 
         if self.write_buffer_manager.should_stall() && allow_stall {
-            WRITE_STALL_TOTAL.inc_by(write_requests.len() as u64);
-
+            self.stalled_count.add(write_requests.len() as i64);
             self.stalled_requests.append(&mut write_requests);
             self.listener.on_write_stall();
             return;
@@ -120,6 +118,15 @@ impl RegionWorkerLoop {
             .with_label_values(&["delete"])
             .inc_by(delete_rows as u64);
     }
+
+    /// Handles all stalled write requests.
+    pub(crate) async fn handle_stalled_requests(&mut self) {
+        // Handle stalled requests.
+        let stalled = std::mem::take(&mut self.stalled_requests);
+        self.stalled_count.sub(stalled.requests.len() as i64);
+        // We already stalled these requests, don't stall them again.
+        self.handle_write_requests(stalled.requests, false).await;
+    }
 }
 
 impl RegionWorkerLoop {
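
Note on the metric change: WRITE_STALL_TOTAL becomes an IntGaugeVec keyed by a worker label, each worker caches its own IntGauge handle at startup, and the gauge goes up when requests stall and down when they are retried or rejected. The standalone sketch below shows that pattern in isolation; it is not part of the patch. Only the metric name and label mirror the diff — the Worker type and its stall/drain methods are hypothetical placeholders — and it assumes the prometheus and lazy_static crates.

// Sketch only, not part of the patch: a per-worker gauge obtained from an IntGaugeVec.
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
        "greptime_mito_write_stall_total",
        "mito stalled write requests in each worker",
        &["worker"]
    )
    .unwrap();
}

struct Worker {
    // Gauge bound to this worker's label value; caching it avoids a label
    // lookup on every update.
    stalled_count: IntGauge,
}

impl Worker {
    fn new(id: usize) -> Self {
        Self {
            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id.to_string()]),
        }
    }

    // Requests enter the stalled queue.
    fn stall(&self, n: usize) {
        self.stalled_count.add(n as i64);
    }

    // Stalled requests are retried or rejected, so the gauge goes back down.
    fn drain(&self, n: usize) {
        self.stalled_count.sub(n as i64);
    }
}

fn main() {
    let worker = Worker::new(0);
    worker.stall(3);
    worker.drain(3);
    assert_eq!(worker.stalled_count.get(), 0);
}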
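
Note on the wakeup mechanism: a single tokio::sync::watch channel is shared by the worker group; a worker calls notify_group() after finishing a flush, every other worker's select! loop observes changed() and retries its stalled requests, and borrow_and_update() marks the value as seen so the notifying worker does not wake itself. The self-contained sketch below illustrates that pattern under simplifying assumptions — String requests and println! handlers stand in for the real RegionWorkerLoop — and assumes tokio with the rt, macros, sync, and time features.

// Sketch only, not part of the patch: watch-channel wakeup across a worker group.
use std::time::Duration;

use tokio::sync::{mpsc, watch};

struct WorkerLoop {
    receiver: mpsc::Receiver<String>,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
}

impl WorkerLoop {
    async fn run(mut self) {
        loop {
            let sleep = tokio::time::sleep(Duration::from_secs(5));
            tokio::pin!(sleep);

            tokio::select! {
                // A new request arrives on this worker's own channel.
                req = self.receiver.recv() => match req {
                    Some(req) => println!("handle request: {req}"),
                    // The channel is disconnected; stop the loop.
                    None => break,
                },
                // Some worker (possibly this one) finished a flush: memory was
                // released, so stalled requests may be retried now.
                res = self.flush_receiver.changed() => {
                    if res.is_err() {
                        break;
                    }
                    println!("retry stalled requests");
                }
                // Timeout: run periodical tasks.
                _ = &mut sleep => println!("periodical tasks"),
            }
        }
    }

    /// Broadcasts a flush-finished event, then marks it as seen locally so this
    /// worker's own loop is not immediately woken by its own notification.
    fn notify_group(&mut self) {
        let _ = self.flush_sender.send(());
        self.flush_receiver.borrow_and_update();
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    // One watch channel serves the whole group; each worker holds a clone of
    // both ends, as the patch does in WorkerGroup::start.
    let (flush_tx, flush_rx) = watch::channel(());
    let mut worker = WorkerLoop {
        receiver: rx,
        flush_sender: flush_tx,
        flush_receiver: flush_rx,
    };
    worker.notify_group();
    tx.send("put rows".to_string()).await.unwrap();
    drop(tx); // Closing the sender ends the loop once the request is handled.
    worker.run().await;
}

A watch channel fits here because only the "something changed" edge matters, not how many flushes completed: receivers that miss intermediate sends still see the latest value, and has_changed() lets the loop opportunistically drain stalled requests even when it was woken for another reason.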