diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9aa2d34ce4c7..6b2be8505da0 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -210,6 +210,12 @@ pub enum Error { location: Location, }, + #[snafu(display("RegionId {} not ready", region_id))] + RegionNotReady { + region_id: RegionId, + location: Location, + }, + #[snafu(display("Region engine {} is not registered", name))] RegionEngineNotFound { name: String, location: Location }, @@ -295,6 +301,7 @@ impl ErrorExt for Error { | GetRegionMetadata { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, + RegionNotReady { .. } => StatusCode::RegionNotReady, StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6b820b85356d..5854326fd956 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::HashMap; +use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; use api::v1::region::{region_request, QueryRequest, RegionResponse}; @@ -57,7 +58,7 @@ use table::table::scan::StreamScanAdapter; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, + self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; @@ -249,9 +250,50 @@ impl FlightCraft for RegionServer { } } +#[derive(Clone)] +enum RegionEngineWithStatus { + // A opening, or creating region. + Registering(RegionEngineRef), + // A closing, or dropping region. + Deregistering(RegionEngineRef), + // A ready region. + Ready(RegionEngineRef), +} + +impl RegionEngineWithStatus { + /// Returns [RegionEngineRef]. + pub fn into_engine(self) -> RegionEngineRef { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } + + pub fn is_registering(&self) -> bool { + matches!(self, Self::Registering(_)) + } + + pub fn is_deregistering(&self) -> bool { + matches!(self, Self::Deregistering(_)) + } +} + +impl Deref for RegionEngineWithStatus { + type Target = RegionEngineRef; + + fn deref(&self) -> &Self::Target { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } +} + struct RegionServerInner { engines: RwLock>, - region_map: DashMap, + region_map: DashMap, query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, @@ -306,48 +348,109 @@ impl RegionServerInner { | RegionRequest::Truncate(_) => RegionChange::None, }; - let engine = match ®ion_change { - RegionChange::Register(engine_type) => self - .engines - .read() - .unwrap() - .get(engine_type) - .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? - .clone(), - RegionChange::None | RegionChange::Deregisters => self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(), + let region_status = self.region_map.get(®ion_id); + + let engine = match region_change { + RegionChange::Register(ref engine_type) => match region_status { + Some(status) => { + if status.is_registering() { + return Ok(0); + } else { + status.clone().into_engine() + } + } + _ => self + .engines + .read() + .unwrap() + .get(engine_type) + .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? + .clone(), + }, + RegionChange::Deregisters => match region_status { + Some(status) => { + if status.is_deregistering() { + return Ok(0); + } else { + status.clone().into_engine() + } + } + None => return Ok(0), + }, + RegionChange::None => match region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionNotReadySnafu { region_id }.fail() + } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionNotFoundSnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(engine) => engine, + }, + None => return error::RegionNotFoundSnafu { region_id }.fail(), + }, }; + let engine_type = engine.name(); - let result = engine + // Sets the RegionStatus before the operation. + match region_change { + RegionChange::Register(_) => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + } + RegionChange::Deregisters => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + } + _ => {} + } + + match engine .handle_request(region_id, request) .trace(info_span!( "RegionEngine::handle_region_request", engine_type )) .await - .with_context(|_| HandleRegionRequestSnafu { region_id })?; - - match region_change { - RegionChange::None => {} - RegionChange::Register(_) => { - info!("Region {region_id} is registered to engine {engine_type}"); - self.region_map.insert(region_id, engine); - self.event_listener.on_region_registered(region_id); + .with_context(|_| HandleRegionRequestSnafu { region_id }) + { + Ok(result) => { + match region_change { + RegionChange::None => {} + RegionChange::Register(_) => { + info!("Region {region_id} is registered to engine {engine_type}"); + self.region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine)); + self.event_listener.on_region_registered(region_id); + } + RegionChange::Deregisters => { + info!("Region {region_id} is deregistered from engine {engine_type}"); + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + self.event_listener.on_region_deregistered(region_id); + } + } + Ok(result) } - RegionChange::Deregisters => { - info!("Region {region_id} is deregistered from engine {engine_type}"); - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - self.event_listener.on_region_deregistered(region_id); + Err(err) => { + // Removes the RegionStatus if the operation fails. + match region_change { + RegionChange::None => {} + RegionChange::Register(_) | RegionChange::Deregisters => { + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + } + } + Err(err) } } - - Ok(result) } pub async fn handle_read(&self, request: QueryRequest) -> Result { @@ -366,15 +469,19 @@ impl RegionServerInner { .unwrap_or_else(|| QueryContextBuilder::default().build()); // build dummy catalog list - let engine = self + let region_status = self .region_map .get(®ion_id) .with_context(|| RegionNotFoundSnafu { region_id })? .clone(); + if region_status.is_registering() { + return error::RegionNotReadySnafu { region_id }.fail(); + } + let table_provider = self .table_provider_factory - .create(region_id, engine) + .create(region_id, region_status.into_engine()) .await?; let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); @@ -619,3 +726,189 @@ pub trait TableProviderFactory: Send + Sync { } pub type TableProviderFactoryRef = Arc; + +#[cfg(test)] +mod tests { + + use common_error::ext::ErrorExt; + use mito2::test_util::CreateRequestBuilder; + use store_api::region_engine::RegionEngine; + use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest}; + use store_api::storage::RegionId; + + use super::*; + use crate::tests::{mock_region_server, MockRegionEngine}; + + #[tokio::test] + async fn test_region_registering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + let engine_name = engine.name(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + + // Tries to create/open a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + + let affected_rows = mock_region_server + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine_name.to_string(), + region_dir: String::new(), + options: Default::default(), + }), + ) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + } + + #[tokio::test] + async fn test_region_deregistering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + } + + #[tokio::test] + async fn test_region_not_ready() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let err = mock_region_server + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + } + + #[tokio::test] + async fn test_region_request_failed() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = + MockRegionEngine::with_mock_fn(Box::new(|_region_id, _request| { + error::UnexpectedSnafu { + violated: "test".to_string(), + } + .fail() + })); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + + mock_region_server + .inner + .region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine.clone())); + + mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + } +} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index abf1064de826..37e8ea2ce77a 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -36,6 +36,7 @@ use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::error::Error; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; @@ -87,15 +88,39 @@ pub fn mock_region_server() -> RegionServer { ) } +pub type MockRequestHandler = + Box Result + Send + Sync>; + pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, + handle_request_mock_fn: Option, } impl MockRegionEngine { pub fn new() -> (Arc, Receiver<(RegionId, RegionRequest)>) { let (tx, rx) = tokio::sync::mpsc::channel(8); - (Arc::new(Self { sender: tx }), rx) + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: None, + }), + rx, + ) + } + + pub fn with_mock_fn( + mock_fn: MockRequestHandler, + ) -> (Arc, Receiver<(RegionId, RegionRequest)>) { + let (tx, rx) = tokio::sync::mpsc::channel(8); + + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: Some(mock_fn), + }), + rx, + ) } } @@ -110,6 +135,10 @@ impl RegionEngine for MockRegionEngine { region_id: RegionId, request: RegionRequest, ) -> Result { + if let Some(mock_fn) = &self.handle_request_mock_fn { + return mock_fn(region_id, request).map_err(BoxedError::new); + }; + let _ = self.sender.send((region_id, request)).await; Ok(0) } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4c21b9832693..8dbb28d4dd16 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -538,7 +538,7 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::TableColumnNotFound | StatusCode::DatabaseNotFound | StatusCode::UserNotFound => Code::NotFound, - StatusCode::StorageUnavailable => Code::Unavailable, + StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable, StatusCode::RuntimeResourcesExhausted | StatusCode::RateLimited => Code::ResourceExhausted, StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c35070bf5a43..44153d365108 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -757,6 +757,7 @@ tcp_nodelay = true mode = "standalone" node_id = 0 require_lease_before_startup = true +initialize_region_in_background = false rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 rpc_max_recv_message_size = "512MiB"