From fe954b78a27b279a8ead4287bc3fa91b3a1f6836 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 12 Sep 2023 11:36:21 +0800 Subject: [PATCH] refactor: system tables in new region server (#2344) refactor: inverse the dependency between system tables and catalog manager --- src/catalog/src/error.rs | 18 - src/catalog/src/lib.rs | 54 +-- src/catalog/src/local.rs | 1 - src/catalog/src/local/manager.rs | 350 ++---------------- src/catalog/src/local/memory.rs | 245 ++---------- src/catalog/src/system.rs | 66 +--- src/catalog/src/tables.rs | 12 - src/catalog/tests/local_catalog_tests.rs | 175 --------- src/cmd/src/frontend.rs | 8 +- src/datanode/src/error.rs | 8 +- src/frontend/src/catalog.rs | 67 ++-- src/frontend/src/instance.rs | 19 +- src/frontend/src/statement/ddl.rs | 33 +- src/promql/src/planner.rs | 3 +- src/query/src/datafusion.rs | 2 +- src/query/src/range_select/plan_rewrite.rs | 3 +- src/query/src/tests/query_engine_test.rs | 2 +- src/query/src/tests/time_range_filter_test.rs | 2 +- src/script/src/manager.rs | 41 +- src/table-procedure/src/alter.rs | 93 +---- src/table-procedure/src/create.rs | 3 +- src/table-procedure/src/drop.rs | 3 +- 22 files changed, 183 insertions(+), 1025 deletions(-) delete mode 100644 src/catalog/tests/local_catalog_tests.rs diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 9fe1f5cbfb2d..d97ccd544a4f 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -22,8 +22,6 @@ use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; use tokio::task::JoinError; -use crate::DeregisterTableRequest; - #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -179,20 +177,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display( - "Failed to deregister table, request: {:?}, source: {}", - request, - source - ))] - DeregisterTable { - request: DeregisterTableRequest, - location: Location, - source: table::error::Error, - }, - - #[snafu(display("Illegal catalog manager state: {}", msg))] - IllegalManagerState { location: Location, msg: String }, - #[snafu(display("Failed to scan system catalog table, source: {}", source))] SystemCatalogTableScan { location: Location, @@ -269,7 +253,6 @@ impl ErrorExt for Error { Error::InvalidKey { .. } | Error::SchemaNotFound { .. } | Error::TableNotFound { .. } - | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } | Error::InvalidEntryType { .. } | Error::ParallelOpenTable { .. } => StatusCode::Unexpected, @@ -302,7 +285,6 @@ impl ErrorExt for Error { | Error::InsertCatalogRecord { source, .. } | Error::OpenTable { source, .. } | Error::CreateTable { source, .. } - | Error::DeregisterTable { source, .. } | Error::TableSchemaMismatch { source, .. } => source.status_code(), Error::MetaSrv { source, .. } => source.status_code(), diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 81b4b742b89a..4f2af428af3b 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -44,36 +44,40 @@ pub mod tables; pub trait CatalogManager: Send + Sync { fn as_any(&self) -> &dyn Any; - /// Starts a catalog manager. - async fn start(&self) -> Result<()>; - - /// Registers a catalog to catalog manager, returns whether the catalog exist before. - async fn register_catalog(self: Arc, name: String) -> Result; + /// Register a local catalog. + /// + /// # Returns + /// + /// Whether the catalog is registered. + fn register_local_catalog(&self, name: &str) -> Result; - /// Register a schema with catalog name and schema name. Retuens whether the - /// schema registered. + /// Register a local schema. + /// + /// # Returns + /// + /// Whether the schema is registered. /// /// # Errors /// /// This method will/should fail if catalog not exist - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; + fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result; /// Deregisters a database within given catalog/schema to catalog manager - async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result; + fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result; - /// Registers a table within given catalog/schema to catalog manager, - /// returns whether the table registered. + /// Registers a local table. + /// + /// # Returns + /// + /// Whether the table is registered. /// /// # Errors /// /// This method will/should fail if catalog or schema not exist - async fn register_table(&self, request: RegisterTableRequest) -> Result; + fn register_local_table(&self, request: RegisterTableRequest) -> Result; /// Deregisters a table within given catalog/schema to catalog manager - async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>; - - /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. - async fn rename_table(&self, request: RenameTableRequest) -> Result; + fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>; async fn catalog_names(&self) -> Result>; @@ -160,7 +164,7 @@ pub struct RegisterSchemaRequest { pub schema: String, } -pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( +pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>( manager: &'a M, engine: TableEngineRef, sys_table_requests: &'a mut Vec, @@ -185,15 +189,13 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( table_name, ), })?; - let _ = manager - .register_table(RegisterTableRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - table_name: table_name.clone(), - table_id, - table: table.clone(), - }) - .await?; + manager.register_local_table(RegisterTableRequest { + catalog: catalog_name.clone(), + schema: schema_name.clone(), + table_name: table_name.clone(), + table_id, + table: table.clone(), + })?; info!("Created and registered system table: {table_name}"); table }; diff --git a/src/catalog/src/local.rs b/src/catalog/src/local.rs index 8e8bcf40b556..39b1a7de38d6 100644 --- a/src/catalog/src/local.rs +++ b/src/catalog/src/local.rs @@ -15,5 +15,4 @@ pub mod manager; pub mod memory; -pub use manager::LocalCatalogManager; pub use memory::{new_memory_catalog_manager, MemoryCatalogManager}; diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 5c5fb11623b0..cf5543f57e9d 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -12,77 +12,61 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, - MITO_ENGINE, NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, - SYSTEM_CATALOG_TABLE_NAME, + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE, + NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, }; -use common_catalog::format_full_table_name; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::{error, info}; +use common_telemetry::info; use datatypes::prelude::ScalarVector; use datatypes::vectors::{BinaryVector, UInt8Vector}; use futures_util::lock::Mutex; -use metrics::increment_gauge; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; use table::engine::EngineContext; -use table::metadata::TableId; use table::requests::OpenTableRequest; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; -use table::table::TableIdProvider; -use table::TableRef; use crate::error::{ - self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, - Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, - SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu, - TableNotFoundSnafu, UnimplementedSnafu, + CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaNotFoundSnafu, + SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, + TableNotFoundSnafu, }; -use crate::local::memory::MemoryCatalogManager; use crate::system::{ decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, VALUE_INDEX, }; use crate::tables::SystemCatalog; use crate::{ - handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + handle_system_table_request, CatalogManagerRef, RegisterSchemaRequest, + RegisterSystemTableRequest, RegisterTableRequest, }; -/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. -pub struct LocalCatalogManager { +pub struct SystemTableInitializer { system: Arc, - catalogs: Arc, + catalog_manager: CatalogManagerRef, engine_manager: TableEngineManagerRef, - next_table_id: AtomicU32, - init_lock: Mutex, - register_lock: Mutex<()>, system_table_requests: Mutex>, } -impl LocalCatalogManager { - /// Create a new [CatalogManager] with given user catalogs and mito engine - pub async fn try_new(engine_manager: TableEngineManagerRef) -> Result { +impl SystemTableInitializer { + pub async fn try_new( + engine_manager: TableEngineManagerRef, + catalog_manager: CatalogManagerRef, + ) -> Result { let engine = engine_manager .engine(MITO_ENGINE) .context(TableEngineNotFoundSnafu { engine_name: MITO_ENGINE, })?; let table = SystemCatalogTable::new(engine.clone()).await?; - let memory_catalog_manager = crate::local::memory::new_memory_catalog_manager()?; let system_catalog = Arc::new(SystemCatalog::new(table)); Ok(Self { system: system_catalog, - catalogs: memory_catalog_manager, + catalog_manager, engine_manager, - next_table_id: AtomicU32::new(MIN_USER_TABLE_ID), - init_lock: Mutex::new(false), - register_lock: Mutex::new(()), system_table_requests: Mutex::new(Vec::default()), }) } @@ -92,15 +76,7 @@ impl LocalCatalogManager { self.init_system_catalog().await?; let system_records = self.system.information_schema.system.records().await?; let entries = self.collect_system_catalog_entries(system_records).await?; - let max_table_id = self.handle_system_catalog_entries(entries).await?; - - info!( - "All system catalog entries processed, max table id: {}", - max_table_id - ); - self.next_table_id - .store((max_table_id + 1).max(MIN_USER_TABLE_ID), Ordering::Relaxed); - *self.init_lock.lock().await = true; + self.handle_system_catalog_entries(entries).await?; // Processing system table hooks let mut sys_table_requests = self.system_table_requests.lock().await; @@ -111,26 +87,24 @@ impl LocalCatalogManager { engine_name: MITO_ENGINE, })?; - handle_system_table_request(self, engine, &mut sys_table_requests).await?; + handle_system_table_request( + self.catalog_manager.as_ref(), + engine, + &mut sys_table_requests, + ) + .await?; Ok(()) } async fn init_system_catalog(&self) -> Result<()> { - // register default catalog and default schema - self.catalogs - .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?; - self.catalogs.register_schema_sync(RegisterSchemaRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - })?; + let catalog_manager = &self.catalog_manager; + catalog_manager.register_local_catalog(SYSTEM_CATALOG_NAME)?; - // register SystemCatalogTable - self.catalogs - .register_catalog_sync(SYSTEM_CATALOG_NAME.to_string())?; - self.catalogs.register_schema_sync(RegisterSchemaRequest { + catalog_manager.register_local_schema(RegisterSchemaRequest { catalog: SYSTEM_CATALOG_NAME.to_string(), schema: INFORMATION_SCHEMA_NAME.to_string(), })?; + let register_table_req = RegisterTableRequest { catalog: SYSTEM_CATALOG_NAME.to_string(), schema: INFORMATION_SCHEMA_NAME.to_string(), @@ -138,7 +112,7 @@ impl LocalCatalogManager { table_id: SYSTEM_CATALOG_TABLE_ID, table: self.system.information_schema.system.as_table_ref(), }; - self.catalogs.register_table(register_table_req).await?; + catalog_manager.register_local_table(register_table_req)?; // Add numbers table for test let register_number_table_req = RegisterTableRequest { @@ -149,9 +123,7 @@ impl LocalCatalogManager { table: NumbersTable::table(NUMBERS_TABLE_ID), }; - self.catalogs - .register_table(register_number_table_req) - .await?; + catalog_manager.register_local_table(register_number_table_req)?; Ok(()) } @@ -216,16 +188,14 @@ impl LocalCatalogManager { Ok(res) } - /// Processes records from system catalog table and returns the max table id persisted - /// in system catalog table. - async fn handle_system_catalog_entries(&self, entries: Vec) -> Result { + /// Processes records from system catalog table. + async fn handle_system_catalog_entries(&self, entries: Vec) -> Result<()> { let entries = Self::sort_entries(entries); - let mut max_table_id = 0; for entry in entries { match entry { Entry::Catalog(c) => { - self.catalogs - .register_catalog_sync(c.catalog_name.clone())?; + self.catalog_manager + .register_local_catalog(&c.catalog_name)?; info!("Register catalog: {}", c.catalog_name); } Entry::Schema(s) => { @@ -233,11 +203,10 @@ impl LocalCatalogManager { catalog: s.catalog_name.clone(), schema: s.schema_name.clone(), }; - let _ = self.catalogs.register_schema_sync(req)?; + self.catalog_manager.register_local_schema(req)?; info!("Registered schema: {:?}", s); } Entry::Table(t) => { - max_table_id = max_table_id.max(t.table_id); if t.is_deleted { continue; } @@ -246,7 +215,7 @@ impl LocalCatalogManager { } } } - Ok(max_table_id) + Ok(()) } /// Sort catalog entries to ensure catalog entries comes first, then schema entries, @@ -298,19 +267,8 @@ impl LocalCatalogManager { table_id: t.table_id, table: table_ref, }; - let _ = self.catalogs.register_table(register_request).await?; - - Ok(()) - } - - async fn check_state(&self) -> Result<()> { - let started = self.init_lock.lock().await; - ensure!( - *started, - IllegalManagerStateSnafu { - msg: "Catalog manager not started", - } - ); + self.catalog_manager + .register_local_table(register_request)?; Ok(()) } @@ -319,11 +277,11 @@ impl LocalCatalogManager { catalog_name: &str, schema_name: &str, ) -> Result<()> { - if !self.catalogs.catalog_exist(catalog_name).await? { + if !self.catalog_manager.catalog_exist(catalog_name).await? { return CatalogNotFoundSnafu { catalog_name }.fail()?; } if !self - .catalogs + .catalog_manager .schema_exist(catalog_name, schema_name) .await? { @@ -337,234 +295,6 @@ impl LocalCatalogManager { } } -#[async_trait::async_trait] -impl TableIdProvider for LocalCatalogManager { - async fn next_table_id(&self) -> table::Result { - Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed)) - } -} - -#[async_trait::async_trait] -impl CatalogManager for LocalCatalogManager { - /// Start [LocalCatalogManager] to load all information from system catalog table. - /// Make sure table engine is initialized before starting [MemoryCatalogManager]. - async fn start(&self) -> Result<()> { - self.init().await - } - - async fn register_table(&self, request: RegisterTableRequest) -> Result { - self.check_state().await?; - - let catalog_name = request.catalog.clone(); - let schema_name = request.schema.clone(); - - self.check_catalog_schema_exist(&catalog_name, &schema_name) - .await?; - - { - let _lock = self.register_lock.lock().await; - if let Some(existing) = self - .catalogs - .table(&request.catalog, &request.schema, &request.table_name) - .await? - { - if existing.table_info().ident.table_id != request.table_id { - error!( - "Unexpected table register request: {:?}, existing: {:?}", - request, - existing.table_info() - ); - return TableExistsSnafu { - table: format_full_table_name( - &catalog_name, - &schema_name, - &request.table_name, - ), - } - .fail(); - } - // Try to register table with same table id, just ignore. - Ok(false) - } else { - // table does not exist - let engine = request.table.table_info().meta.engine.to_string(); - let table_name = request.table_name.clone(); - let table_id = request.table_id; - let _ = self.catalogs.register_table(request).await?; - let _ = self - .system - .register_table( - catalog_name.clone(), - schema_name.clone(), - table_name, - table_id, - engine, - ) - .await?; - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - Ok(true) - } - } - } - - async fn rename_table(&self, request: RenameTableRequest) -> Result { - self.check_state().await?; - - let catalog_name = &request.catalog; - let schema_name = &request.schema; - - self.check_catalog_schema_exist(catalog_name, schema_name) - .await?; - ensure!( - self.catalogs - .table(catalog_name, schema_name, &request.new_table_name) - .await? - .is_none(), - TableExistsSnafu { - table: &request.new_table_name - } - ); - - let _lock = self.register_lock.lock().await; - let old_table = self - .catalogs - .table(catalog_name, schema_name, &request.table_name) - .await? - .context(TableNotExistSnafu { - table: &request.table_name, - })?; - - let engine = old_table.table_info().meta.engine.to_string(); - // rename table in system catalog - let _ = self - .system - .register_table( - catalog_name.clone(), - schema_name.clone(), - request.new_table_name.clone(), - request.table_id, - engine, - ) - .await?; - - self.catalogs.rename_table(request).await - } - - async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> { - self.check_state().await?; - - { - let _ = self.register_lock.lock().await; - - let DeregisterTableRequest { - catalog, - schema, - table_name, - } = &request; - let table_id = self - .catalogs - .table(catalog, schema, table_name) - .await? - .with_context(|| error::TableNotExistSnafu { - table: format_full_table_name(catalog, schema, table_name), - })? - .table_info() - .ident - .table_id; - - self.system.deregister_table(&request, table_id).await?; - self.catalogs.deregister_table(request).await - } - } - - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { - self.check_state().await?; - - let catalog_name = &request.catalog; - let schema_name = &request.schema; - - if !self.catalogs.catalog_exist(catalog_name).await? { - return CatalogNotFoundSnafu { catalog_name }.fail()?; - } - - { - let _lock = self.register_lock.lock().await; - ensure!( - !self - .catalogs - .schema_exist(catalog_name, schema_name) - .await?, - SchemaExistsSnafu { - schema: schema_name, - } - ); - let _ = self - .system - .register_schema(request.catalog.clone(), schema_name.clone()) - .await?; - self.catalogs.register_schema_sync(request) - } - } - - async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result { - UnimplementedSnafu { - operation: "deregister schema", - } - .fail() - } - - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { - self.catalogs.schema_exist(catalog, schema).await - } - - async fn table( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result> { - self.catalogs - .table(catalog_name, schema_name, table_name) - .await - } - - async fn catalog_exist(&self, catalog: &str) -> Result { - if catalog.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) { - Ok(true) - } else { - self.catalogs.catalog_exist(catalog).await - } - } - - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { - self.catalogs.table_exist(catalog, schema, table).await - } - - async fn catalog_names(&self) -> Result> { - self.catalogs.catalog_names().await - } - - async fn schema_names(&self, catalog_name: &str) -> Result> { - self.catalogs.schema_names(catalog_name).await - } - - async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result> { - self.catalogs.table_names(catalog_name, schema_name).await - } - - async fn register_catalog(self: Arc, name: String) -> Result { - self.catalogs.clone().register_catalog(name).await - } - - fn as_any(&self) -> &dyn Any { - self - } -} - #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -608,7 +338,7 @@ mod tests { is_deleted: false, }), ]; - let res = LocalCatalogManager::sort_entries(vec); + let res = SystemTableInitializer::sort_entries(vec); assert_matches!(res[0], Entry::Catalog(..)); assert_matches!(res[1], Entry::Catalog(..)); assert_matches!(res[2], Entry::Schema(..)); diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index c4449bffac6b..7360941483f9 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -15,96 +15,47 @@ use std::any::Any; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock, Weak}; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, -}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use metrics::{decrement_gauge, increment_gauge}; use snafu::OptionExt; -use table::metadata::TableId; -use table::table::TableIdProvider; use table::TableRef; -use crate::error::{ - CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu, -}; +use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::information_schema::InformationSchemaProvider; use crate::{ CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterTableRequest, RenameTableRequest, + RegisterTableRequest, }; type SchemaEntries = HashMap>; /// Simple in-memory list of catalogs +#[derive(Clone)] pub struct MemoryCatalogManager { /// Collection of catalogs containing schemas and ultimately Tables - pub catalogs: RwLock>, - pub table_id: AtomicU32, -} - -#[async_trait::async_trait] -impl TableIdProvider for MemoryCatalogManager { - async fn next_table_id(&self) -> table::error::Result { - Ok(self.table_id.fetch_add(1, Ordering::Relaxed)) - } + catalogs: Arc>>, } #[async_trait::async_trait] impl CatalogManager for MemoryCatalogManager { - async fn start(&self) -> Result<()> { - self.table_id.store(MIN_USER_TABLE_ID, Ordering::Relaxed); - Ok(()) + fn register_local_catalog(&self, name: &str) -> Result { + self.register_catalog(name) } - - async fn register_table(&self, request: RegisterTableRequest) -> Result { - self.register_table_sync(request) + fn register_local_table(&self, request: RegisterTableRequest) -> Result { + self.register_table(request) } - async fn rename_table(&self, request: RenameTableRequest) -> Result { - let mut catalogs = self.catalogs.write().unwrap(); - let schema = catalogs - .get_mut(&request.catalog) - .with_context(|| CatalogNotFoundSnafu { - catalog_name: &request.catalog, - })? - .get_mut(&request.schema) - .with_context(|| SchemaNotFoundSnafu { - catalog: &request.catalog, - schema: &request.schema, - })?; - - // check old and new table names - if !schema.contains_key(&request.table_name) { - return TableNotFoundSnafu { - table_info: request.table_name.to_string(), - } - .fail()?; - } - if schema.contains_key(&request.new_table_name) { - return TableExistsSnafu { - table: &request.new_table_name, - } - .fail(); - } - - let table = schema.remove(&request.table_name).unwrap(); - let _ = schema.insert(request.new_table_name, table); - - Ok(true) - } - - async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> { + fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()> { self.deregister_table_sync(request) } - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result { self.register_schema_sync(request) } - async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result { + fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let schemas = catalogs .get_mut(&request.catalog) @@ -203,28 +154,27 @@ impl CatalogManager for MemoryCatalogManager { .collect()) } - async fn register_catalog(self: Arc, name: String) -> Result { - self.register_catalog_sync(name) - } - fn as_any(&self) -> &dyn Any { self } } impl MemoryCatalogManager { + pub fn new() -> Arc { + Arc::new(Self { + catalogs: Default::default(), + }) + } + /// Creates a manager with some default setups /// (e.g. default catalog/schema and information schema) pub fn with_default_setup() -> Arc { let manager = Arc::new(Self { - table_id: AtomicU32::new(MIN_USER_TABLE_ID), catalogs: Default::default(), }); // Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur - manager - .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string()) - .unwrap(); + manager.register_catalog(DEFAULT_CATALOG_NAME).unwrap(); manager .register_schema_sync(RegisterSchemaRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -252,12 +202,15 @@ impl MemoryCatalogManager { } /// Registers a catalog if it does not exist and returns false if the schema exists. - pub fn register_catalog_sync(self: &Arc, name: String) -> Result { + pub fn register_catalog(&self, name: &str) -> Result { + let name = name.to_string(); + let mut catalogs = self.catalogs.write().unwrap(); match catalogs.entry(name.clone()) { Entry::Vacant(e) => { - let catalog = self.create_catalog_entry(name); + let arc_self = Arc::new(self.clone()); + let catalog = arc_self.create_catalog_entry(name); e.insert(catalog); increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); Ok(true) @@ -311,7 +264,7 @@ impl MemoryCatalogManager { } /// Registers a schema and returns an error if the catalog or schema does not exist. - pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result { + pub fn register_table(&self, request: RegisterTableRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let schema = catalogs .get_mut(&request.catalog) @@ -356,7 +309,7 @@ impl MemoryCatalogManager { let schema = &table.table_info().schema_name; if !manager.catalog_exist_sync(catalog).unwrap() { - manager.register_catalog_sync(catalog.to_string()).unwrap(); + manager.register_catalog(catalog).unwrap(); } if !manager.schema_exist_sync(catalog, schema).unwrap() { @@ -375,7 +328,7 @@ impl MemoryCatalogManager { table_id: table.table_info().ident.table_id, table, }; - let _ = manager.register_table_sync(request).unwrap(); + let _ = manager.register_table(request).unwrap(); manager } } @@ -388,8 +341,6 @@ pub fn new_memory_catalog_manager() -> Result> { #[cfg(test)] mod tests { use common_catalog::consts::*; - use common_error::ext::ErrorExt; - use common_error::status_code::StatusCode; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; @@ -406,7 +357,7 @@ mod tests { table: NumbersTable::table(NUMBERS_TABLE_ID), }; - let _ = catalog_list.register_table(register_request).await.unwrap(); + catalog_list.register_local_table(register_request).unwrap(); let table = catalog_list .table( DEFAULT_CATALOG_NAME, @@ -423,130 +374,11 @@ mod tests { .is_none()); } - #[tokio::test] - async fn test_mem_manager_rename_table() { - let catalog = MemoryCatalogManager::with_default_setup(); - let table_name = "test_table"; - assert!(!catalog - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - // register test table - let table_id = 2333; - let register_request = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - table: NumbersTable::table(table_id), - }; - assert!(catalog.register_table(register_request).await.unwrap()); - assert!(catalog - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - - // rename test table - let new_table_name = "test_table_renamed"; - let rename_request = RenameTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - new_table_name: new_table_name.to_string(), - table_id, - }; - let _ = catalog.rename_table(rename_request).await.unwrap(); - - // test old table name not exist - assert!(!catalog - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - - // test new table name exists - assert!(catalog - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap()); - let registered_table = catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap() - .unwrap(); - assert_eq!(registered_table.table_info().ident.table_id, table_id); - - let dup_register_request = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: new_table_name.to_string(), - table_id: table_id + 1, - table: NumbersTable::table(table_id + 1), - }; - let result = catalog.register_table(dup_register_request).await; - let err = result.err().unwrap(); - assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); - } - - #[tokio::test] - async fn test_catalog_rename_table() { - let catalog = MemoryCatalogManager::with_default_setup(); - let table_name = "num"; - let table_id = 2333; - let table = NumbersTable::table(table_id); - - // register table - let register_table_req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - table, - }; - assert!(catalog.register_table(register_table_req).await.unwrap()); - assert!(catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap() - .is_some()); - - // rename table - let new_table_name = "numbers_new"; - let rename_table_req = RenameTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - new_table_name: new_table_name.to_string(), - table_id, - }; - assert!(catalog.rename_table(rename_table_req).await.unwrap()); - assert!(catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap() - .is_none()); - assert!(catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap() - .is_some()); - - let registered_table = catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap() - .unwrap(); - assert_eq!(registered_table.table_info().ident.table_id, table_id); - } - #[test] pub fn test_register_catalog_sync() { let list = MemoryCatalogManager::with_default_setup(); - assert!(list - .register_catalog_sync("test_catalog".to_string()) - .unwrap()); - assert!(!list - .register_catalog_sync("test_catalog".to_string()) - .unwrap()); + assert!(list.register_catalog("test_catalog").unwrap()); + assert!(!list.register_catalog("test_catalog").unwrap()); } #[tokio::test] @@ -561,7 +393,7 @@ mod tests { table_id: 2333, table: NumbersTable::table(2333), }; - let _ = catalog.register_table(register_table_req).await.unwrap(); + catalog.register_local_table(register_table_req).unwrap(); assert!(catalog .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) .await @@ -574,8 +406,7 @@ mod tests { table_name: table_name.to_string(), }; catalog - .deregister_table(deregister_table_req) - .await + .deregister_local_table(deregister_table_req) .unwrap(); assert!(catalog .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) @@ -603,20 +434,16 @@ mod tests { table_id: 0, table: NumbersTable::table(0), }; - catalog - .clone() - .register_catalog(catalog_name.clone()) - .await - .unwrap(); - catalog.register_schema(schema).await.unwrap(); - catalog.register_table(table).await.unwrap(); + catalog.register_local_catalog(&catalog_name).unwrap(); + catalog.register_local_schema(schema).unwrap(); + catalog.register_local_table(table).unwrap(); let request = DeregisterSchemaRequest { catalog: catalog_name.clone(), schema: schema_name.clone(), }; - assert!(catalog.deregister_schema(request).await.unwrap()); + assert!(catalog.deregister_local_schema(request).unwrap()); assert!(!catalog .schema_exist(&catalog_name, &schema_name) .await diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index c4c9d654eaaa..624b1c697672 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -20,7 +20,7 @@ use common_catalog::consts::{ SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, }; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{debug, warn}; +use common_telemetry::debug; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef}; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -34,11 +34,9 @@ use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest, Table use table::TableRef; use crate::error::{ - self, CreateSystemCatalogSnafu, DeregisterTableSnafu, EmptyValueSnafu, Error, - InsertCatalogRecordSnafu, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, - Result, ValueDeserializeSnafu, + self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InsertCatalogRecordSnafu, + InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, }; -use crate::DeregisterTableRequest; pub const ENTRY_TYPE_INDEX: usize = 0; pub const KEY_INDEX: usize = 1; @@ -104,30 +102,6 @@ impl SystemCatalogTable { .context(InsertCatalogRecordSnafu) } - pub(crate) async fn deregister_table( - &self, - request: &DeregisterTableRequest, - table_id: TableId, - ) -> Result<()> { - let deletion_request = build_table_deletion_request(request, table_id); - self.0 - .insert(deletion_request) - .await - .map(|x| { - if x != 1 { - let table = common_catalog::format_full_table_name( - &request.catalog, - &request.schema, - &request.table_name - ); - warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}"); - } - }) - .with_context(|_| DeregisterTableSnafu { - request: request.clone(), - }) - } - pub async fn register_schema(&self, catalog: String, schema: String) -> Result { let insert_request = build_schema_insert_request(catalog, schema); self.0 @@ -232,24 +206,6 @@ pub fn build_table_insert_request( ) } -pub(crate) fn build_table_deletion_request( - request: &DeregisterTableRequest, - table_id: TableId, -) -> InsertRequest { - let entry_key = format_table_entry_key(&request.catalog, &request.schema, table_id); - build_insert_request( - EntryType::Table, - entry_key.as_bytes(), - serde_json::to_string(&TableEntryValue { - table_name: "".to_string(), - engine: "".to_string(), - is_deleted: true, - }) - .unwrap() - .as_bytes(), - ) -} - fn build_primary_key_columns(entry_type: EntryType, key: &[u8]) -> HashMap { HashMap::from([ ( @@ -614,21 +570,5 @@ mod tests { is_deleted: false, }); assert_eq!(entry, expected); - - catalog_table - .deregister_table( - &DeregisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "my_table".to_string(), - }, - 1, - ) - .await - .unwrap(); - - let records = catalog_table.records().await.unwrap(); - let batches = RecordBatches::try_collect(records).await.unwrap().take(); - assert_eq!(batches.len(), 1); } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 7efa1fba062d..003dc3be3c5f 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use table::metadata::TableId; use crate::system::SystemCatalogTable; -use crate::DeregisterTableRequest; pub struct InformationSchema { pub system: Arc, @@ -53,17 +52,6 @@ impl SystemCatalog { .await } - pub(crate) async fn deregister_table( - &self, - request: &DeregisterTableRequest, - table_id: TableId, - ) -> crate::error::Result<()> { - self.information_schema - .system - .deregister_table(request, table_id) - .await - } - pub async fn register_schema( &self, catalog: String, diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs deleted file mode 100644 index 483bb1e31830..000000000000 --- a/src/catalog/tests/local_catalog_tests.rs +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use catalog::local::LocalCatalogManager; - use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_telemetry::{error, info}; - use common_test_util::temp_dir::TempDir; - use mito::config::EngineConfig; - use table::engine::manager::MemoryTableEngineManager; - use table::table::numbers::NumbersTable; - use tokio::sync::Mutex; - - async fn create_local_catalog_manager( - ) -> Result<(TempDir, LocalCatalogManager), catalog::error::Error> { - let (dir, object_store) = - mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new( - EngineConfig::default(), - mito::table::test_util::MockEngine::default(), - object_store, - )); - let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); - let catalog_manager = LocalCatalogManager::try_new(engine_manager).await.unwrap(); - catalog_manager.start().await?; - Ok((dir, catalog_manager)) - } - - #[tokio::test] - async fn test_rename_table() { - common_telemetry::init_default_ut_logging(); - let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap(); - // register table - let table_name = "test_table"; - let table_id = 42; - let request = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - table: NumbersTable::table(table_id), - }; - assert!(catalog_manager.register_table(request).await.unwrap()); - - // rename table - let new_table_name = "table_t"; - let rename_table_req = RenameTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - new_table_name: new_table_name.to_string(), - table_id, - }; - assert!(catalog_manager - .rename_table(rename_table_req) - .await - .unwrap()); - - let registered_table = catalog_manager - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap() - .unwrap(); - assert_eq!(registered_table.table_info().ident.table_id, table_id); - } - - #[tokio::test] - async fn test_duplicate_register() { - let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap(); - let request = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "test_table".to_string(), - table_id: 42, - table: NumbersTable::table(42), - }; - assert!(catalog_manager - .register_table(request.clone()) - .await - .unwrap()); - - // register table with same table id will succeed with 0 as return val. - assert!(!catalog_manager.register_table(request).await.unwrap()); - - let err = catalog_manager - .register_table(RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "test_table".to_string(), - table_id: 43, - table: NumbersTable::table(43), - }) - .await - .unwrap_err(); - assert!( - err.to_string() - .contains("Table `greptime.public.test_table` already exists"), - "Actual error message: {err}", - ); - } - - #[test] - fn test_concurrent_register() { - common_telemetry::init_default_ut_logging(); - let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap()); - let (_dir, catalog_manager) = - rt.block_on(async { create_local_catalog_manager().await.unwrap() }); - let catalog_manager = Arc::new(catalog_manager); - - let succeed = Arc::new(Mutex::new(None)); - - let mut handles = Vec::with_capacity(8); - for i in 0..8 { - let catalog = catalog_manager.clone(); - let succeed = succeed.clone(); - let handle = rt.spawn(async move { - let table_id = 42 + i; - let table = NumbersTable::table(table_id); - let table_info = table.table_info(); - let req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "test_table".to_string(), - table_id, - table, - }; - match catalog.register_table(req).await { - Ok(res) => { - if res { - let mut succeed = succeed.lock().await; - info!("Successfully registered table: {}", table_id); - *succeed = Some(table_info); - } - } - Err(_) => { - error!("Failed to register table {}", table_id); - } - } - }); - handles.push(handle); - } - - rt.block_on(async move { - for handle in handles { - handle.await.unwrap(); - } - let guard = succeed.lock().await; - let table_info = guard.as_ref().unwrap(); - let table_registered = catalog_manager - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_table") - .await - .unwrap() - .unwrap(); - assert_eq!( - table_registered.table_info().ident.table_id, - table_info.ident.table_id - ); - }); - } -} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 57780ff1a04e..db3d4d6422ca 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -25,7 +25,7 @@ use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; -use crate::error::{self, IllegalAuthConfigSnafu, Result, StartCatalogManagerSnafu}; +use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::options::{Options, TopLevelOptions}; pub struct Instance { @@ -34,12 +34,6 @@ pub struct Instance { impl Instance { pub async fn start(&mut self) -> Result<()> { - self.frontend - .catalog_manager() - .start() - .await - .context(StartCatalogManagerSnafu)?; - self.frontend .start() .await diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 099edcbcef26..fc2d3c447f29 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -126,12 +126,6 @@ pub enum Error { #[snafu(display("Incorrect internal state: {}", state))] IncorrectInternalState { state: String, location: Location }, - #[snafu(display("Failed to create catalog list, source: {}", source))] - NewCatalog { - location: Location, - source: catalog::error::Error, - }, - #[snafu(display("Catalog not found: {}", name))] CatalogNotFound { name: String, location: Location }, @@ -583,7 +577,7 @@ impl ErrorExt for Error { HandleHeartbeatResponse { source, .. } => source.status_code(), DecodeLogicalPlan { source, .. } => source.status_code(), - NewCatalog { source, .. } | RegisterSchema { source, .. } => source.status_code(), + RegisterSchema { source, .. } => source.status_code(), CreateTable { source, .. } => source.status_code(), DropTable { source, .. } => source.status_code(), FlushTable { source, .. } => source.status_code(), diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index bc6c1ce249b1..89170efaaea1 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -21,10 +21,11 @@ use catalog::error::{ TableMetadataManagerSnafu, }; use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; +use catalog::local::MemoryCatalogManager; use catalog::remote::KvCacheInvalidatorRef; use catalog::{ CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterTableRequest, RenameTableRequest, + RegisterTableRequest, }; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, @@ -50,13 +51,44 @@ use table::TableRef; use crate::table::DistTable; +// There are two sources for finding a table: the `local_catalog_manager` and the +// `table_metadata_manager`. +// +// The `local_catalog_manager` is for storing tables that are often transparent, not saving any +// real data. For example, our system tables, the `numbers` table and the "information_schema" +// table. +// +// The `table_metadata_manager`, on the other hand, is for storing tables that are created by users, +// obviously. +// +// For now, separating the two makes the code simpler, at least in the retrieval site. Now we have +// `numbers` and `information_schema` system tables. Both have their special implementations. If we +// put them with other ordinary tables that are created by users, we need to check the table name +// to decide which `TableRef` to return. Like this: +// +// ```rust +// match table_name { +// "numbers" => ... // return NumbersTable impl +// "information_schema" => ... // return InformationSchemaTable impl +// _ => .. // return DistTable impl +// } +// ``` +// +// On the other hand, because we use `MemoryCatalogManager` for system tables, we can easily store +// and retrieve the concrete implementation of the system tables by their names, no more "if-else"s. +// +// However, if the system table is designed to have more features in the future, we may revisit +// the implementation here. #[derive(Clone)] pub struct FrontendCatalogManager { - backend: KvBackendRef, + // TODO(LFC): Maybe use a real implementation for Standalone mode. + // Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend + // is implemented by RaftEngine. Maybe we need a cache for it? backend_cache_invalidator: KvCacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, datanode_manager: DatanodeManagerRef, + local_catalog_manager: Arc, } #[async_trait::async_trait] @@ -105,18 +137,14 @@ impl FrontendCatalogManager { datanode_manager: DatanodeManagerRef, ) -> Self { Self { - backend: backend.clone(), backend_cache_invalidator, partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), datanode_manager, + local_catalog_manager: MemoryCatalogManager::new(), } } - pub fn backend(&self) -> KvBackendRef { - self.backend.clone() - } - pub fn partition_manager(&self) -> PartitionRuleManagerRef { self.partition_manager.clone() } @@ -136,45 +164,34 @@ impl FrontendCatalogManager { } } -// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting -// as soon as it's stable: https://github.com/rust-lang/rust/issues/65991 #[async_trait::async_trait] impl CatalogManager for FrontendCatalogManager { - async fn start(&self) -> catalog::error::Result<()> { - Ok(()) - } - - async fn register_catalog(self: Arc, _name: String) -> CatalogResult { - unimplemented!("FrontendCatalogManager does not support registering catalog") + fn register_local_catalog(&self, name: &str) -> CatalogResult { + self.local_catalog_manager.register_catalog(name) } - // TODO(LFC): Handle the table caching in (de)register_table. - async fn register_table(&self, _request: RegisterTableRequest) -> CatalogResult { - Ok(true) + fn register_local_table(&self, request: RegisterTableRequest) -> CatalogResult { + self.local_catalog_manager.register_table(request) } - async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> { + fn deregister_local_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> { Ok(()) } - async fn register_schema( + fn register_local_schema( &self, _request: RegisterSchemaRequest, ) -> catalog::error::Result { unimplemented!("FrontendCatalogManager does not support registering schema") } - async fn deregister_schema( + fn deregister_local_schema( &self, _request: DeregisterSchemaRequest, ) -> catalog_err::Result { unimplemented!("FrontendCatalogManager does not support deregistering schema") } - async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { - unimplemented!() - } - async fn catalog_names(&self) -> CatalogResult> { let stream = self .table_metadata_manager diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a3d9872d02c6..fdb7daa18773 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -28,6 +28,7 @@ use api::v1::meta::Role; use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use catalog::local::manager::SystemTableInitializer; use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; @@ -78,14 +79,16 @@ use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; +use table::engine::manager::MemoryTableEngineManager; use self::distributed::DistRegionRequestHandler; use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator}; use crate::catalog::FrontendCatalogManager; use crate::delete::Deleter; use crate::error::{ - self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, - ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, + self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, + MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, + SqlExecInterceptedSnafu, }; use crate::expr_factory::CreateExprFactory; use crate::frontend::FrontendOptions; @@ -159,11 +162,11 @@ impl Instance { datanode_clients.clone(), )); - let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); + let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), - Some(dist_request_handler), + Some(region_request_handler.clone()), true, plugins.clone(), ) @@ -421,6 +424,14 @@ impl FrontendInstance for Instance { heartbeat_task.start().await?; } + let initializer = SystemTableInitializer::try_new( + Arc::new(MemoryTableEngineManager::new_empty()), + self.catalog_manager.clone(), + ) + .await + .context(CatalogSnafu)?; + initializer.init().await.context(CatalogSnafu)?; + self.script_executor.start(self).await?; futures::future::try_join_all(self.servers.values().map(start_server)) diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index a250e5febba5..ac949e77f32a 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{column_def, AlterExpr, CreateTableExpr, TruncateTableExpr}; -use catalog::{CatalogManagerRef, DeregisterTableRequest, RegisterTableRequest}; +use catalog::CatalogManagerRef; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; @@ -34,7 +34,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; use partition::partition::{PartitionBound, PartitionDef}; use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use sql::ast::Value as SqlValue; use sql::statements::alter::AlterTable; use sql::statements::create::{CreateExternalTable, CreateTable, Partitions}; @@ -46,7 +46,7 @@ use table::TableRef; use super::StatementExecutor; use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, - DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistSnafu, + DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; use crate::table::DistTable; @@ -121,23 +121,6 @@ impl StatementExecutor { let table = DistTable::table(table_info); - let request = RegisterTableRequest { - catalog: table_name.catalog_name.clone(), - schema: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - table_id, - table: table.clone(), - }; - ensure!( - self.catalog_manager - .register_table(request) - .await - .context(CatalogSnafu)?, - TableAlreadyExistSnafu { - table: table_name.to_string() - } - ); - // Invalidates local cache ASAP. self.cache_invalidator .invalidate_table( @@ -173,16 +156,6 @@ impl StatementExecutor { let engine = table.table_info().meta.engine.to_string(); self.drop_table_procedure(&table_name, table_id).await?; - let request = DeregisterTableRequest { - catalog: table_name.catalog_name.clone(), - schema: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - }; - self.catalog_manager - .deregister_table(request) - .await - .context(CatalogSnafu)?; - // Invalidates local cache ASAP. self.cache_invalidator .invalidate_table( diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 946d0a28daae..bbaab7021ac8 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1432,14 +1432,13 @@ mod test { let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list - .register_table(RegisterTableRequest { + .register_local_table(RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table_name, table_id: 1024, table, }) - .await .is_ok()); DfTableSourceProvider::new(catalog_list, false, QueryContext::arc().as_ref()) } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 237f4e750d38..4790aa498700 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -515,7 +515,7 @@ mod tests { table_id: NUMBERS_TABLE_ID, table: NumbersTable::table(NUMBERS_TABLE_ID), }; - let _ = catalog_manager.register_table(req).await.unwrap(); + catalog_manager.register_local_table(req).unwrap(); QueryEngineFactory::new(catalog_manager, None, false).query_engine() } diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index eff319f456d7..e079b31fc5bb 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -380,14 +380,13 @@ mod test { let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list - .register_table(RegisterTableRequest { + .register_local_table(RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table_name, table_id: 1024, table, }) - .await .is_ok()); QueryEngineFactory::new(catalog_list, None, false).query_engine() } diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 83ec1e52bc67..c98c471e6e16 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -112,7 +112,7 @@ fn catalog_manager() -> Result> { table_id: NUMBERS_TABLE_ID, table: NumbersTable::table(NUMBERS_TABLE_ID), }; - let _ = catalog_manager.register_table_sync(req).unwrap(); + let _ = catalog_manager.register_table(req).unwrap(); Ok(catalog_manager) } diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 383ac2ea8035..fdf529e2072c 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -104,7 +104,7 @@ fn create_test_engine() -> TimeRangeTester { table_id: table.table_info().ident.table_id, table: table.clone(), }; - let _ = catalog_manager.register_table_sync(req).unwrap(); + let _ = catalog_manager.register_table(req).unwrap(); let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); TimeRangeTester { engine, filter } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 08a9a27840ce..87cdf25eea10 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -160,59 +160,22 @@ impl ScriptManager { #[cfg(test)] mod tests { - use catalog::CatalogManager; - use common_config::WalConfig; - use common_test_util::temp_dir::create_temp_dir; - use log_store::raft_engine::log_store::RaftEngineLogStore; - use mito::config::EngineConfig as TableEngineConfig; - use mito::engine::MitoEngine; - use mito::table::test_util::new_test_object_store; + use catalog::local::MemoryCatalogManager; use query::QueryEngineFactory; - use storage::compaction::noop::NoopCompactionScheduler; - use storage::config::EngineConfig as StorageEngineConfig; - use storage::EngineImpl; - use table::engine::manager::MemoryTableEngineManager; use super::*; - type DefaultEngine = MitoEngine>; - #[ignore = "script engine is temporary disabled"] #[tokio::test] async fn test_insert_find_compile_script() { - let wal_dir = create_temp_dir("test_insert_find_compile_script_wal"); - let wal_dir_str = wal_dir.path().to_string_lossy().to_string(); - common_telemetry::init_default_ut_logging(); - let (_dir, object_store) = new_test_object_store("test_insert_find_compile_script").await; - let log_store = RaftEngineLogStore::try_new(wal_dir_str, WalConfig::default()) - .await - .unwrap(); - let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - let mock_engine = Arc::new(DefaultEngine::new( - TableEngineConfig::default(), - EngineImpl::new( - StorageEngineConfig::default(), - Arc::new(log_store), - object_store.clone(), - compaction_scheduler, - ) - .unwrap(), - object_store, - )); - let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); - let catalog_manager = Arc::new( - catalog::local::LocalCatalogManager::try_new(engine_manager) - .await - .unwrap(), - ); + let catalog_manager = MemoryCatalogManager::new(); let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false); let query_engine = factory.query_engine(); let mgr = ScriptManager::new(catalog_manager.clone(), query_engine) .await .unwrap(); - catalog_manager.start().await.unwrap(); let schema = "schema"; let name = "test"; diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs index 9d30c54fc023..72f0ac2ab9b0 100644 --- a/src/table-procedure/src/alter.rs +++ b/src/table-procedure/src/alter.rs @@ -15,7 +15,7 @@ //! Procedure to alter a table. use async_trait::async_trait; -use catalog::{CatalogManagerRef, RenameTableRequest}; +use catalog::CatalogManagerRef; use common_procedure::error::SubprocedureFailedSnafu; use common_procedure::{ Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, @@ -50,7 +50,9 @@ impl Procedure for AlterTableProcedure { match self.data.state { AlterTableState::Prepare => self.on_prepare().await, AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await, - AlterTableState::RenameInCatalog => self.on_rename_in_catalog().await, + // No more need to "rename table in catalog", because the table metadata is now stored + // in kv backend, and updated by the unified DDL procedure soon. For ordinary tables, + // catalog manager will be a readonly proxy. } } @@ -214,15 +216,7 @@ impl AlterTableProcedure { self.data.request.table_name, sub_id ); - // The sub procedure is done, we can execute next step. - if self.data.request.is_rename_table() { - // We also need to rename the table in the catalog. - self.data.state = AlterTableState::RenameInCatalog; - Ok(Status::executing(true)) - } else { - // If this isn't a rename operation, we are done. - Ok(Status::Done) - } + Ok(Status::Done) } ProcedureState::Failed { error } => { // Return error if the subprocedure is failed. @@ -232,28 +226,6 @@ impl AlterTableProcedure { } } } - - async fn on_rename_in_catalog(&mut self) -> Result { - // Safety: table id is available in this state. - let table_id = self.data.table_id.unwrap(); - if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { - let rename_req = RenameTableRequest { - catalog: self.data.request.catalog_name.clone(), - schema: self.data.request.schema_name.clone(), - table_name: self.data.request.table_name.clone(), - new_table_name: new_table_name.clone(), - table_id, - }; - - let _ = self - .catalog_manager - .rename_table(rename_req) - .await - .map_err(Error::from_error_ext)?; - } - - Ok(Status::Done) - } } /// Represents each step while altering a table in the datanode. @@ -263,8 +235,6 @@ enum AlterTableState { Prepare, /// Alter table in the table engine. EngineAlterTable, - /// Rename the table in the catalog (optional). - RenameInCatalog, } /// Serializable data of [AlterTableProcedure]. @@ -294,56 +264,3 @@ impl AlterTableData { } } } - -#[cfg(test)] -mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - - use super::*; - use crate::test_util::TestEnv; - - #[tokio::test] - async fn test_alter_table_procedure_rename() { - let env = TestEnv::new("rename"); - let table_name = "test_old"; - let table_id = env.create_table(table_name).await; - - let new_table_name = "test_new"; - let request = AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - alter_kind: AlterKind::RenameTable { - new_table_name: new_table_name.to_string(), - }, - table_version: None, - }; - - let TestEnv { - dir: _dir, - table_engine, - procedure_manager, - catalog_manager, - } = env; - let procedure = - AlterTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); - watcher.changed().await.unwrap(); - - let table = catalog_manager - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .await - .unwrap() - .unwrap(); - let table_info = table.table_info(); - assert_eq!(new_table_name, table_info.name); - - assert!(!catalog_manager - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - } -} diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index ecdbf37bc826..126f7c735a08 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -256,8 +256,7 @@ impl CreateTableProcedure { }; let _ = self .catalog_manager - .register_table(register_req) - .await + .register_local_table(register_req) .map_err(Error::from_error_ext)?; Ok(Status::Done) diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs index 9db9f37723e9..0fa521728757 100644 --- a/src/table-procedure/src/drop.rs +++ b/src/table-procedure/src/drop.rs @@ -163,8 +163,7 @@ impl DropTableProcedure { table_name: self.data.request.table_name.clone(), }; self.catalog_manager - .deregister_table(deregister_table_req) - .await + .deregister_local_table(deregister_table_req) .context(AccessCatalogSnafu)?; }