diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 547990447d84..a47de669afbd 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -245,7 +245,7 @@ impl CatalogManager for MemoryCatalogManager { } impl MemoryCatalogManager { - /// Create a manager with some default setups + /// Creates a manager with some default setups /// (e.g. default catalog/schema and information schema) pub fn with_default_setup() -> Arc { let manager = Arc::new(Self { @@ -267,19 +267,7 @@ impl MemoryCatalogManager { manager } - /// Registers a catalog and return the catalog already exist - pub fn register_catalog_if_absent(&self, name: String) -> bool { - let mut catalogs = self.catalogs.write().unwrap(); - let entry = catalogs.entry(name); - match entry { - Entry::Occupied(_) => true, - Entry::Vacant(v) => { - let _ = v.insert(HashMap::new()); - false - } - } - } - + /// Registers a catalog if it does not exist and returns false if the schema exists. pub fn register_catalog_sync(self: &Arc, name: String) -> Result { let mut catalogs = self.catalogs.write().unwrap(); @@ -294,6 +282,9 @@ impl MemoryCatalogManager { } } + /// Registers a schema if it does not exist. + /// It returns an error if the catalog does not exist, + /// and returns false if the schema exists. pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let catalog = catalogs @@ -312,6 +303,7 @@ impl MemoryCatalogManager { } } + /// Registers a schema and returns an error if the catalog or schema does not exist. pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let schema = catalogs @@ -524,10 +516,14 @@ mod tests { } #[test] - pub fn test_register_if_absent() { + pub fn test_register_catalog_sync() { let list = MemoryCatalogManager::with_default_setup(); - assert!(!list.register_catalog_if_absent("test_catalog".to_string(),)); - assert!(list.register_catalog_if_absent("test_catalog".to_string())); + assert!(list + .register_catalog_sync("test_catalog".to_string()) + .unwrap()); + assert!(!list + .register_catalog_sync("test_catalog".to_string()) + .unwrap()); } #[tokio::test] diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index c2bbc6848d20..3a78323ace42 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use async_trait::async_trait; -use catalog::error::Error as CatalogError; +use catalog::error::{Error as CatalogError, Result as CatalogResult}; use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::{CatalogManagerRef, RegisterTableRequest}; +use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::format_full_table_name; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ @@ -30,6 +30,7 @@ use store_api::storage::RegionNumber; use table::engine::manager::TableEngineManagerRef; use table::engine::EngineContext; use table::requests::OpenTableRequest; +use table::Table; use crate::error::{self, Result}; @@ -157,6 +158,45 @@ impl OpenRegionHandler { Ok(false) } + async fn register_table( + &self, + request: &OpenTableRequest, + table: Arc, + ) -> CatalogResult { + if !self + .catalog_manager + .catalog_exist(&request.catalog_name) + .await? + { + self.catalog_manager + .clone() + .register_catalog(request.catalog_name.to_string()) + .await?; + } + + if !self + .catalog_manager + .schema_exist(&request.catalog_name, &request.schema_name) + .await? + { + self.catalog_manager + .register_schema(RegisterSchemaRequest { + catalog: request.catalog_name.to_string(), + schema: request.schema_name.to_string(), + }) + .await?; + } + + let request = RegisterTableRequest { + catalog: request.catalog_name.to_string(), + schema: request.schema_name.to_string(), + table_name: request.table_name.to_string(), + table_id: request.table_id, + table, + }; + self.catalog_manager.register_table(request).await + } + async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result { let OpenTableRequest { catalog_name, @@ -187,14 +227,8 @@ impl OpenRegionHandler { table_name: format_full_table_name(catalog_name, schema_name, table_name), })? { - let request = RegisterTableRequest { - catalog: request.catalog_name.clone(), - schema: request.schema_name.clone(), - table_name: request.table_name.clone(), - table_id: request.table_id, - table, - }; - let result = self.catalog_manager.register_table(request).await; + let result = self.register_table(&request, table).await; + match result { Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true), e => e.with_context(|_| error::RegisterTableSnafu { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 45a19e525318..f220981e9cee 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -182,8 +182,8 @@ async fn test_open_region_handler() { // Opens a non-exist table let non_exist_table_ident = TableIdent { - catalog: "greptime".to_string(), - schema: "public".to_string(), + catalog: "foo".to_string(), + schema: "non-exist".to_string(), table: "non-exist".to_string(), table_id: 2024, engine: "mito".to_string(),