From b6d0a5caeb6af4758892640849aad62b8ebb64f4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 13 Dec 2023 15:11:05 +0000 Subject: [PATCH 1/6] refactor: open regions in background --- config/datanode.example.toml | 4 + config/standalone.example.toml | 4 + src/cmd/src/standalone.rs | 3 + src/datanode/src/config.rs | 2 + src/datanode/src/datanode.rs | 160 ++++++++++++++++++++------------- 5 files changed, 111 insertions(+), 62 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bbb125adddd7..591246737d9b 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -12,6 +12,10 @@ rpc_runtime_size = 8 # It will block the datanode start if it can't receive leases in the heartbeat from metasrv. require_lease_before_startup = false +# Initialize all regions in the background during the startup. +# By default, it provides services after all regions have been initialized. +initialize_region_in_background = false + [heartbeat] # Interval for sending heartbeat messages to the Metasrv, 3 seconds by default. interval = "3s" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 305043d95c41..b8c3cf1140a5 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -3,6 +3,10 @@ mode = "standalone" # Whether to enable greptimedb telemetry, true by default. enable_telemetry = true +# Initialize all regions in the background during the startup. +# By default, it provides services after all regions have been initialized. +initialize_region_in_background = false + # HTTP server options. [http] # Server address, "127.0.0.1:4000" by default. diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index fb0e23ebefd9..fc765fa93fe6 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -97,6 +97,7 @@ impl SubCommand { pub struct StandaloneOptions { pub mode: Mode, pub enable_telemetry: bool, + pub initialize_region_in_background: bool, pub http: HttpOptions, pub grpc: GrpcOptions, pub mysql: MysqlOptions, @@ -119,6 +120,7 @@ impl Default for StandaloneOptions { Self { mode: Mode::Standalone, enable_telemetry: true, + initialize_region_in_background: false, http: HttpOptions::default(), grpc: GrpcOptions::default(), mysql: MysqlOptions::default(), @@ -166,6 +168,7 @@ impl StandaloneOptions { storage: self.storage, region_engine: self.region_engine, rpc_addr: self.grpc.addr, + initialize_region_in_background: self.initialize_region_in_background, ..Default::default() } } diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index f26b0828c873..772033c2d35f 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -225,6 +225,7 @@ pub struct DatanodeOptions { pub mode: Mode, pub node_id: Option, pub require_lease_before_startup: bool, + pub initialize_region_in_background: bool, pub rpc_addr: String, pub rpc_hostname: Option, pub rpc_runtime_size: usize, @@ -249,6 +250,7 @@ impl Default for DatanodeOptions { mode: Mode::Standalone, node_id: None, require_lease_before_startup: false, + initialize_region_in_background: false, rpc_addr: "127.0.0.1:3001".to_string(), rpc_hostname: None, rpc_runtime_size: 8, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 767a12c4acc7..481baa614422 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -213,6 +213,7 @@ impl DatanodeBuilder { pub async fn build(mut self) -> Result { let mode = &self.opts.mode; + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; let meta_client = self.meta_client.take(); @@ -233,8 +234,23 @@ impl DatanodeBuilder { let region_server = self.new_region_server(region_event_listener).await?; - self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv) - .await?; + let open_all_regions = open_all_regions( + region_server.clone(), + kv_backend, + !controlled_by_metasrv, + node_id, + ); + + if self.opts.initialize_region_in_background { + // Opens regions in background. + common_runtime::spawn_bg(async move { + if let Err(err) = open_all_regions.await { + error!(err; "Failed to opening regions during the startup."); + } + }); + } else { + open_all_regions.await?; + } let heartbeat_task = if let Some(meta_client) = meta_client { Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?) @@ -331,6 +347,7 @@ impl DatanodeBuilder { Ok((server, addr)) } + #[cfg(test)] /// Open all regions belong to this datanode. async fn initialize_region_server( &self, @@ -338,67 +355,14 @@ impl DatanodeBuilder { kv_backend: KvBackendRef, open_with_writable: bool, ) -> Result<()> { - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - let mut regions = vec![]; - let mut table_values = datanode_table_manager.tables(node_id); - - while let Some(table_value) = table_values.next().await { - let table_value = table_value.context(GetMetadataSnafu)?; - for region_number in table_value.regions { - // Augments region options with wal options if a wal options is provided. - let mut region_options = table_value.region_info.region_options.clone(); - table_value - .region_info - .region_wal_options - .get(®ion_number.to_string()) - .and_then(|wal_options| { - region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) - }); - - regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.region_info.engine.clone(), - table_value.region_info.region_storage_path.clone(), - region_options, - )); - } - } - info!("going to open {} regions", regions.len()); - let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); - let mut tasks = vec![]; - - for (region_id, engine, store_path, options) in regions { - let region_dir = region_dir(&store_path, region_id); - let semaphore_moved = semaphore.clone(); - tasks.push(async move { - let _permit = semaphore_moved.acquire().await; - region_server - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: engine.clone(), - region_dir, - options, - skip_wal_replay: false, - }), - ) - .await?; - if open_with_writable { - if let Err(e) = region_server.set_writable(region_id, true) { - error!( - e; "failed to set writable for region {region_id}" - ); - } - } - Ok(()) - }); - } - let _ = try_join_all(tasks).await?; - - info!("region server is initialized"); - - Ok(()) + open_all_regions( + region_server.clone(), + kv_backend, + open_with_writable, + node_id, + ) + .await } async fn new_region_server( @@ -544,6 +508,78 @@ impl DatanodeBuilder { } } +/// Open all regions belong to this datanode. +async fn open_all_regions( + region_server: RegionServer, + kv_backend: KvBackendRef, + open_with_writable: bool, + node_id: u64, +) -> Result<()> { + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let mut regions = vec![]; + let mut table_values = datanode_table_manager.tables(node_id); + + while let Some(table_value) = table_values.next().await { + let table_value = table_value.context(GetMetadataSnafu)?; + + for region_number in table_value.regions { + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + table_value + .region_info + .region_wal_options + .get(®ion_number.to_string()) + .and_then(|wal_options| { + region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) + }); + + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + region_options, + )); + } + } + info!("going to open {} regions", regions.len()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); + let mut tasks = vec![]; + + let region_server_ref = ®ion_server; + for (region_id, engine, store_path, options) in regions { + let region_dir = region_dir(&store_path, region_id); + let semaphore_moved = semaphore.clone(); + + tasks.push(async move { + let _permit = semaphore_moved.acquire().await; + region_server_ref + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine.clone(), + region_dir, + options, + skip_wal_replay: false, + }), + ) + .await?; + if open_with_writable { + if let Err(e) = region_server_ref.set_writable(region_id, true) { + error!( + e; "failed to set writable for region {region_id}" + ); + } + } + Ok(()) + }); + } + let _ = try_join_all(tasks).await?; + + info!("all regions are opened"); + + Ok(()) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; From e6dac4a3e5ec134eb9a3411de13cdb4db3ba9082 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 15 Dec 2023 04:46:30 +0000 Subject: [PATCH 2/6] feat: add status code of RegionNotReady --- src/common/error/src/status_code.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 64b9a9e7fb1b..7da99c00c235 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -59,6 +59,7 @@ pub enum StatusCode { RegionNotFound = 4005, RegionAlreadyExists = 4006, RegionReadonly = 4007, + RegionNotReady = 4008, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -103,7 +104,8 @@ impl StatusCode { match self { StatusCode::StorageUnavailable | StatusCode::RuntimeResourcesExhausted - | StatusCode::Internal => true, + | StatusCode::Internal + | StatusCode::RegionNotReady => true, StatusCode::Success | StatusCode::Unknown @@ -152,6 +154,7 @@ impl StatusCode { | StatusCode::TableAlreadyExists | StatusCode::TableNotFound | StatusCode::RegionNotFound + | StatusCode::RegionNotReady | StatusCode::RegionAlreadyExists | StatusCode::RegionReadonly | StatusCode::TableColumnNotFound @@ -183,6 +186,7 @@ impl StatusCode { v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists), v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound), v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound), + v if v == StatusCode::RegionNotReady as u32 => Some(StatusCode::RegionNotReady), v if v == StatusCode::RegionAlreadyExists as u32 => { Some(StatusCode::RegionAlreadyExists) } From c3a9c3db58446a5235a8e30b559ea672164f61e9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 15 Dec 2023 07:54:31 +0000 Subject: [PATCH 3/6] feat: use RegionNotReady instead of RegionNotFound for a registering region --- src/datanode/src/error.rs | 7 + src/datanode/src/region_server.rs | 361 +++++++++++++++++++++++++++--- src/datanode/src/tests.rs | 31 ++- src/servers/src/error.rs | 2 +- tests-integration/tests/http.rs | 1 + 5 files changed, 366 insertions(+), 36 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9aa2d34ce4c7..6b2be8505da0 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -210,6 +210,12 @@ pub enum Error { location: Location, }, + #[snafu(display("RegionId {} not ready", region_id))] + RegionNotReady { + region_id: RegionId, + location: Location, + }, + #[snafu(display("Region engine {} is not registered", name))] RegionEngineNotFound { name: String, location: Location }, @@ -295,6 +301,7 @@ impl ErrorExt for Error { | GetRegionMetadata { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, + RegionNotReady { .. } => StatusCode::RegionNotReady, StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6b820b85356d..5854326fd956 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::HashMap; +use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; use api::v1::region::{region_request, QueryRequest, RegionResponse}; @@ -57,7 +58,7 @@ use table::table::scan::StreamScanAdapter; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, + self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; @@ -249,9 +250,50 @@ impl FlightCraft for RegionServer { } } +#[derive(Clone)] +enum RegionEngineWithStatus { + // A opening, or creating region. + Registering(RegionEngineRef), + // A closing, or dropping region. + Deregistering(RegionEngineRef), + // A ready region. + Ready(RegionEngineRef), +} + +impl RegionEngineWithStatus { + /// Returns [RegionEngineRef]. + pub fn into_engine(self) -> RegionEngineRef { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } + + pub fn is_registering(&self) -> bool { + matches!(self, Self::Registering(_)) + } + + pub fn is_deregistering(&self) -> bool { + matches!(self, Self::Deregistering(_)) + } +} + +impl Deref for RegionEngineWithStatus { + type Target = RegionEngineRef; + + fn deref(&self) -> &Self::Target { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } +} + struct RegionServerInner { engines: RwLock>, - region_map: DashMap, + region_map: DashMap, query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, @@ -306,48 +348,109 @@ impl RegionServerInner { | RegionRequest::Truncate(_) => RegionChange::None, }; - let engine = match ®ion_change { - RegionChange::Register(engine_type) => self - .engines - .read() - .unwrap() - .get(engine_type) - .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? - .clone(), - RegionChange::None | RegionChange::Deregisters => self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(), + let region_status = self.region_map.get(®ion_id); + + let engine = match region_change { + RegionChange::Register(ref engine_type) => match region_status { + Some(status) => { + if status.is_registering() { + return Ok(0); + } else { + status.clone().into_engine() + } + } + _ => self + .engines + .read() + .unwrap() + .get(engine_type) + .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? + .clone(), + }, + RegionChange::Deregisters => match region_status { + Some(status) => { + if status.is_deregistering() { + return Ok(0); + } else { + status.clone().into_engine() + } + } + None => return Ok(0), + }, + RegionChange::None => match region_status { + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionNotReadySnafu { region_id }.fail() + } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionNotFoundSnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(engine) => engine, + }, + None => return error::RegionNotFoundSnafu { region_id }.fail(), + }, }; + let engine_type = engine.name(); - let result = engine + // Sets the RegionStatus before the operation. + match region_change { + RegionChange::Register(_) => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + } + RegionChange::Deregisters => { + self.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + } + _ => {} + } + + match engine .handle_request(region_id, request) .trace(info_span!( "RegionEngine::handle_region_request", engine_type )) .await - .with_context(|_| HandleRegionRequestSnafu { region_id })?; - - match region_change { - RegionChange::None => {} - RegionChange::Register(_) => { - info!("Region {region_id} is registered to engine {engine_type}"); - self.region_map.insert(region_id, engine); - self.event_listener.on_region_registered(region_id); + .with_context(|_| HandleRegionRequestSnafu { region_id }) + { + Ok(result) => { + match region_change { + RegionChange::None => {} + RegionChange::Register(_) => { + info!("Region {region_id} is registered to engine {engine_type}"); + self.region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine)); + self.event_listener.on_region_registered(region_id); + } + RegionChange::Deregisters => { + info!("Region {region_id} is deregistered from engine {engine_type}"); + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + self.event_listener.on_region_deregistered(region_id); + } + } + Ok(result) } - RegionChange::Deregisters => { - info!("Region {region_id} is deregistered from engine {engine_type}"); - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - self.event_listener.on_region_deregistered(region_id); + Err(err) => { + // Removes the RegionStatus if the operation fails. + match region_change { + RegionChange::None => {} + RegionChange::Register(_) | RegionChange::Deregisters => { + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + } + } + Err(err) } } - - Ok(result) } pub async fn handle_read(&self, request: QueryRequest) -> Result { @@ -366,15 +469,19 @@ impl RegionServerInner { .unwrap_or_else(|| QueryContextBuilder::default().build()); // build dummy catalog list - let engine = self + let region_status = self .region_map .get(®ion_id) .with_context(|| RegionNotFoundSnafu { region_id })? .clone(); + if region_status.is_registering() { + return error::RegionNotReadySnafu { region_id }.fail(); + } + let table_provider = self .table_provider_factory - .create(region_id, engine) + .create(region_id, region_status.into_engine()) .await?; let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); @@ -619,3 +726,189 @@ pub trait TableProviderFactory: Send + Sync { } pub type TableProviderFactoryRef = Arc; + +#[cfg(test)] +mod tests { + + use common_error::ext::ErrorExt; + use mito2::test_util::CreateRequestBuilder; + use store_api::region_engine::RegionEngine; + use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest}; + use store_api::storage::RegionId; + + use super::*; + use crate::tests::{mock_region_server, MockRegionEngine}; + + #[tokio::test] + async fn test_region_registering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + let engine_name = engine.name(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + + // Tries to create/open a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + + let affected_rows = mock_region_server + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine_name.to_string(), + region_dir: String::new(), + options: Default::default(), + }), + ) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Registering(_))); + } + + #[tokio::test] + async fn test_region_deregistering() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Deregistering(engine.clone()), + ); + + let affected_rows = mock_region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + assert_eq!(affected_rows, 0); + + let status = mock_region_server + .inner + .region_map + .get(®ion_id) + .unwrap() + .clone(); + assert!(matches!(status, RegionEngineWithStatus::Deregistering(_))); + } + + #[tokio::test] + async fn test_region_not_ready() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::new(); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + + // Tries to drop/close a registering region. + mock_region_server.inner.region_map.insert( + region_id, + RegionEngineWithStatus::Registering(engine.clone()), + ); + + let err = mock_region_server + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap_err(); + + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + } + + #[tokio::test] + async fn test_region_request_failed() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = + MockRegionEngine::with_mock_fn(Box::new(|_region_id, _request| { + error::UnexpectedSnafu { + violated: "test".to_string(), + } + .fail() + })); + + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + mock_region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + + mock_region_server + .inner + .region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine.clone())); + + mock_region_server + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap_err(); + + let status = mock_region_server.inner.region_map.get(®ion_id); + assert!(status.is_none()); + } +} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index abf1064de826..37e8ea2ce77a 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -36,6 +36,7 @@ use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::error::Error; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; @@ -87,15 +88,39 @@ pub fn mock_region_server() -> RegionServer { ) } +pub type MockRequestHandler = + Box Result + Send + Sync>; + pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, + handle_request_mock_fn: Option, } impl MockRegionEngine { pub fn new() -> (Arc, Receiver<(RegionId, RegionRequest)>) { let (tx, rx) = tokio::sync::mpsc::channel(8); - (Arc::new(Self { sender: tx }), rx) + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: None, + }), + rx, + ) + } + + pub fn with_mock_fn( + mock_fn: MockRequestHandler, + ) -> (Arc, Receiver<(RegionId, RegionRequest)>) { + let (tx, rx) = tokio::sync::mpsc::channel(8); + + ( + Arc::new(Self { + sender: tx, + handle_request_mock_fn: Some(mock_fn), + }), + rx, + ) } } @@ -110,6 +135,10 @@ impl RegionEngine for MockRegionEngine { region_id: RegionId, request: RegionRequest, ) -> Result { + if let Some(mock_fn) = &self.handle_request_mock_fn { + return mock_fn(region_id, request).map_err(BoxedError::new); + }; + let _ = self.sender.send((region_id, request)).await; Ok(0) } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4c21b9832693..8dbb28d4dd16 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -538,7 +538,7 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::TableColumnNotFound | StatusCode::DatabaseNotFound | StatusCode::UserNotFound => Code::NotFound, - StatusCode::StorageUnavailable => Code::Unavailable, + StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable, StatusCode::RuntimeResourcesExhausted | StatusCode::RateLimited => Code::ResourceExhausted, StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c35070bf5a43..44153d365108 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -757,6 +757,7 @@ tcp_nodelay = true mode = "standalone" node_id = 0 require_lease_before_startup = true +initialize_region_in_background = false rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 rpc_max_recv_message_size = "512MiB" From d8cd26fd5e55ae32d0ece0cf209ab4f23b9ac4c3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 20 Dec 2023 08:21:26 +0000 Subject: [PATCH 4/6] chore: apply suggestions from CR --- config/standalone.example.toml | 4 - src/cmd/src/standalone.rs | 3 - src/datanode/src/datanode.rs | 47 ++-- src/datanode/src/error.rs | 2 +- src/datanode/src/region_server.rs | 352 ++++++++++++++++++++++++------ 5 files changed, 309 insertions(+), 99 deletions(-) diff --git a/config/standalone.example.toml b/config/standalone.example.toml index b8c3cf1140a5..305043d95c41 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -3,10 +3,6 @@ mode = "standalone" # Whether to enable greptimedb telemetry, true by default. enable_telemetry = true -# Initialize all regions in the background during the startup. -# By default, it provides services after all regions have been initialized. -initialize_region_in_background = false - # HTTP server options. [http] # Server address, "127.0.0.1:4000" by default. diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index fc765fa93fe6..fb0e23ebefd9 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -97,7 +97,6 @@ impl SubCommand { pub struct StandaloneOptions { pub mode: Mode, pub enable_telemetry: bool, - pub initialize_region_in_background: bool, pub http: HttpOptions, pub grpc: GrpcOptions, pub mysql: MysqlOptions, @@ -120,7 +119,6 @@ impl Default for StandaloneOptions { Self { mode: Mode::Standalone, enable_telemetry: true, - initialize_region_in_background: false, http: HttpOptions::default(), grpc: GrpcOptions::default(), mysql: MysqlOptions::default(), @@ -168,7 +166,6 @@ impl StandaloneOptions { storage: self.storage, region_engine: self.region_engine, rpc_addr: self.grpc.addr, - initialize_region_in_background: self.initialize_region_in_background, ..Default::default() } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 481baa614422..604146a5d4a3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,7 +25,7 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig}; use common_config::{WalConfig, WAL_OPTIONS_KEY}; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; @@ -33,7 +33,7 @@ use common_telemetry::{error, info, warn}; use file_engine::engine::FileRegionEngine; use futures::future; use futures_util::future::try_join_all; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; @@ -234,18 +234,21 @@ impl DatanodeBuilder { let region_server = self.new_region_server(region_event_listener).await?; - let open_all_regions = open_all_regions( - region_server.clone(), - kv_backend, - !controlled_by_metasrv, - node_id, - ); + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + let open_all_regions = + open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv); if self.opts.initialize_region_in_background { // Opens regions in background. common_runtime::spawn_bg(async move { if let Err(err) = open_all_regions.await { - error!(err; "Failed to opening regions during the startup."); + error!(err; "Failed to open regions during the startup."); } }); } else { @@ -356,13 +359,15 @@ impl DatanodeBuilder { open_with_writable: bool, ) -> Result<()> { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - open_all_regions( - region_server.clone(), - kv_backend, - open_with_writable, - node_id, - ) - .await + + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + open_all_regions(region_server.clone(), table_values, open_with_writable).await } async fn new_region_server( @@ -511,17 +516,11 @@ impl DatanodeBuilder { /// Open all regions belong to this datanode. async fn open_all_regions( region_server: RegionServer, - kv_backend: KvBackendRef, + table_values: Vec, open_with_writable: bool, - node_id: u64, ) -> Result<()> { - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let mut regions = vec![]; - let mut table_values = datanode_table_manager.tables(node_id); - - while let Some(table_value) = table_values.next().await { - let table_value = table_value.context(GetMetadataSnafu)?; - + for table_value in table_values { for region_number in table_value.regions { // Augments region options with wal options if a wal options is provided. let mut region_options = table_value.region_info.region_options.clone(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6b2be8505da0..94b751393d85 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -210,7 +210,7 @@ pub enum Error { location: Location, }, - #[snafu(display("RegionId {} not ready", region_id))] + #[snafu(display("Region {} not ready", region_id))] RegionNotReady { region_id: RegionId, location: Location, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 5854326fd956..1e2a4f21e60b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::HashMap; +use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; @@ -252,7 +253,7 @@ impl FlightCraft for RegionServer { #[derive(Clone)] enum RegionEngineWithStatus { - // A opening, or creating region. + // An opening, or creating region. Registering(RegionEngineRef), // A closing, or dropping region. Deregistering(RegionEngineRef), @@ -300,6 +301,26 @@ struct RegionServerInner { table_provider_factory: TableProviderFactoryRef, } +enum CurrentEngine { + Engine(RegionEngineRef), + EarlyReturn(AffectedRows), +} + +impl Debug for CurrentEngine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CurrentEngine::Engine(engine) => f + .debug_struct("CurrentEngine") + .field("engine", &engine.name()) + .finish(), + CurrentEngine::EarlyReturn(rows) => f + .debug_struct("CurrentEngine") + .field("return", rows) + .finish(), + } + } +} + impl RegionServerInner { pub fn new( query_engine: QueryEngineRef, @@ -326,35 +347,18 @@ impl RegionServerInner { .insert(engine_name.to_string(), engine); } - pub async fn handle_request( + fn get_engine( &self, region_id: RegionId, - request: RegionRequest, - ) -> Result { - let request_type = request.request_type(); - let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED - .with_label_values(&[request_type]) - .start_timer(); - - let region_change = match &request { - RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), - RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), - RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, - RegionRequest::Put(_) - | RegionRequest::Delete(_) - | RegionRequest::Alter(_) - | RegionRequest::Flush(_) - | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => RegionChange::None, - }; - - let region_status = self.region_map.get(®ion_id); + region_change: &RegionChange, + ) -> Result { + let current_region_status = self.region_map.get(®ion_id); let engine = match region_change { - RegionChange::Register(ref engine_type) => match region_status { + RegionChange::Register(ref engine_type) => match current_region_status { Some(status) => { if status.is_registering() { - return Ok(0); + return Ok(CurrentEngine::EarlyReturn(0)); } else { status.clone().into_engine() } @@ -367,17 +371,17 @@ impl RegionServerInner { .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? .clone(), }, - RegionChange::Deregisters => match region_status { + RegionChange::Deregisters => match current_region_status { Some(status) => { if status.is_deregistering() { - return Ok(0); + return Ok(CurrentEngine::EarlyReturn(0)); } else { status.clone().into_engine() } } - None => return Ok(0), + None => return Ok(CurrentEngine::EarlyReturn(0)), }, - RegionChange::None => match region_status { + RegionChange::None => match current_region_status { Some(status) => match status.clone() { RegionEngineWithStatus::Registering(_) => { return error::RegionNotReadySnafu { region_id }.fail() @@ -391,9 +395,69 @@ impl RegionServerInner { }, }; + Ok(CurrentEngine::Engine(engine)) + } + + pub async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { + let request_type = request.request_type(); + let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED + .with_label_values(&[request_type]) + .start_timer(); + + let region_change = match &request { + RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), + RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), + RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, + RegionRequest::Put(_) + | RegionRequest::Delete(_) + | RegionRequest::Alter(_) + | RegionRequest::Flush(_) + | RegionRequest::Compact(_) + | RegionRequest::Truncate(_) => RegionChange::None, + }; + + let engine = match self.get_engine(region_id, ®ion_change)? { + CurrentEngine::Engine(engine) => engine, + CurrentEngine::EarlyReturn(rows) => return Ok(rows), + }; + let engine_type = engine.name(); - // Sets the RegionStatus before the operation. + // Sets corresponding region status to registering/deregistering before the operation. + self.set_region_status_not_ready(region_id, &engine, ®ion_change); + + match engine + .handle_request(region_id, request) + .trace(info_span!( + "RegionEngine::handle_region_request", + engine_type + )) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id }) + { + Ok(result) => { + // Sets corresponding region status to ready. + self.set_region_status_ready(region_id, engine, region_change); + Ok(result) + } + Err(err) => { + // Removes the region status if the operation fails. + self.unset_region_status(region_id, region_change); + Err(err) + } + } + } + + fn set_region_status_not_ready( + &self, + region_id: RegionId, + engine: &RegionEngineRef, + region_change: &RegionChange, + ) { match region_change { RegionChange::Register(_) => { self.region_map.insert( @@ -409,46 +473,40 @@ impl RegionServerInner { } _ => {} } + } - match engine - .handle_request(region_id, request) - .trace(info_span!( - "RegionEngine::handle_region_request", - engine_type - )) - .await - .with_context(|_| HandleRegionRequestSnafu { region_id }) - { - Ok(result) => { - match region_change { - RegionChange::None => {} - RegionChange::Register(_) => { - info!("Region {region_id} is registered to engine {engine_type}"); - self.region_map - .insert(region_id, RegionEngineWithStatus::Ready(engine)); - self.event_listener.on_region_registered(region_id); - } - RegionChange::Deregisters => { - info!("Region {region_id} is deregistered from engine {engine_type}"); - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - self.event_listener.on_region_deregistered(region_id); - } - } - Ok(result) + fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) { + match region_change { + RegionChange::None => {} + RegionChange::Register(_) | RegionChange::Deregisters => { + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); } - Err(err) => { - // Removes the RegionStatus if the operation fails. - match region_change { - RegionChange::None => {} - RegionChange::Register(_) | RegionChange::Deregisters => { - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - } - } - Err(err) + } + } + + fn set_region_status_ready( + &self, + region_id: RegionId, + engine: RegionEngineRef, + region_change: RegionChange, + ) { + let engine_type = engine.name(); + match region_change { + RegionChange::None => {} + RegionChange::Register(_) => { + info!("Region {region_id} is registered to engine {engine_type}"); + self.region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine)); + self.event_listener.on_region_registered(region_id); + } + RegionChange::Deregisters => { + info!("Region {region_id} is deregistered from engine {engine_type}"); + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + self.event_listener.on_region_deregistered(region_id); } } } @@ -730,6 +788,8 @@ pub type TableProviderFactoryRef = Arc; #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use common_error::ext::ErrorExt; use mito2::test_util::CreateRequestBuilder; use store_api::region_engine::RegionEngine; @@ -737,6 +797,7 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::error::Result; use crate::tests::{mock_region_server, MockRegionEngine}; #[tokio::test] @@ -781,6 +842,7 @@ mod tests { engine: engine_name.to_string(), region_dir: String::new(), options: Default::default(), + skip_wal_replay: false, }), ) .await @@ -911,4 +973,160 @@ mod tests { let status = mock_region_server.inner.region_map.get(®ion_id); assert!(status.is_none()); } + + struct CurrentEngineTest { + region_id: RegionId, + current_region_status: Option, + region_change: RegionChange, + assert: Box)>, + } + + #[tokio::test] + async fn test_current_engine() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _) = MockRegionEngine::new(); + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1024, 1); + let tests = vec![ + // RegionChange::None + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + // RegionChange::Register + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + // RegionChange::Deregister + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + ]; + + for test in tests { + let CurrentEngineTest { + region_id, + current_region_status, + region_change, + assert, + } = test; + + // Sets up + if let Some(status) = current_region_status { + mock_region_server + .inner + .region_map + .insert(region_id, status); + } else { + mock_region_server.inner.region_map.remove(®ion_id); + } + + let result = mock_region_server + .inner + .get_engine(region_id, ®ion_change); + + assert(result); + } + } } From 8f0af82ad94f232b7b08779844f2f7973dbc0d5f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 20 Dec 2023 10:29:12 +0000 Subject: [PATCH 5/6] feat: add status code of RegionBusy --- src/common/error/src/status_code.rs | 8 +++++++- src/servers/src/error.rs | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 7da99c00c235..949c8d44736c 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -60,6 +60,9 @@ pub enum StatusCode { RegionAlreadyExists = 4006, RegionReadonly = 4007, RegionNotReady = 4008, + // If mutually exclusive operations are reached at the same time, + // only one can be executed, another one will get region busy. + RegionBusy = 4009, // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -105,7 +108,8 @@ impl StatusCode { StatusCode::StorageUnavailable | StatusCode::RuntimeResourcesExhausted | StatusCode::Internal - | StatusCode::RegionNotReady => true, + | StatusCode::RegionNotReady + | StatusCode::RegionBusy => true, StatusCode::Success | StatusCode::Unknown @@ -155,6 +159,7 @@ impl StatusCode { | StatusCode::TableNotFound | StatusCode::RegionNotFound | StatusCode::RegionNotReady + | StatusCode::RegionBusy | StatusCode::RegionAlreadyExists | StatusCode::RegionReadonly | StatusCode::TableColumnNotFound @@ -187,6 +192,7 @@ impl StatusCode { v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound), v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound), v if v == StatusCode::RegionNotReady as u32 => Some(StatusCode::RegionNotReady), + v if v == StatusCode::RegionBusy as u32 => Some(StatusCode::RegionBusy), v if v == StatusCode::RegionAlreadyExists as u32 => { Some(StatusCode::RegionAlreadyExists) } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 8dbb28d4dd16..72819a90de9b 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -539,7 +539,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code { | StatusCode::DatabaseNotFound | StatusCode::UserNotFound => Code::NotFound, StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable, - StatusCode::RuntimeResourcesExhausted | StatusCode::RateLimited => Code::ResourceExhausted, + StatusCode::RuntimeResourcesExhausted + | StatusCode::RateLimited + | StatusCode::RegionBusy => Code::ResourceExhausted, StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch | StatusCode::AuthHeaderNotFound From 0ea06e0f56182e07d6525dd25b3744f56d599568 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 20 Dec 2023 10:34:23 +0000 Subject: [PATCH 6/6] feat: return RegionBusy for mutually exclusive operations --- src/datanode/src/error.rs | 7 ++++++ src/datanode/src/region_server.rs | 40 +++++++++++++++---------------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 94b751393d85..07b8fe6f8349 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -216,6 +216,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Region {} is busy", region_id))] + RegionBusy { + region_id: RegionId, + location: Location, + }, + #[snafu(display("Region engine {} is not registered", name))] RegionEngineNotFound { name: String, location: Location }, @@ -302,6 +308,7 @@ impl ErrorExt for Error { RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, + RegionBusy { .. } => StatusCode::RegionBusy, StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 1e2a4f21e60b..c25775c944fe 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -274,10 +274,6 @@ impl RegionEngineWithStatus { pub fn is_registering(&self) -> bool { matches!(self, Self::Registering(_)) } - - pub fn is_deregistering(&self) -> bool { - matches!(self, Self::Deregistering(_)) - } } impl Deref for RegionEngineWithStatus { @@ -356,13 +352,15 @@ impl RegionServerInner { let engine = match region_change { RegionChange::Register(ref engine_type) => match current_region_status { - Some(status) => { - if status.is_registering() { - return Ok(CurrentEngine::EarlyReturn(0)); - } else { - status.clone().into_engine() + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return Ok(CurrentEngine::EarlyReturn(0)) } - } + RegionEngineWithStatus::Deregistering(_) => { + return error::RegionBusySnafu { region_id }.fail() + } + RegionEngineWithStatus::Ready(_) => status.clone().into_engine(), + }, _ => self .engines .read() @@ -372,13 +370,15 @@ impl RegionServerInner { .clone(), }, RegionChange::Deregisters => match current_region_status { - Some(status) => { - if status.is_deregistering() { - return Ok(CurrentEngine::EarlyReturn(0)); - } else { - status.clone().into_engine() + Some(status) => match status.clone() { + RegionEngineWithStatus::Registering(_) => { + return error::RegionBusySnafu { region_id }.fail() } - } + RegionEngineWithStatus::Deregistering(_) => { + return Ok(CurrentEngine::EarlyReturn(0)) + } + RegionEngineWithStatus::Ready(_) => status.clone().into_engine(), + }, None => return Ok(CurrentEngine::EarlyReturn(0)), }, RegionChange::None => match current_region_status { @@ -1052,8 +1052,8 @@ mod tests { current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), region_change: RegionChange::Register(engine.name().to_string()), assert: Box::new(|result| { - let current_engine = result.unwrap(); - assert_matches!(current_engine, CurrentEngine::Engine(_)); + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionBusy); }), }, CurrentEngineTest { @@ -1080,8 +1080,8 @@ mod tests { current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), region_change: RegionChange::Deregisters, assert: Box::new(|result| { - let current_engine = result.unwrap(); - assert_matches!(current_engine, CurrentEngine::Engine(_)); + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionBusy); }), }, CurrentEngineTest {