diff --git a/config/standalone.example.toml b/config/standalone.example.toml index b8c3cf1140a5..305043d95c41 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -3,10 +3,6 @@ mode = "standalone" # Whether to enable greptimedb telemetry, true by default. enable_telemetry = true -# Initialize all regions in the background during the startup. -# By default, it provides services after all regions have been initialized. -initialize_region_in_background = false - # HTTP server options. [http] # Server address, "127.0.0.1:4000" by default. diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index fc765fa93fe6..fb0e23ebefd9 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -97,7 +97,6 @@ impl SubCommand { pub struct StandaloneOptions { pub mode: Mode, pub enable_telemetry: bool, - pub initialize_region_in_background: bool, pub http: HttpOptions, pub grpc: GrpcOptions, pub mysql: MysqlOptions, @@ -120,7 +119,6 @@ impl Default for StandaloneOptions { Self { mode: Mode::Standalone, enable_telemetry: true, - initialize_region_in_background: false, http: HttpOptions::default(), grpc: GrpcOptions::default(), mysql: MysqlOptions::default(), @@ -168,7 +166,6 @@ impl StandaloneOptions { storage: self.storage, region_engine: self.region_engine, rpc_addr: self.grpc.addr, - initialize_region_in_background: self.initialize_region_in_background, ..Default::default() } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 481baa614422..604146a5d4a3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,7 +25,7 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig}; use common_config::{WalConfig, WAL_OPTIONS_KEY}; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; @@ -33,7 +33,7 @@ use common_telemetry::{error, info, warn}; use file_engine::engine::FileRegionEngine; use futures::future; use futures_util::future::try_join_all; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; @@ -234,18 +234,21 @@ impl DatanodeBuilder { let region_server = self.new_region_server(region_event_listener).await?; - let open_all_regions = open_all_regions( - region_server.clone(), - kv_backend, - !controlled_by_metasrv, - node_id, - ); + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + let open_all_regions = + open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv); if self.opts.initialize_region_in_background { // Opens regions in background. common_runtime::spawn_bg(async move { if let Err(err) = open_all_regions.await { - error!(err; "Failed to opening regions during the startup."); + error!(err; "Failed to open regions during the startup."); } }); } else { @@ -356,13 +359,15 @@ impl DatanodeBuilder { open_with_writable: bool, ) -> Result<()> { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - open_all_regions( - region_server.clone(), - kv_backend, - open_with_writable, - node_id, - ) - .await + + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + open_all_regions(region_server.clone(), table_values, open_with_writable).await } async fn new_region_server( @@ -511,17 +516,11 @@ impl DatanodeBuilder { /// Open all regions belong to this datanode. async fn open_all_regions( region_server: RegionServer, - kv_backend: KvBackendRef, + table_values: Vec, open_with_writable: bool, - node_id: u64, ) -> Result<()> { - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let mut regions = vec![]; - let mut table_values = datanode_table_manager.tables(node_id); - - while let Some(table_value) = table_values.next().await { - let table_value = table_value.context(GetMetadataSnafu)?; - + for table_value in table_values { for region_number in table_value.regions { // Augments region options with wal options if a wal options is provided. let mut region_options = table_value.region_info.region_options.clone(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6b2be8505da0..94b751393d85 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -210,7 +210,7 @@ pub enum Error { location: Location, }, - #[snafu(display("RegionId {} not ready", region_id))] + #[snafu(display("Region {} not ready", region_id))] RegionNotReady { region_id: RegionId, location: Location, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 5854326fd956..1e2a4f21e60b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::HashMap; +use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; @@ -252,7 +253,7 @@ impl FlightCraft for RegionServer { #[derive(Clone)] enum RegionEngineWithStatus { - // A opening, or creating region. + // An opening, or creating region. Registering(RegionEngineRef), // A closing, or dropping region. Deregistering(RegionEngineRef), @@ -300,6 +301,26 @@ struct RegionServerInner { table_provider_factory: TableProviderFactoryRef, } +enum CurrentEngine { + Engine(RegionEngineRef), + EarlyReturn(AffectedRows), +} + +impl Debug for CurrentEngine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CurrentEngine::Engine(engine) => f + .debug_struct("CurrentEngine") + .field("engine", &engine.name()) + .finish(), + CurrentEngine::EarlyReturn(rows) => f + .debug_struct("CurrentEngine") + .field("return", rows) + .finish(), + } + } +} + impl RegionServerInner { pub fn new( query_engine: QueryEngineRef, @@ -326,35 +347,18 @@ impl RegionServerInner { .insert(engine_name.to_string(), engine); } - pub async fn handle_request( + fn get_engine( &self, region_id: RegionId, - request: RegionRequest, - ) -> Result { - let request_type = request.request_type(); - let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED - .with_label_values(&[request_type]) - .start_timer(); - - let region_change = match &request { - RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), - RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), - RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, - RegionRequest::Put(_) - | RegionRequest::Delete(_) - | RegionRequest::Alter(_) - | RegionRequest::Flush(_) - | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => RegionChange::None, - }; - - let region_status = self.region_map.get(®ion_id); + region_change: &RegionChange, + ) -> Result { + let current_region_status = self.region_map.get(®ion_id); let engine = match region_change { - RegionChange::Register(ref engine_type) => match region_status { + RegionChange::Register(ref engine_type) => match current_region_status { Some(status) => { if status.is_registering() { - return Ok(0); + return Ok(CurrentEngine::EarlyReturn(0)); } else { status.clone().into_engine() } @@ -367,17 +371,17 @@ impl RegionServerInner { .with_context(|| RegionEngineNotFoundSnafu { name: engine_type })? .clone(), }, - RegionChange::Deregisters => match region_status { + RegionChange::Deregisters => match current_region_status { Some(status) => { if status.is_deregistering() { - return Ok(0); + return Ok(CurrentEngine::EarlyReturn(0)); } else { status.clone().into_engine() } } - None => return Ok(0), + None => return Ok(CurrentEngine::EarlyReturn(0)), }, - RegionChange::None => match region_status { + RegionChange::None => match current_region_status { Some(status) => match status.clone() { RegionEngineWithStatus::Registering(_) => { return error::RegionNotReadySnafu { region_id }.fail() @@ -391,9 +395,69 @@ impl RegionServerInner { }, }; + Ok(CurrentEngine::Engine(engine)) + } + + pub async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { + let request_type = request.request_type(); + let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED + .with_label_values(&[request_type]) + .start_timer(); + + let region_change = match &request { + RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()), + RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()), + RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters, + RegionRequest::Put(_) + | RegionRequest::Delete(_) + | RegionRequest::Alter(_) + | RegionRequest::Flush(_) + | RegionRequest::Compact(_) + | RegionRequest::Truncate(_) => RegionChange::None, + }; + + let engine = match self.get_engine(region_id, ®ion_change)? { + CurrentEngine::Engine(engine) => engine, + CurrentEngine::EarlyReturn(rows) => return Ok(rows), + }; + let engine_type = engine.name(); - // Sets the RegionStatus before the operation. + // Sets corresponding region status to registering/deregistering before the operation. + self.set_region_status_not_ready(region_id, &engine, ®ion_change); + + match engine + .handle_request(region_id, request) + .trace(info_span!( + "RegionEngine::handle_region_request", + engine_type + )) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id }) + { + Ok(result) => { + // Sets corresponding region status to ready. + self.set_region_status_ready(region_id, engine, region_change); + Ok(result) + } + Err(err) => { + // Removes the region status if the operation fails. + self.unset_region_status(region_id, region_change); + Err(err) + } + } + } + + fn set_region_status_not_ready( + &self, + region_id: RegionId, + engine: &RegionEngineRef, + region_change: &RegionChange, + ) { match region_change { RegionChange::Register(_) => { self.region_map.insert( @@ -409,46 +473,40 @@ impl RegionServerInner { } _ => {} } + } - match engine - .handle_request(region_id, request) - .trace(info_span!( - "RegionEngine::handle_region_request", - engine_type - )) - .await - .with_context(|_| HandleRegionRequestSnafu { region_id }) - { - Ok(result) => { - match region_change { - RegionChange::None => {} - RegionChange::Register(_) => { - info!("Region {region_id} is registered to engine {engine_type}"); - self.region_map - .insert(region_id, RegionEngineWithStatus::Ready(engine)); - self.event_listener.on_region_registered(region_id); - } - RegionChange::Deregisters => { - info!("Region {region_id} is deregistered from engine {engine_type}"); - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - self.event_listener.on_region_deregistered(region_id); - } - } - Ok(result) + fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) { + match region_change { + RegionChange::None => {} + RegionChange::Register(_) | RegionChange::Deregisters => { + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); } - Err(err) => { - // Removes the RegionStatus if the operation fails. - match region_change { - RegionChange::None => {} - RegionChange::Register(_) | RegionChange::Deregisters => { - self.region_map - .remove(®ion_id) - .map(|(id, engine)| engine.set_writable(id, false)); - } - } - Err(err) + } + } + + fn set_region_status_ready( + &self, + region_id: RegionId, + engine: RegionEngineRef, + region_change: RegionChange, + ) { + let engine_type = engine.name(); + match region_change { + RegionChange::None => {} + RegionChange::Register(_) => { + info!("Region {region_id} is registered to engine {engine_type}"); + self.region_map + .insert(region_id, RegionEngineWithStatus::Ready(engine)); + self.event_listener.on_region_registered(region_id); + } + RegionChange::Deregisters => { + info!("Region {region_id} is deregistered from engine {engine_type}"); + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); + self.event_listener.on_region_deregistered(region_id); } } } @@ -730,6 +788,8 @@ pub type TableProviderFactoryRef = Arc; #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use common_error::ext::ErrorExt; use mito2::test_util::CreateRequestBuilder; use store_api::region_engine::RegionEngine; @@ -737,6 +797,7 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::error::Result; use crate::tests::{mock_region_server, MockRegionEngine}; #[tokio::test] @@ -781,6 +842,7 @@ mod tests { engine: engine_name.to_string(), region_dir: String::new(), options: Default::default(), + skip_wal_replay: false, }), ) .await @@ -911,4 +973,160 @@ mod tests { let status = mock_region_server.inner.region_map.get(®ion_id); assert!(status.is_none()); } + + struct CurrentEngineTest { + region_id: RegionId, + current_region_status: Option, + region_change: RegionChange, + assert: Box)>, + } + + #[tokio::test] + async fn test_current_engine() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let (engine, _) = MockRegionEngine::new(); + mock_region_server.register_engine(engine.clone()); + + let region_id = RegionId::new(1024, 1); + let tests = vec![ + // RegionChange::None + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotReady); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::None, + assert: Box::new(|result| { + let err = result.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + }), + }, + // RegionChange::Register + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Register(engine.name().to_string()), + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + // RegionChange::Deregister + CurrentEngineTest { + region_id, + current_region_status: None, + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::EarlyReturn(_)); + }), + }, + CurrentEngineTest { + region_id, + current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())), + region_change: RegionChange::Deregisters, + assert: Box::new(|result| { + let current_engine = result.unwrap(); + assert_matches!(current_engine, CurrentEngine::Engine(_)); + }), + }, + ]; + + for test in tests { + let CurrentEngineTest { + region_id, + current_region_status, + region_change, + assert, + } = test; + + // Sets up + if let Some(status) = current_region_status { + mock_region_server + .inner + .region_map + .insert(region_id, status); + } else { + mock_region_server.inner.region_map.remove(®ion_id); + } + + let result = mock_region_server + .inner + .get_engine(region_id, ®ion_change); + + assert(result); + } + } }