fix: notifies all workers once a region is flushed #4016

Merged · merged 3 commits on May 23, 2024
Changes from 2 commits
11 changes: 8 additions & 3 deletions src/mito2/src/metrics.rs
@@ -23,6 +23,8 @@ pub const TYPE_LABEL: &str = "type";
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";
/// Region worker id label.
pub const WORKER_LABEL: &str = "worker";

lazy_static! {
/// Global write buffer size in bytes.
@@ -70,9 +72,12 @@ lazy_static! {


// ------ Write related metrics
/// Counter of stalled write requests.
pub static ref WRITE_STALL_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_stall_total", "mito write stall total").unwrap();
/// Number of stalled write requests in each worker.
pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stall_total",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
/// Counter of rejected write requests.
pub static ref WRITE_REJECT_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
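The metric switches from a plain counter to a per-worker gauge so the stalled count can go back down once requests are drained. A minimal sketch of how such a labeled gauge is registered and updated with the `prometheus` and `lazy_static` crates (the metric name and worker id below are illustrative, not the PR's):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    /// Stalled write requests, labeled by worker id.
    static ref STALL_GAUGE: IntGaugeVec = register_int_gauge_vec!(
        "example_write_stall_total",
        "stalled write requests in each worker",
        &["worker"]
    )
    .unwrap();
}

fn main() {
    // Each worker keeps its own labeled child gauge, so updates stay per worker.
    let worker_id = 0u32;
    let stalled_count: IntGauge = STALL_GAUGE.with_label_values(&[&worker_id.to_string()]);
    stalled_count.add(3); // three requests stalled under memory pressure
    stalled_count.sub(3); // a flush finished and the requests were handled
    assert_eq!(stalled_count.get(), 0);
}
```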
56 changes: 49 additions & 7 deletions src/mito2/src/worker.rs
@@ -34,13 +34,14 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{thread_rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResponse;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
@@ -49,6 +50,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -151,6 +153,7 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let (flush_sender, flush_receiver) = watch::channel(());

let workers = (0..config.num_workers)
.map(|id| {
@@ -166,6 +169,8 @@
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
}
.start()
})
@@ -266,6 +271,7 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let (flush_sender, flush_receiver) = watch::channel(());
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
Expand All @@ -280,6 +286,8 @@ impl WorkerGroup {
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
}
.start()
})
@@ -346,6 +354,10 @@ struct WorkerStarter<S> {
cache_manager: CacheManagerRef,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
/// Watch channel sender to notify workers to handle stalled requests.
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
}

impl<S: LogStore> WorkerStarter<S> {
Expand Down Expand Up @@ -386,6 +398,9 @@ impl<S: LogStore> WorkerStarter<S> {
intermediate_manager: self.intermediate_manager,
time_provider: self.time_provider,
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -548,6 +563,12 @@ struct RegionWorkerLoop<S> {
time_provider: TimeProviderRef,
/// Last time to check regions periodically.
last_periodical_check_millis: i64,
/// Watch channel sender to notify workers to handle stalled requests.
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
}

impl<S: LogStore> RegionWorkerLoop<S> {
@@ -568,11 +589,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
buffer.clear();

let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
match tokio::time::timeout(max_wait_time, self.receiver.recv()).await {
Ok(Some(request)) => buffer.push(request),
// The channel is disconnected.
Ok(None) => break,
Err(_) => {
let sleep = tokio::time::sleep(max_wait_time);
tokio::pin!(sleep);

tokio::select! {
request_opt = self.receiver.recv() => {
match request_opt {
Some(request) => buffer.push(request),
// The channel is disconnected.
None => break,
}
}
_ = self.flush_receiver.changed() => {
// A flush job has finished, handle stalled requests.
self.handle_stalled_requests().await;
continue;
}
_ = &mut sleep => {
// Timeout. Checks periodical tasks.
self.handle_periodical_tasks();
continue;
@@ -735,7 +768,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}

impl<S> RegionWorkerLoop<S> {
// Clean up the worker.
/// Cleans up the worker.
async fn clean(&self) {
// Closes remaining regions.
let regions = self.regions.list_regions();
@@ -745,6 +778,15 @@ impl<S> RegionWorkerLoop<S> {

self.regions.clear();
}

/// Notifies the whole group that a flush job is finished so other
/// workers can handle stalled requests.
fn notify_group(&mut self) {
// Notifies all receivers.
let _ = self.flush_sender.send(());
// Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
self.flush_receiver.borrow_and_update();
}
}

/// Wrapper that only calls event listener in tests.
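The heart of the change is the shared `tokio::sync::watch` channel: every worker clones one receiver, the worker that finishes a flush sends on the channel, and each worker's `tokio::select!` loop wakes up on `changed()` to drain its own stalled requests. A self-contained sketch of that pattern, with made-up worker logic standing in for `RegionWorkerLoop` (requires tokio with the rt, macros, time, and sync features):

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One channel for the whole worker group; each worker clones the receiver.
    let (flush_sender, flush_receiver) = watch::channel(());

    let mut handles = Vec::new();
    for id in 0..2 {
        let mut rx = flush_receiver.clone();
        handles.push(tokio::spawn(async move {
            let sleep = tokio::time::sleep(Duration::from_secs(1));
            tokio::pin!(sleep);
            tokio::select! {
                // A flush finished somewhere in the group: handle stalled requests.
                _ = rx.changed() => println!("worker {id}: notified, draining stalled requests"),
                // Timeout: run periodical tasks instead.
                _ = &mut sleep => println!("worker {id}: timeout, periodical tasks"),
            }
        }));
    }

    // The worker that finished a flush notifies everyone else.
    let _ = flush_sender.send(());
    for handle in handles {
        let _ = handle.await;
    }
}
```

In the PR, the flushing worker also calls `borrow_and_update()` on its own receiver (see `notify_group`) so it is not immediately woken up by the notification it just sent.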
8 changes: 5 additions & 3 deletions src/mito2/src/worker/handle_flush.rs
@@ -190,6 +190,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: FlushFinished,
) {
// Notifies other workers. Even if the remaining steps of this method fail, we still
// wake up other workers, as the flush has already released some memory.
self.notify_group();

let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the flush task for a read only region {}",
@@ -229,9 +233,7 @@ }
}

// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
self.handle_stalled_requests().await;

// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(
17 changes: 12 additions & 5 deletions src/mito2/src/worker/handle_write.rs
@@ -24,9 +24,7 @@ use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
@@ -50,13 +48,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
reject_write_requests(stalled.requests);
return;
}

if self.write_buffer_manager.should_stall() && allow_stall {
WRITE_STALL_TOTAL.inc_by(write_requests.len() as u64);

self.stalled_count.add(write_requests.len() as i64);
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
@@ -120,6 +118,15 @@
.with_label_values(&["delete"])
.inc_by(delete_rows as u64);
}

/// Handles all stalled write requests.
pub(crate) async fn handle_stalled_requests(&mut self) {
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
}
}

impl<S> RegionWorkerLoop<S> {
Expand Down