diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 59cc0e2037a3..9ca785be143d 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -58,6 +58,7 @@ pub enum StatusCode { DatabaseNotFound = 4004, RegionNotFound = 4005, RegionAlreadyExists = 4006, + RegionReadonly = 4007, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -117,6 +118,7 @@ impl StatusCode { | StatusCode::TableNotFound | StatusCode::RegionNotFound | StatusCode::RegionAlreadyExists + | StatusCode::RegionReadonly | StatusCode::TableColumnNotFound | StatusCode::TableColumnExists | StatusCode::DatabaseNotFound @@ -151,6 +153,7 @@ impl StatusCode { | StatusCode::TableNotFound | StatusCode::RegionNotFound | StatusCode::RegionAlreadyExists + | StatusCode::RegionReadonly | StatusCode::TableColumnNotFound | StatusCode::TableColumnExists | StatusCode::DatabaseNotFound @@ -183,6 +186,7 @@ impl StatusCode { v if v == StatusCode::RegionAlreadyExists as u32 => { Some(StatusCode::RegionAlreadyExists) } + v if v == StatusCode::RegionReadonly as u32 => Some(StatusCode::RegionReadonly), v if v == StatusCode::TableColumnNotFound as u32 => { Some(StatusCode::TableColumnNotFound) } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5f55bf6ebebf..08b64432a7d8 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -21,18 +21,16 @@ mod twcs; use std::sync::Arc; use std::time::Duration; -use common_query::Output; use common_telemetry::debug; pub use picker::CompactionPickerRef; use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; -use crate::error; use crate::error::Result; use crate::region::version::VersionRef; -use crate::request::WorkerRequest; +use crate::request::{OptionOutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; @@ -43,7 +41,7 @@ pub struct CompactionRequest { pub(crate) ttl: Option, pub(crate) compaction_time_window: Option, pub(crate) request_sender: mpsc::Sender, - pub(crate) waiter: Option>>, + pub(crate) waiter: OptionOutputTx, pub(crate) file_purger: FilePurgerRef, } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index c0098319e2aa..1b6293b9c82a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -27,7 +27,6 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; -use tokio::sync::oneshot::Sender; use crate::access_layer::AccessLayerRef; use crate::compaction::output::CompactionOutput; @@ -35,11 +34,12 @@ use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; use crate::error; use crate::error::CompactRegionSnafu; -use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, WorkerRequest}; +use crate::request::{ + BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest, +}; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::LevelMeta; -use crate::worker::send_result; const MAX_PARALLEL_COMPACTION: usize = 8; @@ -157,7 +157,7 @@ impl Picker for TwcsPicker { if outputs.is_empty() && expired_ssts.is_empty() { // Nothing to compact. - send_result(waiter, Ok(Output::AffectedRows(0))); + waiter.send(Ok(Output::AffectedRows(0))); return None; } let task = TwcsCompactionTask { @@ -228,7 +228,7 @@ pub(crate) struct TwcsCompactionTask { /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, /// Sender that are used to notify waiters waiting for pending compaction tasks. - pub sender: Option>>, + pub sender: OptionOutputTx, } impl Debug for TwcsCompactionTask { @@ -321,11 +321,10 @@ impl TwcsCompactionTask { /// Handles compaction failure, notifies all waiters. fn on_failure(&mut self, err: Arc) { - if let Some(sender) = self.sender.take() { - let _ = sender.send(Err(err.clone()).context(CompactRegionSnafu { + self.sender + .send_mut(Err(err.clone()).context(CompactRegionSnafu { region_id: self.region_id, })); - } } /// Notifies region worker to handle post-compaction tasks. diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 5a6100f64d8b..0d0bdd47ed48 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -78,7 +78,8 @@ impl MitoEngine { self.inner.workers.is_region_exists(region_id) } - fn scan(&self, region_id: RegionId, request: ScanRequest) -> Result { + /// Returns a scanner to scan for `request`. + fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result { self.inner.handle_query(region_id, request) } @@ -143,6 +144,17 @@ impl EngineInner { scan_region.scanner() } + + /// Set writable mode for a region. + fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + let region = self + .workers + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + + region.set_writable(writable); + Ok(()) + } } #[async_trait] @@ -168,7 +180,7 @@ impl RegionEngine for MitoEngine { region_id: RegionId, request: ScanRequest, ) -> std::result::Result { - self.scan(region_id, request) + self.scanner(region_id, request) .map_err(BoxedError::new)? .scan() .await @@ -191,6 +203,12 @@ impl RegionEngine for MitoEngine { async fn stop(&self) -> std::result::Result<(), BoxedError> { self.inner.stop().await.map_err(BoxedError::new) } + + fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + self.inner + .set_writable(region_id, writable) + .map_err(BoxedError::new) + } } // Tests methods. diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 26faadd293e3..bbd9fece2a16 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -34,7 +34,7 @@ use crate::test_util::{ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -206,6 +206,8 @@ async fn test_put_after_alter() { ) .await .unwrap(); + // Set writable. + engine.set_writable(region_id, true).unwrap(); // Put with old schema. let rows = Rows { diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 2c76752bb5e1..12f0bd4e06b7 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -52,7 +52,7 @@ async fn test_manual_flush() { flush_region(&engine, region_id).await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -110,7 +110,7 @@ async fn test_flush_engine() { listener.wait().await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -175,7 +175,7 @@ async fn test_write_stall() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -211,7 +211,7 @@ async fn test_flush_empty() { flush_region(&engine, region_id).await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(0, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -255,7 +255,7 @@ async fn test_flush_reopen_region() { }; check_region(); - reopen_region(&engine, region_id, region_dir).await; + reopen_region(&engine, region_id, region_dir, true).await; check_region(); // Puts again. diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index e76a8c7b1ebd..edacb0e067ef 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -14,23 +14,27 @@ use std::collections::HashMap; +use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionOpenRequest, RegionRequest}; +use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest}; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv}; +use crate::test_util::{ + build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, +}; #[tokio::test] async fn test_engine_open_empty() { let mut env = TestEnv::with_prefix("open-empty"); let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); let err = engine .handle_request( - RegionId::new(1, 1), + region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), region_dir: "empty".to_string(), @@ -39,10 +43,9 @@ async fn test_engine_open_empty() { ) .await .unwrap_err(); - assert!( - matches!(err.status_code(), StatusCode::RegionNotFound), - "unexpected err: {err}" - ); + assert_eq!(StatusCode::RegionNotFound, err.status_code()); + let err = engine.set_writable(region_id, true).unwrap_err(); + assert_eq!(StatusCode::RegionNotFound, err.status_code()); } #[tokio::test] @@ -84,6 +87,41 @@ async fn test_engine_reopen_region() { .await .unwrap(); - reopen_region(&engine, region_id, region_dir).await; + reopen_region(&engine, region_id, region_dir, false).await; assert!(engine.is_region_exists(region_id)); } + +#[tokio::test] +async fn test_engine_open_readonly() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + reopen_region(&engine, region_id, region_dir, false).await; + + // Region is readonly. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 2), + }; + let err = engine + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), + ) + .await + .unwrap_err(); + assert_eq!(StatusCode::RegionReadonly, err.status_code()); + + // Set writable and write. + engine.set_writable(region_id, true).unwrap(); + put_rows(&engine, region_id, rows).await; +} diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 968034f49a64..7caa75ea51ce 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -125,8 +125,7 @@ async fn test_region_replay() { assert_eq!(0, rows); let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); - let stream = scanner.scan().await.unwrap(); + let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::()); @@ -216,8 +215,7 @@ async fn test_put_delete() { delete_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); - let stream = scanner.scan().await.unwrap(); + let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -274,8 +272,7 @@ async fn test_put_overwrite() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); - let stream = scanner.scan().await.unwrap(); + let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1f744c3abecf..3c776ee55039 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -469,6 +469,12 @@ pub enum Error { source: store_api::metadata::MetadataError, location: Location, }, + + #[snafu(display("Region {} is read only, location: {}", region_id, location))] + RegionReadonly { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -539,6 +545,7 @@ impl ErrorExt for Error { CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), + RegionReadonly { .. } => StatusCode::RegionReadonly, } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 715021590c74..cda4a1d3fca6 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -22,7 +22,7 @@ use common_query::Output; use common_telemetry::{error, info}; use snafu::ResultExt; use store_api::storage::RegionId; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use crate::access_layer::AccessLayerRef; use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result}; @@ -31,14 +31,13 @@ use crate::read::Source; use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; use crate::request::{ - BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, SenderWriteRequest, - WorkerRequest, + BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, + SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; -use crate::worker::send_result; /// Global write buffer (memtable) manager. /// @@ -178,7 +177,7 @@ pub(crate) struct RegionFlushTask { /// Reason to flush. pub(crate) reason: FlushReason, /// Flush result senders. - pub(crate) senders: Vec>>, + pub(crate) senders: Vec, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, @@ -188,18 +187,24 @@ pub(crate) struct RegionFlushTask { } impl RegionFlushTask { + /// Push the sender if it is not none. + pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) { + if let Some(sender) = sender.take_inner() { + self.senders.push(sender); + } + } + /// Consumes the task and notify the sender the job is success. fn on_success(self) { for sender in self.senders { - let _ = sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(Output::AffectedRows(0))); } } /// Send flush error to waiter. fn on_failure(&mut self, err: Arc) { for sender in self.senders.drain(..) { - // Ignore send result. - let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + sender.send(Err(err.clone()).context(FlushRegionSnafu { region_id: self.region_id, })); } @@ -516,6 +521,15 @@ impl FlushScheduler { } } +impl Drop for FlushScheduler { + fn drop(&mut self) { + for (region_id, flush_status) in self.region_status.drain() { + // We are shutting down so notify all pending tasks. + flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + } + } +} + /// Flush status of a region scheduled by the [FlushScheduler]. /// /// Tracks running and pending flush tasks and all pending requests of a region. @@ -557,20 +571,16 @@ impl FlushStatus { task.on_failure(err.clone()); } for ddl in self.pending_ddls { - send_result( - ddl.sender, - Err(err.clone()).context(FlushRegionSnafu { - region_id: self.region.region_id, - }), - ); + ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu { + region_id: self.region.region_id, + })); } for write_req in self.pending_writes { - send_result( - write_req.sender, - Err(err.clone()).context(FlushRegionSnafu { + write_req + .sender + .send(Err(err.clone()).context(FlushRegionSnafu { region_id: self.region.region_id, - }), - ); + })); } } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 54afb5463fd4..44f82fd36790 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -18,18 +18,20 @@ pub(crate) mod opener; pub(crate) mod version; use std::collections::HashMap; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use common_telemetry::info; use common_time::util::current_time_millis; +use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; -use crate::error::Result; +use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result}; use crate::manifest::manager::RegionManifestManager; use crate::region::version::{VersionControlRef, VersionRef}; +use crate::request::OnFailure; use crate::sst::file_purger::FilePurgerRef; /// Metadata and runtime status of a region. @@ -55,6 +57,8 @@ pub(crate) struct MitoRegion { pub(crate) file_purger: FilePurgerRef, /// Last flush time in millis. last_flush_millis: AtomicI64, + /// Whether the region is writable. + writable: AtomicBool, } pub(crate) type MitoRegionRef = Arc; @@ -94,6 +98,16 @@ impl MitoRegion { let now = current_time_millis(); self.last_flush_millis.store(now, Ordering::Relaxed); } + + /// Returns whether the region is writable. + pub(crate) fn is_writable(&self) -> bool { + self.writable.load(Ordering::Relaxed) + } + + /// Sets the writable flag. + pub(crate) fn set_writable(&self, writable: bool) { + self.writable.store(writable, Ordering::Relaxed); + } } /// Regions indexed by ids. @@ -115,12 +129,40 @@ impl RegionMap { regions.insert(region.region_id, region); } - /// Get region by region id. + /// Gets region by region id. pub(crate) fn get_region(&self, region_id: RegionId) -> Option { let regions = self.regions.read().unwrap(); regions.get(®ion_id).cloned() } + /// Gets writable region by region id. + /// + /// Returns error if the region does not exist or is readonly. + pub(crate) fn writable_region(&self, region_id: RegionId) -> Result { + let region = self + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + ensure!(region.is_writable(), RegionReadonlySnafu { region_id }); + Ok(region) + } + + /// Gets writable region by region id. + /// + /// Calls the callback if the region does not exist or is readonly. + pub(crate) fn writable_region_or( + &self, + region_id: RegionId, + cb: &mut F, + ) -> Option { + match self.writable_region(region_id) { + Ok(region) => Some(region), + Err(e) => { + cb.on_failure(e); + None + } + } + } + /// Remove region by id. pub(crate) fn remove_region(&self, region_id: RegionId) { let mut regions = self.regions.write().unwrap(); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 2d7c6c3cc9e3..91de9cf4761c 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -14,7 +14,7 @@ //! Region opener. -use std::sync::atomic::AtomicI64; +use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; use common_telemetry::info; @@ -35,6 +35,7 @@ use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::MitoRegion; use crate::region_write_ctx::RegionWriteCtx; +use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::wal::{EntryId, Wal}; @@ -110,10 +111,12 @@ impl RegionOpener { manifest_manager, file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)), last_flush_millis: AtomicI64::new(current_time_millis()), + // Region is writable after it is created. + writable: AtomicBool::new(true), }) } - /// Opens an existing region. + /// Opens an existing region in read only mode. /// /// Returns error if the region doesn't exist. pub(crate) async fn open( @@ -165,6 +168,8 @@ impl RegionOpener { manifest_manager, file_purger, last_flush_millis: AtomicI64::new(current_time_millis()), + // Region is always opened in read only mode. + writable: AtomicBool::new(false), }; Ok(region) } @@ -192,7 +197,7 @@ async fn replay_memtable( .as_ref() .map(|rows| rows.rows.len()) .unwrap_or(0); - region_write_ctx.push_mutation(mutation.op_type, mutation.rows, None); + region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none()); } } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 2cb82db30809..b09d7230f10d 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -20,11 +20,11 @@ use common_query::Output; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; -use tokio::sync::oneshot::Sender; use crate::error::{Error, Result, WriteGroupSnafu}; use crate::memtable::KeyValues; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; +use crate::request::OptionOutputTx; use crate::wal::{EntryId, WalWriter}; /// Context to keep region metadata and buffer write requests. @@ -34,14 +34,14 @@ struct WriteNotify { /// Error to send to the waiter. err: Option>, /// Sender to send write result to the waiter for this mutation. - sender: Option>>, + sender: OptionOutputTx, /// Number of rows to be written. num_rows: usize, } impl WriteNotify { /// Creates a new notify from the `sender`. - fn new(sender: Option>>, num_rows: usize) -> WriteNotify { + fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify { WriteNotify { err: None, sender, @@ -51,15 +51,14 @@ impl WriteNotify { /// Send result to the waiter. fn notify_result(&mut self) { - let Some(sender) = self.sender.take() else { - return; - }; if let Some(err) = &self.err { // Try to send the error to waiters. - let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu)); + self.sender + .send_mut(Err(err.clone()).context(WriteGroupSnafu)); } else { // Send success result. - let _ = sender.send(Ok(Output::AffectedRows(self.num_rows))); + self.sender + .send_mut(Ok(Output::AffectedRows(self.num_rows))); } } } @@ -117,12 +116,7 @@ impl RegionWriteCtx { } /// Push [SenderWriteRequest] to the context. - pub(crate) fn push_mutation( - &mut self, - op_type: i32, - rows: Option, - tx: Option>>, - ) { + pub(crate) fn push_mutation(&mut self, op_type: i32, rows: Option, tx: OptionOutputTx) { let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0); self.wal_entry.mutations.push(Mutation { op_type, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c4e2d531434a..e0fc4ad2e340 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -391,11 +391,86 @@ pub(crate) fn validate_proto_value( Ok(()) } +/// Oneshot output result sender. +#[derive(Debug)] +pub(crate) struct OutputTx(Sender>); + +impl OutputTx { + /// Creates a new output sender. + pub(crate) fn new(sender: Sender>) -> OutputTx { + OutputTx(sender) + } + + /// Sends the `result`. + pub(crate) fn send(self, result: Result) { + // Ignores send result. + let _ = self.0.send(result); + } +} + +/// Optional output result sender. +#[derive(Debug)] +pub(crate) struct OptionOutputTx(Option); + +impl OptionOutputTx { + /// Creates a sender. + pub(crate) fn new(sender: Option) -> OptionOutputTx { + OptionOutputTx(sender) + } + + /// Creates an empty sender. + pub(crate) fn none() -> OptionOutputTx { + OptionOutputTx(None) + } + + /// Sends the `result` and consumes the inner sender. + pub(crate) fn send_mut(&mut self, result: Result) { + if let Some(sender) = self.0.take() { + sender.send(result); + } + } + + /// Sends the `result` and consumes the sender. + pub(crate) fn send(mut self, result: Result) { + if let Some(sender) = self.0.take() { + sender.send(result); + } + } + + /// Takes the sender. + pub(crate) fn take(&mut self) -> OptionOutputTx { + OptionOutputTx(self.0.take()) + } + + /// Takes the inner sender. + pub(crate) fn take_inner(&mut self) -> Option { + self.0.take() + } +} + +impl From>> for OptionOutputTx { + fn from(sender: Sender>) -> Self { + Self::new(Some(OutputTx::new(sender))) + } +} + +impl OnFailure for OptionOutputTx { + fn on_failure(&mut self, err: Error) { + self.send_mut(Err(err)); + } +} + +/// Callback on failure. +pub(crate) trait OnFailure { + /// Handles `err` on failure. + fn on_failure(&mut self, err: Error); +} + /// Sender and write request. #[derive(Debug)] pub(crate) struct SenderWriteRequest { /// Result sender. - pub(crate) sender: Option>>, + pub(crate) sender: OptionOutputTx, pub(crate) request: WriteRequest, } @@ -431,50 +506,50 @@ impl WorkerRequest { RegionRequest::Put(v) => { let write_request = WriteRequest::new(region_id, OpType::Put, v.rows)?; WorkerRequest::Write(SenderWriteRequest { - sender: Some(sender), + sender: sender.into(), request: write_request, }) } RegionRequest::Delete(v) => { let write_request = WriteRequest::new(region_id, OpType::Delete, v.rows)?; WorkerRequest::Write(SenderWriteRequest { - sender: Some(sender), + sender: sender.into(), request: write_request, }) } RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Create(v), }), RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Drop(v), }), RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Open(v), }), RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Close(v), }), RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Alter(v), }), RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Flush(v), }), RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, - sender: Some(sender), + sender: sender.into(), request: DdlRequest::Compact(v), }), }; @@ -501,7 +576,7 @@ pub(crate) struct SenderDdlRequest { /// Region id of the request. pub(crate) region_id: RegionId, /// Result sender. - pub(crate) sender: Option>>, + pub(crate) sender: OptionOutputTx, /// Ddl request. pub(crate) request: DdlRequest, } @@ -533,35 +608,35 @@ pub(crate) struct FlushFinished { /// Id of memtables to remove. pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, /// Flush result senders. - pub(crate) senders: Vec>>, + pub(crate) senders: Vec, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, } impl FlushFinished { - pub(crate) fn on_failure(self, err: Error) { - let err = Arc::new(err); + pub(crate) fn on_success(self) { for sender in self.senders { - // Ignore send result. - let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + sender.send(Ok(Output::AffectedRows(0))); + } + } +} + +impl OnFailure for FlushFinished { + fn on_failure(&mut self, err: Error) { + let err = Arc::new(err); + for sender in self.senders.drain(..) { + sender.send(Err(err.clone()).context(FlushRegionSnafu { region_id: self.region_id, })); } // Clean flushed files. - for file in self.file_metas { + for file in &self.file_metas { self.file_purger.send_request(PurgeRequest { region_id: file.region_id, file_id: file.file_id, }); } } - - pub(crate) fn on_success(self) { - for sender in self.senders { - // Ignore send result. - let _ = sender.send(Ok(Output::AffectedRows(0))); - } - } } /// Notifies a flush job is failed. @@ -581,22 +656,23 @@ pub(crate) struct CompactionFinished { /// Compacted files that are to be removed from region version. pub(crate) compacted_files: Vec, /// Compaction result sender. - pub(crate) sender: Option>>, + pub(crate) sender: OptionOutputTx, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, } impl CompactionFinished { pub fn on_success(self) { - if let Some(sender) = self.sender { - let _ = sender.send(Ok(AffectedRows(0))); - } + self.sender.send(Ok(AffectedRows(0))); info!("Successfully compacted region: {}", self.region_id); } +} +impl OnFailure for CompactionFinished { /// Compaction succeeded but failed to update manifest or region's already been dropped, /// clean compaction output files. - pub fn on_failure(self, _err: Error) { + fn on_failure(&mut self, err: Error) { + self.sender.send_mut(Err(err)); for file in &self.compacted_files { let file_id = file.file_id; warn!( diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6e55d7f22860..71adc7179ce8 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -569,7 +569,12 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) { } /// Reopen a region. -pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir: String) { +pub async fn reopen_region( + engine: &MitoEngine, + region_id: RegionId, + region_dir: String, + writable: bool, +) { // Close the region. engine .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) @@ -588,4 +593,8 @@ pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir: ) .await .unwrap(); + + if writable { + engine.set_writable(region_id, true).unwrap(); + } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index c8ed7dcfbb94..3d824815ce4d 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -28,7 +28,6 @@ use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use common_query::Output; use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; @@ -37,7 +36,7 @@ use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{mpsc, Mutex}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; @@ -177,14 +176,6 @@ impl WorkerGroup { } } -/// Send result to the sender. -pub(crate) fn send_result(sender: Option>>, res: Result) { - if let Some(sender) = sender { - // Ignore send result. - let _ = sender.send(res); - } -} - // Tests methods. #[cfg(test)] impl WorkerGroup { @@ -514,10 +505,7 @@ impl RegionWorkerLoop { } }; - if let Some(sender) = ddl.sender { - // Ignore send result. - let _ = sender.send(res); - } + ddl.sender.send(res); } } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 49c93a970504..38813d324c5d 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -22,26 +22,24 @@ use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::region_request::RegionAlterRequest; use store_api::storage::RegionId; -use tokio::sync::oneshot; -use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result}; use crate::flush::FlushReason; use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; use crate::memtable::MemtableBuilderRef; use crate::region::version::Version; use crate::region::MitoRegionRef; -use crate::request::{DdlRequest, SenderDdlRequest}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { pub(crate) async fn handle_alter_request( &mut self, region_id: RegionId, request: RegionAlterRequest, - sender: Option>>, + mut sender: OptionOutputTx, ) { - let Some(region) = self.regions.get_region(region_id) else { - send_result(sender, RegionNotFoundSnafu { region_id }.fail()); + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; @@ -59,7 +57,7 @@ impl RegionWorkerLoop { let task = self.new_flush_task(®ion, FlushReason::Alter); if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { // Unable to flush the region, send error to waiter. - send_result(sender, Err(e)); + sender.send(Err(e)); return; } @@ -79,7 +77,7 @@ impl RegionWorkerLoop { alter_region_schema(®ion, &version, request, &self.memtable_builder).await { error!(e; "Failed to alter region schema, region_id: {}", region_id); - send_result(sender, Err(e)); + sender.send(Err(e)); return; } @@ -91,7 +89,7 @@ impl RegionWorkerLoop { ); // Notifies waiters. - send_result(sender, Ok(Output::AffectedRows(0))); + sender.send(Ok(Output::AffectedRows(0))); } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 9cecccb41425..101e9011ebad 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,28 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_query::Output; use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use tokio::sync::oneshot; use crate::compaction::CompactionRequest; -use crate::error::{RegionNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; -use crate::request::{CompactionFailed, CompactionFinished}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// Handles compaction request submitted to region worker. pub(crate) fn handle_compaction_request( &mut self, region_id: RegionId, - sender: Option>>, + mut sender: OptionOutputTx, ) { - let Some(region) = self.regions.get_region(region_id) else { - send_result(sender, RegionNotFoundSnafu { region_id }.fail()); + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; @@ -54,8 +50,7 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: CompactionFinished, ) { - let Some(region) = self.regions.get_region(region_id) else { - request.on_failure(RegionNotFoundSnafu { region_id }.build()); + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { return; }; @@ -90,7 +85,7 @@ impl RegionWorkerLoop { fn new_compaction_request( &self, region: &MitoRegionRef, - waiter: Option>>, + waiter: OptionOutputTx, ) -> CompactionRequest { let current_version = region.version_control.current().version; let access_layer = region.access_layer.clone(); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 50f05bfd7216..cfb4aca89f41 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -26,7 +26,7 @@ use snafu::ResultExt; use store_api::storage::RegionId; use tokio::time::sleep; -use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{OpenDalSnafu, Result}; use crate::region::RegionMapRef; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -35,9 +35,7 @@ const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { - let Some(region) = self.regions.get_region(region_id) else { - return RegionNotFoundSnafu { region_id }.fail(); - }; + let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); @@ -119,7 +117,7 @@ pub(crate) async fn remove_region_dir_once( .lister_with(region_path) .await .context(OpenDalSnafu)?; - while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? { + while let Some(file) = files.try_next().await.context(OpenDalSnafu)? { if file.path().ends_with(".parquet") { has_parquet_file = true; break; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 2375d8823f20..3c6b1deabd27 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -14,19 +14,17 @@ //! Handling flush related requests. -use common_query::Output; use common_telemetry::{error, info}; use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use tokio::sync::oneshot; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::Result; use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; -use crate::request::{FlushFailed, FlushFinished}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// On region flush job finished. @@ -35,9 +33,7 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: FlushFinished, ) { - let Some(region) = self.regions.get_region(region_id) else { - // We may dropped or closed the region. - request.on_failure(RegionNotFoundSnafu { region_id }.build()); + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { return; }; @@ -102,17 +98,14 @@ impl RegionWorkerLoop { pub(crate) async fn handle_flush_request( &mut self, region_id: RegionId, - sender: Option>>, + mut sender: OptionOutputTx, ) { - let Some(region) = self.regions.get_region(region_id) else { - send_result(sender, RegionNotFoundSnafu { region_id }.fail()); + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; let mut task = self.new_flush_task(®ion, FlushReason::Manual); - if let Some(sender) = sender { - task.senders.push(sender); - } + task.push_sender(sender); if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { error!(e; "Failed to schedule flush task for region {}", region.region_id); } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index bba18d3d22f7..a450f8ddd7ab 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -21,10 +21,10 @@ use store_api::logstore::LogStore; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result}; +use crate::error::{RejectWriteSnafu, Result}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::{SenderWriteRequest, WriteRequest}; -use crate::worker::{send_result, RegionWorkerLoop}; +use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// Takes and handles all write requests. @@ -102,10 +102,11 @@ impl RegionWorkerLoop { // Checks whether the region exists and is it stalling. if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { - let Some(region) = self.regions.get_region(region_id) else { - // No such region. - send_result(sender_req.sender, RegionNotFoundSnafu { region_id }.fail()); - + let Some(region) = self + .regions + .writable_region_or(region_id, &mut sender_req.sender) + else { + // No such region or the region is read only. continue; }; @@ -121,7 +122,7 @@ impl RegionWorkerLoop { if let Err(e) = maybe_fill_missing_columns(&mut sender_req.request, ®ion_ctx.version().metadata) { - send_result(sender_req.sender, Err(e)); + sender_req.sender.send(Err(e)); continue; } @@ -148,14 +149,12 @@ impl RegionWorkerLoop { /// Send rejected error to all `write_requests`. fn reject_write_requests(write_requests: Vec) { for req in write_requests { - if let Some(sender) = req.sender { - let _ = sender.send( - RejectWriteSnafu { - region_id: req.request.region_id, - } - .fail(), - ); - } + req.sender.send( + RejectWriteSnafu { + region_id: req.request.region_id, + } + .fail(), + ); } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index f376d3a344fc..bec041d2108e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -479,7 +479,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::UserPasswordMismatch | StatusCode::AuthHeaderNotFound | StatusCode::InvalidAuthHeader => Code::Unauthenticated, - StatusCode::AccessDenied | StatusCode::PermissionDenied => Code::PermissionDenied, + StatusCode::AccessDenied | StatusCode::PermissionDenied | StatusCode::RegionReadonly => { + Code::PermissionDenied + } } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index c23282a13024..504f9e5faa43 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -30,7 +30,7 @@ pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; - /// Handle request to the region. + /// Handles request to the region. /// /// Only query is not included, which is handled in `handle_query` async fn handle_request( @@ -39,18 +39,25 @@ pub trait RegionEngine: Send + Sync { request: RegionRequest, ) -> Result; - /// Handle substrait query and return a stream of record batches + /// Handles substrait query and return a stream of record batches async fn handle_query( &self, region_id: RegionId, request: ScanRequest, ) -> Result; - /// Retrieve region's metadata. + /// Retrieves region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result; - /// Stop the engine + /// Stops the engine async fn stop(&self) -> Result<(), BoxedError>; + + /// Sets writable mode for a region. + /// + /// The engine checks whether the region is writable before writing to the region. Setting + /// the region as readonly doesn't guarantee that write operations in progress will not + /// take effect. + fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>; } pub type RegionEngineRef = Arc;