From f99fa22d8ad1620ac9ef7b1af394fa4e498f153d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 28 Aug 2023 08:51:02 +0000 Subject: [PATCH 1/2] fix: open region does not register catalog/schema --- src/catalog/src/local/memory.rs | 28 ++++++------- .../src/heartbeat/handler/open_region.rs | 42 ++++++++++++++----- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 547990447d84..5a20d576c7cd 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -245,7 +245,7 @@ impl CatalogManager for MemoryCatalogManager { } impl MemoryCatalogManager { - /// Create a manager with some default setups + /// Creates a manager with some default setups /// (e.g. default catalog/schema and information schema) pub fn with_default_setup() -> Arc { let manager = Arc::new(Self { @@ -267,19 +267,7 @@ impl MemoryCatalogManager { manager } - /// Registers a catalog and return the catalog already exist - pub fn register_catalog_if_absent(&self, name: String) -> bool { - let mut catalogs = self.catalogs.write().unwrap(); - let entry = catalogs.entry(name); - match entry { - Entry::Occupied(_) => true, - Entry::Vacant(v) => { - let _ = v.insert(HashMap::new()); - false - } - } - } - + /// Registers a catalog if it does not exist and returns true if it already exists. pub fn register_catalog_sync(self: &Arc, name: String) -> Result { let mut catalogs = self.catalogs.write().unwrap(); @@ -294,6 +282,9 @@ impl MemoryCatalogManager { } } + /// Registers a schema if it does not exist. + /// It returns an error if the catalog does not exist, + /// and returns true if the schema exists. pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let catalog = catalogs @@ -312,6 +303,7 @@ impl MemoryCatalogManager { } } + /// Registers a schema and returns an error if the catalog or schema does not exist. pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let schema = catalogs @@ -526,8 +518,12 @@ mod tests { #[test] pub fn test_register_if_absent() { let list = MemoryCatalogManager::with_default_setup(); - assert!(!list.register_catalog_if_absent("test_catalog".to_string(),)); - assert!(list.register_catalog_if_absent("test_catalog".to_string())); + assert!(!list + .register_catalog_sync("test_catalog".to_string()) + .unwrap()); + assert!(list + .register_catalog_sync("test_catalog".to_string()) + .unwrap()); } #[tokio::test] diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index c2bbc6848d20..da5cb94d7d85 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use async_trait::async_trait; -use catalog::error::Error as CatalogError; +use catalog::error::{Error as CatalogError, Result as CatalogResult}; use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::{CatalogManagerRef, RegisterTableRequest}; +use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::format_full_table_name; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ @@ -30,6 +30,7 @@ use store_api::storage::RegionNumber; use table::engine::manager::TableEngineManagerRef; use table::engine::EngineContext; use table::requests::OpenTableRequest; +use table::Table; use crate::error::{self, Result}; @@ -157,6 +158,33 @@ impl OpenRegionHandler { Ok(false) } + async fn register_table( + &self, + request: &OpenTableRequest, + table: Arc, + ) -> CatalogResult { + self.catalog_manager + .clone() + .register_catalog(request.catalog_name.to_string()) + .await?; + + self.catalog_manager + .register_schema(RegisterSchemaRequest { + catalog: request.catalog_name.to_string(), + schema: request.schema_name.to_string(), + }) + .await?; + + let request = RegisterTableRequest { + catalog: request.catalog_name.to_string(), + schema: request.schema_name.to_string(), + table_name: request.table_name.to_string(), + table_id: request.table_id, + table, + }; + self.catalog_manager.register_table(request).await + } + async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result { let OpenTableRequest { catalog_name, @@ -187,14 +215,8 @@ impl OpenRegionHandler { table_name: format_full_table_name(catalog_name, schema_name, table_name), })? { - let request = RegisterTableRequest { - catalog: request.catalog_name.clone(), - schema: request.schema_name.clone(), - table_name: request.table_name.clone(), - table_id: request.table_id, - table, - }; - let result = self.catalog_manager.register_table(request).await; + let result = self.register_table(&request, table).await; + match result { Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true), e => e.with_context(|_| error::RegisterTableSnafu { From 3f9ed37aee4a0adaed8c18c8fdef0d3555237c34 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 28 Aug 2023 10:54:41 +0000 Subject: [PATCH 2/2] fix: fix ci --- src/catalog/src/local/memory.rs | 10 +++--- .../src/heartbeat/handler/open_region.rs | 34 +++++++++++++------ src/datanode/src/tests.rs | 4 +-- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 5a20d576c7cd..a47de669afbd 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -267,7 +267,7 @@ impl MemoryCatalogManager { manager } - /// Registers a catalog if it does not exist and returns true if it already exists. + /// Registers a catalog if it does not exist and returns false if the schema exists. pub fn register_catalog_sync(self: &Arc, name: String) -> Result { let mut catalogs = self.catalogs.write().unwrap(); @@ -284,7 +284,7 @@ impl MemoryCatalogManager { /// Registers a schema if it does not exist. /// It returns an error if the catalog does not exist, - /// and returns true if the schema exists. + /// and returns false if the schema exists. pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let catalog = catalogs @@ -516,12 +516,12 @@ mod tests { } #[test] - pub fn test_register_if_absent() { + pub fn test_register_catalog_sync() { let list = MemoryCatalogManager::with_default_setup(); - assert!(!list + assert!(list .register_catalog_sync("test_catalog".to_string()) .unwrap()); - assert!(list + assert!(!list .register_catalog_sync("test_catalog".to_string()) .unwrap()); } diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index da5cb94d7d85..3a78323ace42 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -163,17 +163,29 @@ impl OpenRegionHandler { request: &OpenTableRequest, table: Arc, ) -> CatalogResult { - self.catalog_manager - .clone() - .register_catalog(request.catalog_name.to_string()) - .await?; - - self.catalog_manager - .register_schema(RegisterSchemaRequest { - catalog: request.catalog_name.to_string(), - schema: request.schema_name.to_string(), - }) - .await?; + if !self + .catalog_manager + .catalog_exist(&request.catalog_name) + .await? + { + self.catalog_manager + .clone() + .register_catalog(request.catalog_name.to_string()) + .await?; + } + + if !self + .catalog_manager + .schema_exist(&request.catalog_name, &request.schema_name) + .await? + { + self.catalog_manager + .register_schema(RegisterSchemaRequest { + catalog: request.catalog_name.to_string(), + schema: request.schema_name.to_string(), + }) + .await?; + } let request = RegisterTableRequest { catalog: request.catalog_name.to_string(), diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 45a19e525318..f220981e9cee 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -182,8 +182,8 @@ async fn test_open_region_handler() { // Opens a non-exist table let non_exist_table_ident = TableIdent { - catalog: "greptime".to_string(), - schema: "public".to_string(), + catalog: "foo".to_string(), + schema: "non-exist".to_string(), table: "non-exist".to_string(), table_id: 2024, engine: "mito".to_string(),