diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index b002bdd8370e..9971f6614861 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -44,3 +44,4 @@ tonic.workspace = true chrono.workspace = true datatypes.workspace = true hyper = { version = "0.14", features = ["full"] } +common-procedure = { workspace = true, features = ["testing"] } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 3d3cd5ae1c1f..defa6e84b71e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -43,9 +43,9 @@ use crate::rpc::ddl::{ TruncateTableTask, }; use crate::rpc::router::RegionRoute; - pub type DdlManagerRef = Arc; +/// The [DdlManager] provides the ability to execute Ddl. pub struct DdlManager { procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, @@ -55,26 +55,31 @@ pub struct DdlManager { } impl DdlManager { - pub fn new( + /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. + pub fn try_new( procedure_manager: ProcedureManagerRef, datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_meta_allocator: TableMetadataAllocatorRef, - ) -> Self { - Self { + ) -> Result { + let manager = Self { procedure_manager, datanode_manager: datanode_clients, cache_invalidator, table_metadata_manager, table_meta_allocator, - } + }; + manager.register_loaders()?; + Ok(manager) } + /// Returns the [TableMetadataManagerRef]. pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + /// Returns the [DdlContext] pub fn create_context(&self) -> DdlContext { DdlContext { datanode_manager: self.datanode_manager.clone(), @@ -83,7 +88,7 @@ impl DdlManager { } } - pub fn try_start(&self) -> Result<()> { + fn register_loaders(&self) -> Result<()> { let context = self.create_context(); self.procedure_manager @@ -142,6 +147,7 @@ impl DdlManager { } #[tracing::instrument(skip_all)] + /// Submits and executes an alter table task. pub async fn submit_alter_table_task( &self, cluster_id: u64, @@ -159,6 +165,7 @@ impl DdlManager { } #[tracing::instrument(skip_all)] + /// Submits and executes a create table task. pub async fn submit_create_table_task( &self, cluster_id: u64, @@ -176,6 +183,7 @@ impl DdlManager { } #[tracing::instrument(skip_all)] + /// Submits and executes a drop table task. pub async fn submit_drop_table_task( &self, cluster_id: u64, @@ -199,6 +207,7 @@ impl DdlManager { } #[tracing::instrument(skip_all)] + /// Submits and executes a truncate table task. pub async fn submit_truncate_table_task( &self, cluster_id: u64, @@ -416,3 +425,80 @@ impl DdlTaskExecutor for DdlManager { .await } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::Partition; + use common_procedure::local::LocalManager; + use table::metadata::{RawTableInfo, TableId}; + + use super::DdlManager; + use crate::cache_invalidator::DummyCacheInvalidator; + use crate::datanode_manager::{DatanodeManager, DatanodeRef}; + use crate::ddl::alter_table::AlterTableProcedure; + use crate::ddl::create_table::CreateTableProcedure; + use crate::ddl::drop_table::DropTableProcedure; + use crate::ddl::truncate_table::TruncateTableProcedure; + use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; + use crate::error::Result; + use crate::key::TableMetadataManager; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; + use crate::rpc::router::RegionRoute; + use crate::state_store::KvStateStore; + + /// A dummy implemented [DatanodeManager]. + pub struct DummyDatanodeManager; + + #[async_trait::async_trait] + impl DatanodeManager for DummyDatanodeManager { + async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { + unimplemented!() + } + } + + /// A dummy implemented [TableMetadataAllocator]. + pub struct DummyTableMetadataAllocator; + + #[async_trait::async_trait] + impl TableMetadataAllocator for DummyTableMetadataAllocator { + async fn create( + &self, + _ctx: &TableMetadataAllocatorContext, + _table_info: &mut RawTableInfo, + _partitions: &[Partition], + ) -> Result<(TableId, Vec)> { + unimplemented!() + } + } + + #[test] + fn test_try_new() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + let state_store = Arc::new(KvStateStore::new(kv_backend)); + let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); + + let _ = DdlManager::try_new( + procedure_manager.clone(), + Arc::new(DummyDatanodeManager), + Arc::new(DummyCacheInvalidator), + table_metadata_manager, + Arc::new(DummyTableMetadataAllocator), + ); + + let expected_loaders = vec![ + CreateTableProcedure::TYPE_NAME, + AlterTableProcedure::TYPE_NAME, + DropTableProcedure::TYPE_NAME, + TruncateTableProcedure::TYPE_NAME, + ]; + + for loader in expected_loaders { + assert!(procedure_manager.contains_loader(loader)); + } + } +} diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 23354db0cb43..b029c52ce7bd 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -4,6 +4,10 @@ version.workspace = true edition.workspace = true license.workspace = true + +[features] +testing=[] + [dependencies] async-stream.workspace = true async-trait.workspace = true diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 7bb7732bfcdf..ae01022c9cc4 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -527,6 +527,13 @@ impl LocalManager { Ok(()) } + + #[cfg(any(test, feature = "testing"))] + /// Returns true if contains a specified loader. + pub fn contains_loader(&self, name: &str) -> bool { + let loaders = self.manager_ctx.loaders.lock().unwrap(); + loaders.contains_key(name) + } } #[async_trait] diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index b55cd398444d..75a5e66ce9b7 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -26,6 +26,12 @@ use store_api::storage::RegionNumber; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to init ddl manager"))] + InitDdlManager { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to invalidate table cache"))] InvalidateTableCache { location: Location, @@ -319,7 +325,9 @@ impl ErrorExt for Error { Error::ParseSql { source, .. } => source.status_code(), - Error::InvalidateTableCache { source, .. } => source.status_code(), + Error::InvalidateTableCache { source, .. } | Error::InitDdlManager { source, .. } => { + source.status_code() + } Error::Table { source, .. } | Error::CopyTable { source, .. } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2ba0daa0a202..ed470d900c34 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -328,13 +328,16 @@ impl Instance { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let cache_invalidator = Arc::new(DummyCacheInvalidator); - let ddl_executor = Arc::new(DdlManager::new( - procedure_manager, - datanode_manager, - cache_invalidator.clone(), - table_metadata_manager.clone(), - Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), - )); + let ddl_executor = Arc::new( + DdlManager::try_new( + procedure_manager, + datanode_manager, + cache_invalidator.clone(), + table_metadata_manager.clone(), + Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), + ) + .context(error::InitDdlManagerSnafu)?, + ); let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 600c188e3a91..3a007b3163c4 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -43,6 +43,12 @@ pub enum Error { region_id: RegionId, }, + #[snafu(display("Failed to init ddl manager"))] + InitDdlManager { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { location: Location, @@ -685,7 +691,9 @@ impl ErrorExt for Error { | Error::UpdateTableRoute { source, .. } | Error::GetFullTableInfo { source, .. } => source.status_code(), - Error::InitMetadata { source, .. } => source.status_code(), + Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { + source.status_code() + } Error::Other { source, .. } => source.status_code(), } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 92b259c10cdd..e3d03da142fb 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -28,10 +28,11 @@ use common_meta::sequence::{Sequence, SequenceRef}; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; +use snafu::ResultExt; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::Result; +use crate::error::{self, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_stats_handler::CollectStatsHandler; @@ -196,8 +197,7 @@ impl MetaSrvBuilder { &table_metadata_manager, (&selector, &selector_ctx), &table_id_sequence, - ); - let _ = ddl_manager.try_start(); + )?; let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); let handler_group = match handler_group { @@ -330,7 +330,7 @@ fn build_ddl_manager( table_metadata_manager: &TableMetadataManagerRef, (selector, selector_ctx): (&SelectorRef, &SelectorContext), table_id_sequence: &SequenceRef, -) -> DdlManagerRef { +) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() .timeout(Duration::from_millis( @@ -355,12 +355,15 @@ fn build_ddl_manager( table_id_sequence.clone(), )); - Arc::new(DdlManager::new( - procedure_manager.clone(), - datanode_clients, - cache_invalidator, - table_metadata_manager.clone(), - table_meta_allocator, + Ok(Arc::new( + DdlManager::try_new( + procedure_manager.clone(), + datanode_clients, + cache_invalidator, + table_metadata_manager.clone(), + table_meta_allocator, + ) + .context(error::InitDdlManagerSnafu)?, )) } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 33afcb02d64a..f11fe91bd621 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -99,10 +99,10 @@ impl GreptimeDbStandaloneBuilder { .init() .await .unwrap(); - procedure_manager.start().await.unwrap(); + let instance = Instance::try_new_standalone( kv_backend, - procedure_manager, + procedure_manager.clone(), catalog_manager, plugins, datanode.region_server(), @@ -110,6 +110,9 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); + // Ensures all loaders are registered. + procedure_manager.start().await.unwrap(); + test_util::prepare_another_catalog_and_schema(&instance).await; instance.start().await.unwrap();