diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bbb125adddd7..591246737d9b 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -12,6 +12,10 @@ rpc_runtime_size = 8 # It will block the datanode start if it can't receive leases in the heartbeat from metasrv. require_lease_before_startup = false +# Initialize all regions in the background during the startup. +# By default, it provides services after all regions have been initialized. +initialize_region_in_background = false + [heartbeat] # Interval for sending heartbeat messages to the Metasrv, 3 seconds by default. interval = "3s" diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 64b9a9e7fb1b..949c8d44736c 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -59,6 +59,10 @@ pub enum StatusCode { RegionNotFound = 4005, RegionAlreadyExists = 4006, RegionReadonly = 4007, + RegionNotReady = 4008, + // If mutually exclusive operations are reached at the same time, + // only one can be executed, another one will get region busy. + RegionBusy = 4009, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -103,7 +107,9 @@ impl StatusCode { match self { StatusCode::StorageUnavailable | StatusCode::RuntimeResourcesExhausted - | StatusCode::Internal => true, + | StatusCode::Internal + | StatusCode::RegionNotReady + | StatusCode::RegionBusy => true, StatusCode::Success | StatusCode::Unknown @@ -152,6 +158,8 @@ impl StatusCode { | StatusCode::TableAlreadyExists | StatusCode::TableNotFound | StatusCode::RegionNotFound + | StatusCode::RegionNotReady + | StatusCode::RegionBusy | StatusCode::RegionAlreadyExists | StatusCode::RegionReadonly | StatusCode::TableColumnNotFound @@ -183,6 +191,8 @@ impl StatusCode { v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists), v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound), v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound), + v if v == StatusCode::RegionNotReady as u32 => Some(StatusCode::RegionNotReady), + v if v == StatusCode::RegionBusy as u32 => Some(StatusCode::RegionBusy), v if v == StatusCode::RegionAlreadyExists as u32 => { Some(StatusCode::RegionAlreadyExists) } diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index f26b0828c873..772033c2d35f 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -225,6 +225,7 @@ pub struct DatanodeOptions { pub mode: Mode, pub node_id: Option, pub require_lease_before_startup: bool, + pub initialize_region_in_background: bool, pub rpc_addr: String, pub rpc_hostname: Option, pub rpc_runtime_size: usize, @@ -249,6 +250,7 @@ impl Default for DatanodeOptions { mode: Mode::Standalone, node_id: None, require_lease_before_startup: false, + initialize_region_in_background: false, rpc_addr: "127.0.0.1:3001".to_string(), rpc_hostname: None, rpc_runtime_size: 8, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 767a12c4acc7..604146a5d4a3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,7 +25,7 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig}; use common_config::{WalConfig, WAL_OPTIONS_KEY}; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; @@ -33,7 +33,7 @@ use common_telemetry::{error, info, warn}; use file_engine::engine::FileRegionEngine; use futures::future; use futures_util::future::try_join_all; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; @@ -213,6 +213,7 @@ impl DatanodeBuilder { pub async fn build(mut self) -> Result { let mode = &self.opts.mode; + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; let meta_client = self.meta_client.take(); @@ -233,8 +234,26 @@ impl DatanodeBuilder { let region_server = self.new_region_server(region_event_listener).await?; - self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv) - .await?; + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + let open_all_regions = + open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv); + + if self.opts.initialize_region_in_background { + // Opens regions in background. + common_runtime::spawn_bg(async move { + if let Err(err) = open_all_regions.await { + error!(err; "Failed to open regions during the startup."); + } + }); + } else { + open_all_regions.await?; + } let heartbeat_task = if let Some(meta_client) = meta_client { Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?) @@ -331,6 +350,7 @@ impl DatanodeBuilder { Ok((server, addr)) } + #[cfg(test)] /// Open all regions belong to this datanode. async fn initialize_region_server( &self, @@ -338,67 +358,16 @@ impl DatanodeBuilder { kv_backend: KvBackendRef, open_with_writable: bool, ) -> Result<()> { - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - let mut regions = vec![]; - let mut table_values = datanode_table_manager.tables(node_id); - - while let Some(table_value) = table_values.next().await { - let table_value = table_value.context(GetMetadataSnafu)?; - for region_number in table_value.regions { - // Augments region options with wal options if a wal options is provided. - let mut region_options = table_value.region_info.region_options.clone(); - table_value - .region_info - .region_wal_options - .get(®ion_number.to_string()) - .and_then(|wal_options| { - region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) - }); - - regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.region_info.engine.clone(), - table_value.region_info.region_storage_path.clone(), - region_options, - )); - } - } - info!("going to open {} regions", regions.len()); - let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); - let mut tasks = vec![]; - - for (region_id, engine, store_path, options) in regions { - let region_dir = region_dir(&store_path, region_id); - let semaphore_moved = semaphore.clone(); - tasks.push(async move { - let _permit = semaphore_moved.acquire().await; - region_server - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: engine.clone(), - region_dir, - options, - skip_wal_replay: false, - }), - ) - .await?; - if open_with_writable { - if let Err(e) = region_server.set_writable(region_id, true) { - error!( - e; "failed to set writable for region {region_id}" - ); - } - } - Ok(()) - }); - } - let _ = try_join_all(tasks).await?; - info!("region server is initialized"); + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; - Ok(()) + open_all_regions(region_server.clone(), table_values, open_with_writable).await } async fn new_region_server( @@ -544,6 +513,72 @@ impl DatanodeBuilder { } } +/// Open all regions belong to this datanode. +async fn open_all_regions( + region_server: RegionServer, + table_values: Vec, + open_with_writable: bool, +) -> Result<()> { + let mut regions = vec![]; + for table_value in table_values { + for region_number in table_value.regions { + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + table_value + .region_info + .region_wal_options + .get(®ion_number.to_string()) + .and_then(|wal_options| { + region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) + }); + + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + region_options, + )); + } + } + info!("going to open {} regions", regions.len()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); + let mut tasks = vec![]; + + let region_server_ref = ®ion_server; + for (region_id, engine, store_path, options) in regions { + let region_dir = region_dir(&store_path, region_id); + let semaphore_moved = semaphore.clone(); + + tasks.push(async move { + let _permit = semaphore_moved.acquire().await; + region_server_ref + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine.clone(), + region_dir, + options, + skip_wal_replay: false, + }), + ) + .await?; + if open_with_writable { + if let Err(e) = region_server_ref.set_writable(region_id, true) { + error!( + e; "failed to set writable for region {region_id}" + ); + } + } + Ok(()) + }); + } + let _ = try_join_all(tasks).await?; + + info!("all regions are opened"); + + Ok(()) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9aa2d34ce4c7..07b8fe6f8349 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -210,6 +210,18 @@ pub enum Error { location: Location, }, + #[snafu(display("Region {} not ready", region_id))] + RegionNotReady { + region_id: RegionId, + location: Location, + }, + + #[snafu(display("Region {} is busy", region_id))] + RegionBusy { + region_id: RegionId, + location: Location, + }, + #[snafu(display("Region engine {} is not registered", name))] RegionEngineNotFound { name: String, location: Location }, @@ -295,6 +307,8 @@ impl ErrorExt for Error { | GetRegionMetadata { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, + RegionNotReady { .. } => StatusCode::RegionNotReady, + RegionBusy { .. } => StatusCode::RegionBusy, StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6b820b85356d..c25775c944fe 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -14,6 +14,8 @@ use std::any::Any; use std::collections::HashMap; +use std::fmt::Debug; +use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; use api::v1::region::{region_request, QueryRequest, RegionResponse}; @@ -57,7 +59,7 @@ use table::table::scan::StreamScanAdapter; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, + self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; @@ -249,15 +251,72 @@ impl FlightCraft for RegionServer { } } +#[derive(Clone)] +enum RegionEngineWithStatus { + // An opening, or creating region. + Registering(RegionEngineRef), + // A closing, or dropping region. + Deregistering(RegionEngineRef), + // A ready region. + Ready(RegionEngineRef), +} + +impl RegionEngineWithStatus { + /// Returns [RegionEngineRef]. + pub fn into_engine(self) -> RegionEngineRef { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } + + pub fn is_registering(&self) -> bool { + matches!(self, Self::Registering(_)) + } +} + +impl Deref for RegionEngineWithStatus { + type Target = RegionEngineRef; + + fn deref(&self) -> &Self::Target { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } +} + struct RegionServerInner { engines: RwLock>, - region_map: DashMap, + region_map: DashMap, query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, } +enum CurrentEngine { + Engine(RegionEngineRef), + EarlyReturn(AffectedRows), +} + +impl Debug for CurrentEngine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CurrentEngine::Engine(engine) => f + .debug_struct("CurrentEngine") + .field("engine", &engine.name()) + .finish(), + CurrentEngine::EarlyReturn(rows) => f + .debug_struct("CurrentEngine") + .field("return", rows) + .finish(), + } + } +} + impl RegionServerInner { pub fn new( query_engine: QueryEngineRef, @@ -284,6 +343,61 @@ impl RegionServerInner { .insert(engine_name.to_string(), engine); } + fn get_engine( + &self, + region_id: RegionId, + region_change: &RegionChange, + ) -> Result { + let current_region_status = self.region_map.get(®ion_id); + + let engine = match region_change { + RegionChange::Register(ref engine_type) => match current_region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return Ok(CurrentEngine::EarlyReturn(0)) + } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionBusySnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(_) => status.clone().into_engine(), + }, + _ => self + .engines + .read() + .unwrap() + .get(engine_type) + .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? + .clone(), + }, + RegionChange::Deregisters => match current_region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionBusySnafu { region_id }.fail() + } + RegionEngineWithStatus::Deregistering(_) => { + return Ok(CurrentEngine::EarlyReturn(0)) + } + RegionEngineWithStatus::Ready(_) => status.clone().into_engine(), + }, + None => return Ok(CurrentEngine::EarlyReturn(0)), + }, + RegionChange::None => match current_region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionNotReadySnafu { region_id }.fail() + } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionNotFoundSnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(engine) => engine, + }, + None => return error::RegionNotFoundSnafu { region_id }.fail(), + }, + }; + + Ok(CurrentEngine::Engine(engine)) + } + pub async fn handle_request( &self, region_id: RegionId, @@ -306,36 +420,85 @@ impl RegionServerInner { | RegionRequest::Truncate(_) => RegionChange::None, }; - let engine = match ®ion_change { - RegionChange::Register(engine_type) => self - .engines - .read() - .unwrap() - .get(engine_type) - .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? - .clone(), - RegionChange::None | RegionChange::Deregisters => self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(), + let engine = match self.get_engine(region_id, ®ion_change)? { + CurrentEngine::Engine(engine) => engine, + CurrentEngine::EarlyReturn(rows) => return Ok(rows), }; + let engine_type = engine.name(); - let result = engine + // Sets corresponding region status to registering/deregistering before the operation. + self.set_region_status_not_ready(region_id, &engine, ®ion_change); + + match engine .handle_request(region_id, request) .trace(info_span!( "RegionEngine::handle_region_request", engine_type )) .await - .with_context(|_| HandleRegionRequestSnafu { region_id })?; + .with_context(|_| HandleRegionRequestSnafu { region_id }) + { + Ok(result) => { + // Sets corresponding region status to ready. + self.set_region_status_ready(region_id, engine, region_change); + Ok(result) + } + Err(err) => { + // Removes the region status if the operation fails. + self.unset_region_status(region_id, region_change); + Err(err) + } + } + } + + fn set_region_status_not_ready( + &self, + region_id: RegionId, + engine: &RegionEngineRef, + region_change: &RegionChange, + ) { + match region_change { + RegionChange::Register(_) => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + } + RegionChange::Deregisters => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + } + _ => {} + } + } + + fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) { + match region_change { + RegionChange::None => {} + RegionChange::Register(_) | RegionChange::Deregisters => { + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + } + } + } + fn set_region_status_ready( + &self, + region_id: RegionId, + engine: RegionEngineRef, + region_change: RegionChange, + ) { + let engine_type = engine.name(); match region_change { RegionChange::None => {} RegionChange::Register(_) => { info!("Region {region_id} is registered to engine {engine_type}"); - self.region_map.insert(region_id, engine); + self.region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine)); self.event_listener.on_region_registered(region_id); } RegionChange::Deregisters => { @@ -346,8 +509,6 @@ impl RegionServerInner { self.event_listener.on_region_deregistered(region_id); } } - - Ok(result) } pub async fn handle_read(&self, request: QueryRequest) -> Result { @@ -366,15 +527,19 @@ impl RegionServerInner { .unwrap_or_else(|| QueryContextBuilder::default().build()); // build dummy catalog list - let engine = self + let region_status = self .region_map .get(®ion_id) .with_context(|| RegionNotFoundSnafu { region_id })? .clone(); + if region_status.is_registering() { + return error::RegionNotReadySnafu { region_id }.fail(); + } + let table_provider = self .table_provider_factory - .create(region_id, engine) + .create(region_id, region_status.into_engine()) .await?; let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); @@ -619,3 +784,349 @@ pub trait TableProviderFactory: Send + Sync { } pub type TableProviderFactoryRef = Arc; + +#[cfg(test)] +mod tests { + + use std::assert_matches::assert_matches; + + use common_error::ext::ErrorExt; + use mito2::test_util::CreateRequestBuilder; + use store_api::region_engine::RegionEngine; + use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Result; + use crate::tests::{mock_region_server, MockRegionEngine}; + + #[tokio::test] + async fn test_region_registering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + let engine_name = engine.name(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + + // Tries to create/open a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + + let affected_rows = mock_region_server + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine_name.to_string(), + region_dir: String::new(), + options: Default::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + } + + #[tokio::test] + async fn test_region_deregistering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + } + + #[tokio::test] + async fn test_region_not_ready() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let err = mock_region_server + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + } + + #[tokio::test] + async fn test_region_request_failed() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = + MockRegionEngine::with_mock_fn(Box::new(|_region_id, _request| { + error::UnexpectedSnafu { + violated: "test".to_string(), + } + .fail() + })); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + + mock_region_server + .inner + .region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine.clone())); + + mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + } + + struct CurrentEngineTest { + region_id: RegionId, + current_region_status: Option, + region_change: RegionChange, + assert: Box)>, + } + + #[tokio::test] + async fn test_current_engine() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _) = MockRegionEngine::new(); + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1024, 1); + let tests = vec![ + // RegionChange::None + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + // RegionChange::Register + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionBusy); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + // RegionChange::Deregister + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionBusy); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + ]; + + for test in tests { + let CurrentEngineTest { + region_id, + current_region_status, + region_change, + assert, + } = test; + + // Sets up + if let Some(status) = current_region_status { + mock_region_server + .inner + .region_map + .insert(region_id, status); + } else { + mock_region_server.inner.region_map.remove(®ion_id); + } + + let result = mock_region_server + .inner + .get_engine(region_id, ®ion_change); + + assert(result); + } + } +} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index abf1064de826..37e8ea2ce77a 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -36,6 +36,7 @@ use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::error::Error; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; @@ -87,15 +88,39 @@ pub fn mock_region_server() -> RegionServer { ) } +pub type MockRequestHandler = + Box Result + Send + Sync>; + pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, + handle_request_mock_fn: Option, } impl MockRegionEngine { pub fn new() -> (Arc, Receiver<(RegionId, RegionRequest)>) { let (tx, rx) = tokio::sync::mpsc::channel(8); - (Arc::new(Self { sender: tx }), rx) + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: None, + }), + rx, + ) + } + + pub fn with_mock_fn( + mock_fn: MockRequestHandler, + ) -> (Arc, Receiver<(RegionId, RegionRequest)>) { + let (tx, rx) = tokio::sync::mpsc::channel(8); + + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: Some(mock_fn), + }), + rx, + ) } } @@ -110,6 +135,10 @@ impl RegionEngine for MockRegionEngine { region_id: RegionId, request: RegionRequest, ) -> Result { + if let Some(mock_fn) = &self.handle_request_mock_fn { + return mock_fn(region_id, request).map_err(BoxedError::new); + }; + let _ = self.sender.send((region_id, request)).await; Ok(0) } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4c21b9832693..72819a90de9b 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -538,8 +538,10 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::TableColumnNotFound | StatusCode::DatabaseNotFound | StatusCode::UserNotFound => Code::NotFound, - StatusCode::StorageUnavailable => Code::Unavailable, - StatusCode::RuntimeResourcesExhausted | StatusCode::RateLimited => Code::ResourceExhausted, + StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable, + StatusCode::RuntimeResourcesExhausted + | StatusCode::RateLimited + | StatusCode::RegionBusy => Code::ResourceExhausted, StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch | StatusCode::AuthHeaderNotFound diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c35070bf5a43..44153d365108 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -757,6 +757,7 @@ tcp_nodelay = true mode = "standalone" node_id = 0 require_lease_before_startup = true +initialize_region_in_background = false rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 rpc_max_recv_message_size = "512MiB"