diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 4f2af428af3b..2d152f16f226 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -21,15 +21,13 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use api::v1::meta::RegionStat; -use common_telemetry::{info, warn}; +use common_telemetry::warn; use futures::future::BoxFuture; -use snafu::ResultExt; -use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableType}; use table::requests::CreateTableRequest; use table::TableRef; -use crate::error::{CreateTableSnafu, Result}; +use crate::error::Result; pub mod error; pub mod information_schema; @@ -38,47 +36,11 @@ mod metrics; pub mod remote; pub mod system; pub mod table_source; -pub mod tables; #[async_trait::async_trait] pub trait CatalogManager: Send + Sync { fn as_any(&self) -> &dyn Any; - /// Register a local catalog. - /// - /// # Returns - /// - /// Whether the catalog is registered. - fn register_local_catalog(&self, name: &str) -> Result; - - /// Register a local schema. - /// - /// # Returns - /// - /// Whether the schema is registered. - /// - /// # Errors - /// - /// This method will/should fail if catalog not exist - fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result; - - /// Deregisters a database within given catalog/schema to catalog manager - fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result; - - /// Registers a local table. - /// - /// # Returns - /// - /// Whether the table is registered. - /// - /// # Errors - /// - /// This method will/should fail if catalog or schema not exist - fn register_local_table(&self, request: RegisterTableRequest) -> Result; - - /// Deregisters a table within given catalog/schema to catalog manager - fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>; - async fn catalog_names(&self) -> Result>; async fn schema_names(&self, catalog: &str) -> Result>; @@ -164,48 +126,6 @@ pub struct RegisterSchemaRequest { pub schema: String, } -pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>( - manager: &'a M, - engine: TableEngineRef, - sys_table_requests: &'a mut Vec, -) -> Result<()> { - for req in sys_table_requests.drain(..) { - let catalog_name = &req.create_table_request.catalog_name; - let schema_name = &req.create_table_request.schema_name; - let table_name = &req.create_table_request.table_name; - let table_id = req.create_table_request.id; - - let table = manager.table(catalog_name, schema_name, table_name).await?; - let table = if let Some(table) = table { - table - } else { - let table = engine - .create_table(&EngineContext::default(), req.create_table_request.clone()) - .await - .with_context(|_| CreateTableSnafu { - table_info: common_catalog::format_full_table_name( - catalog_name, - schema_name, - table_name, - ), - })?; - manager.register_local_table(RegisterTableRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - table_name: table_name.clone(), - table_id, - table: table.clone(), - })?; - info!("Created and registered system table: {table_name}"); - table - }; - if let Some(hook) = req.open_hook { - (hook)(table).await?; - } - } - Ok(()) -} - /// The stat of regions in the datanode node. /// The number of regions can be got from len of vec. /// diff --git a/src/catalog/src/local.rs b/src/catalog/src/local.rs index 39b1a7de38d6..2a9f1258a207 100644 --- a/src/catalog/src/local.rs +++ b/src/catalog/src/local.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod manager; pub mod memory; pub use memory::{new_memory_catalog_manager, MemoryCatalogManager}; diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs deleted file mode 100644 index cf5543f57e9d..000000000000 --- a/src/catalog/src/local/manager.rs +++ /dev/null @@ -1,349 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE, - NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, -}; -use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::info; -use datatypes::prelude::ScalarVector; -use datatypes::vectors::{BinaryVector, UInt8Vector}; -use futures_util::lock::Mutex; -use snafu::{ensure, OptionExt, ResultExt}; -use table::engine::manager::TableEngineManagerRef; -use table::engine::EngineContext; -use table::requests::OpenTableRequest; -use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; - -use crate::error::{ - CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result, SchemaNotFoundSnafu, - SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, - TableNotFoundSnafu, -}; -use crate::system::{ - decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, - VALUE_INDEX, -}; -use crate::tables::SystemCatalog; -use crate::{ - handle_system_table_request, CatalogManagerRef, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, -}; - -pub struct SystemTableInitializer { - system: Arc, - catalog_manager: CatalogManagerRef, - engine_manager: TableEngineManagerRef, - system_table_requests: Mutex>, -} - -impl SystemTableInitializer { - pub async fn try_new( - engine_manager: TableEngineManagerRef, - catalog_manager: CatalogManagerRef, - ) -> Result { - let engine = engine_manager - .engine(MITO_ENGINE) - .context(TableEngineNotFoundSnafu { - engine_name: MITO_ENGINE, - })?; - let table = SystemCatalogTable::new(engine.clone()).await?; - let system_catalog = Arc::new(SystemCatalog::new(table)); - Ok(Self { - system: system_catalog, - catalog_manager, - engine_manager, - system_table_requests: Mutex::new(Vec::default()), - }) - } - - /// Scan all entries from system catalog table - pub async fn init(&self) -> Result<()> { - self.init_system_catalog().await?; - let system_records = self.system.information_schema.system.records().await?; - let entries = self.collect_system_catalog_entries(system_records).await?; - self.handle_system_catalog_entries(entries).await?; - - // Processing system table hooks - let mut sys_table_requests = self.system_table_requests.lock().await; - let engine = self - .engine_manager - .engine(MITO_ENGINE) - .context(TableEngineNotFoundSnafu { - engine_name: MITO_ENGINE, - })?; - - handle_system_table_request( - self.catalog_manager.as_ref(), - engine, - &mut sys_table_requests, - ) - .await?; - Ok(()) - } - - async fn init_system_catalog(&self) -> Result<()> { - let catalog_manager = &self.catalog_manager; - catalog_manager.register_local_catalog(SYSTEM_CATALOG_NAME)?; - - catalog_manager.register_local_schema(RegisterSchemaRequest { - catalog: SYSTEM_CATALOG_NAME.to_string(), - schema: INFORMATION_SCHEMA_NAME.to_string(), - })?; - - let register_table_req = RegisterTableRequest { - catalog: SYSTEM_CATALOG_NAME.to_string(), - schema: INFORMATION_SCHEMA_NAME.to_string(), - table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), - table_id: SYSTEM_CATALOG_TABLE_ID, - table: self.system.information_schema.system.as_table_ref(), - }; - catalog_manager.register_local_table(register_table_req)?; - - // Add numbers table for test - let register_number_table_req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: NUMBERS_TABLE_NAME.to_string(), - table_id: NUMBERS_TABLE_ID, - table: NumbersTable::table(NUMBERS_TABLE_ID), - }; - - catalog_manager.register_local_table(register_number_table_req)?; - - Ok(()) - } - - /// Collect stream of system catalog entries to `Vec` - async fn collect_system_catalog_entries( - &self, - stream: SendableRecordBatchStream, - ) -> Result> { - let record_batch = common_recordbatch::util::collect(stream) - .await - .context(ReadSystemCatalogSnafu)?; - let rbs = record_batch - .into_iter() - .map(Self::record_batch_to_entry) - .collect::>>()?; - Ok(rbs.into_iter().flat_map(Vec::into_iter).collect::<_>()) - } - - /// Convert `RecordBatch` to a vector of `Entry`. - fn record_batch_to_entry(rb: RecordBatch) -> Result> { - ensure!( - rb.num_columns() >= 6, - SystemCatalogSnafu { - msg: format!("Length mismatch: {}", rb.num_columns()) - } - ); - - let entry_type = rb - .column(ENTRY_TYPE_INDEX) - .as_any() - .downcast_ref::() - .with_context(|| SystemCatalogTypeMismatchSnafu { - data_type: rb.column(ENTRY_TYPE_INDEX).data_type(), - })?; - - let key = rb - .column(KEY_INDEX) - .as_any() - .downcast_ref::() - .with_context(|| SystemCatalogTypeMismatchSnafu { - data_type: rb.column(KEY_INDEX).data_type(), - })?; - - let value = rb - .column(VALUE_INDEX) - .as_any() - .downcast_ref::() - .with_context(|| SystemCatalogTypeMismatchSnafu { - data_type: rb.column(VALUE_INDEX).data_type(), - })?; - - let mut res = Vec::with_capacity(rb.num_rows()); - for ((t, k), v) in entry_type - .iter_data() - .zip(key.iter_data()) - .zip(value.iter_data()) - { - let entry = decode_system_catalog(t, k, v)?; - res.push(entry); - } - Ok(res) - } - - /// Processes records from system catalog table. - async fn handle_system_catalog_entries(&self, entries: Vec) -> Result<()> { - let entries = Self::sort_entries(entries); - for entry in entries { - match entry { - Entry::Catalog(c) => { - self.catalog_manager - .register_local_catalog(&c.catalog_name)?; - info!("Register catalog: {}", c.catalog_name); - } - Entry::Schema(s) => { - let req = RegisterSchemaRequest { - catalog: s.catalog_name.clone(), - schema: s.schema_name.clone(), - }; - self.catalog_manager.register_local_schema(req)?; - info!("Registered schema: {:?}", s); - } - Entry::Table(t) => { - if t.is_deleted { - continue; - } - self.open_and_register_table(&t).await?; - info!("Registered table: {:?}", t); - } - } - } - Ok(()) - } - - /// Sort catalog entries to ensure catalog entries comes first, then schema entries, - /// and table entries is the last. - fn sort_entries(mut entries: Vec) -> Vec { - entries.sort(); - entries - } - - async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> { - self.check_catalog_schema_exist(&t.catalog_name, &t.schema_name) - .await?; - - let context = EngineContext {}; - let open_request = OpenTableRequest { - catalog_name: t.catalog_name.clone(), - schema_name: t.schema_name.clone(), - table_name: t.table_name.clone(), - table_id: t.table_id, - region_numbers: vec![0], - }; - let engine = self - .engine_manager - .engine(&t.engine) - .context(TableEngineNotFoundSnafu { - engine_name: &t.engine, - })?; - - let table_ref = engine - .open_table(&context, open_request) - .await - .with_context(|_| OpenTableSnafu { - table_info: format!( - "{}.{}.{}, id: {}", - &t.catalog_name, &t.schema_name, &t.table_name, t.table_id - ), - })? - .with_context(|| TableNotFoundSnafu { - table_info: format!( - "{}.{}.{}, id: {}", - &t.catalog_name, &t.schema_name, &t.table_name, t.table_id - ), - })?; - - let register_request = RegisterTableRequest { - catalog: t.catalog_name.clone(), - schema: t.schema_name.clone(), - table_name: t.table_name.clone(), - table_id: t.table_id, - table: table_ref, - }; - self.catalog_manager - .register_local_table(register_request)?; - Ok(()) - } - - async fn check_catalog_schema_exist( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result<()> { - if !self.catalog_manager.catalog_exist(catalog_name).await? { - return CatalogNotFoundSnafu { catalog_name }.fail()?; - } - if !self - .catalog_manager - .schema_exist(catalog_name, schema_name) - .await? - { - return SchemaNotFoundSnafu { - catalog: catalog_name, - schema: schema_name, - } - .fail()?; - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use mito::engine::MITO_ENGINE; - - use super::*; - use crate::system::{CatalogEntry, SchemaEntry}; - - #[test] - fn test_sort_entry() { - let vec = vec![ - Entry::Table(TableEntry { - catalog_name: "C1".to_string(), - schema_name: "S1".to_string(), - table_name: "T1".to_string(), - table_id: 1, - engine: MITO_ENGINE.to_string(), - is_deleted: false, - }), - Entry::Catalog(CatalogEntry { - catalog_name: "C2".to_string(), - }), - Entry::Schema(SchemaEntry { - catalog_name: "C1".to_string(), - schema_name: "S1".to_string(), - }), - Entry::Schema(SchemaEntry { - catalog_name: "C2".to_string(), - schema_name: "S2".to_string(), - }), - Entry::Catalog(CatalogEntry { - catalog_name: "".to_string(), - }), - Entry::Table(TableEntry { - catalog_name: "C1".to_string(), - schema_name: "S1".to_string(), - table_name: "T2".to_string(), - table_id: 2, - engine: MITO_ENGINE.to_string(), - is_deleted: false, - }), - ]; - let res = SystemTableInitializer::sort_entries(vec); - assert_matches!(res[0], Entry::Catalog(..)); - assert_matches!(res[1], Entry::Catalog(..)); - assert_matches!(res[2], Entry::Schema(..)); - assert_matches!(res[3], Entry::Schema(..)); - assert_matches!(res[4], Entry::Table(..)); - assert_matches!(res[5], Entry::Table(..)); - } -} diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 7360941483f9..29efc75ab9ff 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -24,10 +24,7 @@ use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::information_schema::InformationSchemaProvider; -use crate::{ - CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterTableRequest, -}; +use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest}; type SchemaEntries = HashMap>; @@ -40,49 +37,6 @@ pub struct MemoryCatalogManager { #[async_trait::async_trait] impl CatalogManager for MemoryCatalogManager { - fn register_local_catalog(&self, name: &str) -> Result { - self.register_catalog(name) - } - fn register_local_table(&self, request: RegisterTableRequest) -> Result { - self.register_table(request) - } - - fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()> { - self.deregister_table_sync(request) - } - - fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result { - self.register_schema_sync(request) - } - - fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result { - let mut catalogs = self.catalogs.write().unwrap(); - let schemas = catalogs - .get_mut(&request.catalog) - .with_context(|| CatalogNotFoundSnafu { - catalog_name: &request.catalog, - })?; - let table_count = schemas - .remove(&request.schema) - .with_context(|| SchemaNotFoundSnafu { - catalog: &request.catalog, - schema: &request.schema, - })? - .len(); - decrement_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - table_count as f64, - &[crate::metrics::db_label(&request.catalog, &request.schema)], - ); - - decrement_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, - 1.0, - &[crate::metrics::db_label(&request.catalog, &request.schema)], - ); - Ok(true) - } - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { self.schema_exist_sync(catalog, schema) } @@ -174,7 +128,7 @@ impl MemoryCatalogManager { }); // Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur - manager.register_catalog(DEFAULT_CATALOG_NAME).unwrap(); + manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap(); manager .register_schema_sync(RegisterSchemaRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -202,7 +156,7 @@ impl MemoryCatalogManager { } /// Registers a catalog if it does not exist and returns false if the schema exists. - pub fn register_catalog(&self, name: &str) -> Result { + pub fn register_catalog_sync(&self, name: &str) -> Result { let name = name.to_string(); let mut catalogs = self.catalogs.write().unwrap(); @@ -264,7 +218,7 @@ impl MemoryCatalogManager { } /// Registers a schema and returns an error if the catalog or schema does not exist. - pub fn register_table(&self, request: RegisterTableRequest) -> Result { + pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result { let mut catalogs = self.catalogs.write().unwrap(); let schema = catalogs .get_mut(&request.catalog) @@ -309,7 +263,7 @@ impl MemoryCatalogManager { let schema = &table.table_info().schema_name; if !manager.catalog_exist_sync(catalog).unwrap() { - manager.register_catalog(catalog).unwrap(); + manager.register_catalog_sync(catalog).unwrap(); } if !manager.schema_exist_sync(catalog, schema).unwrap() { @@ -328,7 +282,7 @@ impl MemoryCatalogManager { table_id: table.table_info().ident.table_id, table, }; - let _ = manager.register_table(request).unwrap(); + let _ = manager.register_table_sync(request).unwrap(); manager } } @@ -357,7 +311,7 @@ mod tests { table: NumbersTable::table(NUMBERS_TABLE_ID), }; - catalog_list.register_local_table(register_request).unwrap(); + catalog_list.register_table_sync(register_request).unwrap(); let table = catalog_list .table( DEFAULT_CATALOG_NAME, @@ -377,8 +331,8 @@ mod tests { #[test] pub fn test_register_catalog_sync() { let list = MemoryCatalogManager::with_default_setup(); - assert!(list.register_catalog("test_catalog").unwrap()); - assert!(!list.register_catalog("test_catalog").unwrap()); + assert!(list.register_catalog_sync("test_catalog").unwrap()); + assert!(!list.register_catalog_sync("test_catalog").unwrap()); } #[tokio::test] @@ -393,7 +347,7 @@ mod tests { table_id: 2333, table: NumbersTable::table(2333), }; - catalog.register_local_table(register_table_req).unwrap(); + catalog.register_table_sync(register_table_req).unwrap(); assert!(catalog .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) .await @@ -405,48 +359,11 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), }; - catalog - .deregister_local_table(deregister_table_req) - .unwrap(); + catalog.deregister_table_sync(deregister_table_req).unwrap(); assert!(catalog .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) .await .unwrap() .is_none()); } - - #[tokio::test] - async fn test_catalog_deregister_schema() { - let catalog = MemoryCatalogManager::with_default_setup(); - - // Registers a catalog, a schema, and a table. - let catalog_name = "foo_catalog".to_string(); - let schema_name = "foo_schema".to_string(); - let table_name = "foo_table".to_string(); - let schema = RegisterSchemaRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - }; - let table = RegisterTableRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - table_name, - table_id: 0, - table: NumbersTable::table(0), - }; - catalog.register_local_catalog(&catalog_name).unwrap(); - catalog.register_local_schema(schema).unwrap(); - catalog.register_local_table(table).unwrap(); - - let request = DeregisterSchemaRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - }; - - assert!(catalog.deregister_local_schema(request).unwrap()); - assert!(!catalog - .schema_exist(&catalog_name, &schema_name) - .await - .unwrap()); - } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs deleted file mode 100644 index 003dc3be3c5f..000000000000 --- a/src/catalog/src/tables.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// The `tables` table in system catalog keeps a record of all tables created by user. - -use std::sync::Arc; - -use table::metadata::TableId; - -use crate::system::SystemCatalogTable; - -pub struct InformationSchema { - pub system: Arc, -} - -pub struct SystemCatalog { - pub information_schema: Arc, -} - -impl SystemCatalog { - pub(crate) fn new(system: SystemCatalogTable) -> Self { - let schema = InformationSchema { - system: Arc::new(system), - }; - Self { - information_schema: Arc::new(schema), - } - } - - pub async fn register_table( - &self, - catalog: String, - schema: String, - table_name: String, - table_id: TableId, - engine: String, - ) -> crate::error::Result { - self.information_schema - .system - .register_table(catalog, schema, table_name, table_id, engine) - .await - } - - pub async fn register_schema( - &self, - catalog: String, - schema: String, - ) -> crate::error::Result { - self.information_schema - .system - .register_schema(catalog, schema) - .await - } -} diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 1e2513af6a1e..579dc949e987 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -253,11 +253,11 @@ async fn create_query_engine(meta_addr: &str) -> Result { let datanode_clients = Arc::new(DatanodeClients::default()); - let catalog_list = Arc::new(FrontendCatalogManager::new( + let catalog_list = FrontendCatalogManager::new( cached_meta_backend.clone(), cached_meta_backend.clone(), datanode_clients, - )); + ); let plugins: Arc = Default::default(); let state = Arc::new(QueryEngineState::new( catalog_list, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 15cb26c3a932..8c26ced2df21 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -311,11 +311,11 @@ impl StartCommand { .context(StartDatanodeSnafu)?; let region_server = datanode.region_server(); - let catalog_manager = Arc::new(FrontendCatalogManager::new( + let catalog_manager = FrontendCatalogManager::new( kv_store.clone(), Arc::new(DummyKvCacheInvalidator), Arc::new(StandaloneDatanodeManager(region_server.clone())), - )); + ); catalog_manager .table_metadata_manager_ref() diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 89170efaaea1..76d7ed218446 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -14,22 +14,16 @@ use std::any::Any; use std::collections::BTreeSet; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use catalog::error::{ self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, }; use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; -use catalog::local::MemoryCatalogManager; use catalog::remote::KvCacheInvalidatorRef; -use catalog::{ - CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterTableRequest, -}; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, -}; +use catalog::CatalogManager; +use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::datanode_manager::DatanodeManagerRef; @@ -51,34 +45,11 @@ use table::TableRef; use crate::table::DistTable; -// There are two sources for finding a table: the `local_catalog_manager` and the -// `table_metadata_manager`. -// -// The `local_catalog_manager` is for storing tables that are often transparent, not saving any -// real data. For example, our system tables, the `numbers` table and the "information_schema" -// table. -// -// The `table_metadata_manager`, on the other hand, is for storing tables that are created by users, -// obviously. -// -// For now, separating the two makes the code simpler, at least in the retrieval site. Now we have -// `numbers` and `information_schema` system tables. Both have their special implementations. If we -// put them with other ordinary tables that are created by users, we need to check the table name -// to decide which `TableRef` to return. Like this: -// -// ```rust -// match table_name { -// "numbers" => ... // return NumbersTable impl -// "information_schema" => ... // return InformationSchemaTable impl -// _ => .. // return DistTable impl -// } -// ``` -// -// On the other hand, because we use `MemoryCatalogManager` for system tables, we can easily store -// and retrieve the concrete implementation of the system tables by their names, no more "if-else"s. -// -// However, if the system table is designed to have more features in the future, we may revisit -// the implementation here. +/// Access all existing catalog, schema and tables. +/// +/// The result comes from two source, all the user tables are presented in +/// a kv-backend which persists the metadata of a table. And system tables +/// comes from [SystemCatalog], which is static and read-only. #[derive(Clone)] pub struct FrontendCatalogManager { // TODO(LFC): Maybe use a real implementation for Standalone mode. @@ -88,7 +59,8 @@ pub struct FrontendCatalogManager { partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, datanode_manager: DatanodeManagerRef, - local_catalog_manager: Arc, + /// A sub-CatalogManager that handles system tables + system_catalog: SystemCatalog, } #[async_trait::async_trait] @@ -135,14 +107,16 @@ impl FrontendCatalogManager { backend: KvBackendRef, backend_cache_invalidator: KvCacheInvalidatorRef, datanode_manager: DatanodeManagerRef, - ) -> Self { - Self { + ) -> Arc { + Arc::new_cyclic(|me| Self { backend_cache_invalidator, partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), datanode_manager, - local_catalog_manager: MemoryCatalogManager::new(), - } + system_catalog: SystemCatalog { + catalog_manager: me.clone(), + }, + }) } pub fn partition_manager(&self) -> PartitionRuleManagerRef { @@ -166,32 +140,6 @@ impl FrontendCatalogManager { #[async_trait::async_trait] impl CatalogManager for FrontendCatalogManager { - fn register_local_catalog(&self, name: &str) -> CatalogResult { - self.local_catalog_manager.register_catalog(name) - } - - fn register_local_table(&self, request: RegisterTableRequest) -> CatalogResult { - self.local_catalog_manager.register_table(request) - } - - fn deregister_local_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> { - Ok(()) - } - - fn register_local_schema( - &self, - _request: RegisterSchemaRequest, - ) -> catalog::error::Result { - unimplemented!("FrontendCatalogManager does not support registering schema") - } - - fn deregister_local_schema( - &self, - _request: DeregisterSchemaRequest, - ) -> catalog_err::Result { - unimplemented!("FrontendCatalogManager does not support deregistering schema") - } - async fn catalog_names(&self) -> CatalogResult> { let stream = self .table_metadata_manager @@ -218,11 +166,13 @@ impl CatalogManager for FrontendCatalogManager { .try_collect::>() .await .map_err(BoxedError::new) - .context(ListSchemasSnafu { catalog })?; + .context(ListSchemasSnafu { catalog })? + .into_iter() + .collect::>(); - keys.insert(INFORMATION_SCHEMA_NAME.to_string()); + keys.extend_from_slice(&self.system_catalog.schema_names()); - Ok(keys.into_iter().collect::>()) + Ok(keys) } async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult> { @@ -235,13 +185,7 @@ impl CatalogManager for FrontendCatalogManager { .into_iter() .map(|(k, _)| k) .collect::>(); - if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME { - tables.push(NUMBERS_TABLE_NAME.to_string()); - } - if schema == INFORMATION_SCHEMA_NAME { - tables.push(TABLES.to_string()); - tables.push(COLUMNS.to_string()); - } + tables.extend_from_slice(&self.system_catalog.table_names(schema)); Ok(tables) } @@ -255,9 +199,10 @@ impl CatalogManager for FrontendCatalogManager { } async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { - if schema == INFORMATION_SCHEMA_NAME { + if self.system_catalog.schema_exist(schema) { return Ok(true); } + self.table_metadata_manager .schema_manager() .exist(SchemaNameKey::new(catalog, schema)) @@ -266,7 +211,7 @@ impl CatalogManager for FrontendCatalogManager { } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { - if schema == INFORMATION_SCHEMA_NAME && (table == TABLES || table == COLUMNS) { + if self.system_catalog.table_exist(schema, table) { return Ok(true); } @@ -285,19 +230,8 @@ impl CatalogManager for FrontendCatalogManager { schema: &str, table_name: &str, ) -> CatalogResult> { - if catalog == DEFAULT_CATALOG_NAME - && schema == DEFAULT_SCHEMA_NAME - && table_name == NUMBERS_TABLE_NAME - { - return Ok(Some(NumbersTable::table(NUMBERS_TABLE_ID))); - } - - if schema == INFORMATION_SCHEMA_NAME { - let manager = Arc::new(self.clone()) as _; - - let provider = - InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager)); - return Ok(provider.table(table_name)); + if let Some(table) = self.system_catalog.table(catalog, schema, table_name) { + return Ok(Some(table)); } let key = TableNameKey::new(catalog, schema, table_name); @@ -334,3 +268,57 @@ impl CatalogManager for FrontendCatalogManager { self } } + +// TODO: This struct can hold a static map of all system tables when +// the upper layer (e.g., procedure) can inform the catalog manager +// a new catalog is created. +/// Existing system tables: +/// - public.numbers +/// - information_schema.tables +/// - information_schema.columns +#[derive(Clone)] +struct SystemCatalog { + catalog_manager: Weak, +} + +impl SystemCatalog { + fn schema_names(&self) -> Vec { + vec![INFORMATION_SCHEMA_NAME.to_string()] + } + + fn table_names(&self, schema: &str) -> Vec { + if schema == INFORMATION_SCHEMA_NAME { + vec![TABLES.to_string(), COLUMNS.to_string()] + } else if schema == DEFAULT_SCHEMA_NAME { + vec![NUMBERS_TABLE_NAME.to_string()] + } else { + vec![] + } + } + + fn schema_exist(&self, schema: &str) -> bool { + schema == INFORMATION_SCHEMA_NAME + } + + fn table_exist(&self, schema: &str, table: &str) -> bool { + if schema == INFORMATION_SCHEMA_NAME { + table == TABLES || table == COLUMNS + } else if schema == DEFAULT_SCHEMA_NAME { + table == NUMBERS_TABLE_NAME + } else { + false + } + } + + fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Option { + if schema == INFORMATION_SCHEMA_NAME { + let information_schema_provider = + InformationSchemaProvider::new(catalog.to_string(), self.catalog_manager.clone()); + information_schema_provider.table(table_name) + } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME { + Some(NumbersTable::table(NUMBERS_TABLE_ID)) + } else { + None + } + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index d15d7cf7a60d..db88c1199359 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -27,7 +27,6 @@ use std::time::Duration; use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; -use catalog::local::manager::SystemTableInitializer; use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; @@ -78,16 +77,14 @@ use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; -use table::engine::manager::MemoryTableEngineManager; use self::distributed::DistRegionRequestHandler; use self::standalone::StandaloneTableMetadataCreator; use crate::catalog::FrontendCatalogManager; use crate::delete::{Deleter, DeleterRef}; use crate::error::{ - self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, - MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, - SqlExecInterceptedSnafu, + self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, + ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, }; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; @@ -154,11 +151,11 @@ impl Instance { ) -> Result { let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let catalog_manager = Arc::new(FrontendCatalogManager::new( + let catalog_manager = FrontendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), datanode_clients.clone(), - )); + ); let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); let region_request_handler = DistRegionRequestHandler::arc( @@ -398,14 +395,6 @@ impl FrontendInstance for Instance { heartbeat_task.start().await?; } - let initializer = SystemTableInitializer::try_new( - Arc::new(MemoryTableEngineManager::new_empty()), - self.catalog_manager.clone(), - ) - .await - .context(CatalogSnafu)?; - initializer.init().await.context(CatalogSnafu)?; - self.script_executor.start(self).await?; futures::future::try_join_all(self.servers.values().map(start_server)) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index bbaab7021ac8..44c6a2288420 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1376,7 +1376,7 @@ mod test { use std::time::{Duration, UNIX_EPOCH}; use catalog::local::MemoryCatalogManager; - use catalog::{CatalogManager, RegisterTableRequest}; + use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -1432,7 +1432,7 @@ mod test { let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list - .register_local_table(RegisterTableRequest { + .register_table_sync(RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table_name, diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 4790aa498700..cb387807b272 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -486,7 +486,7 @@ mod tests { use std::borrow::Cow::Borrowed; use std::sync::Arc; - use catalog::{CatalogManager, RegisterTableRequest}; + use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_query::Output; use common_recordbatch::{util, RecordBatch}; @@ -515,7 +515,7 @@ mod tests { table_id: NUMBERS_TABLE_ID, table: NumbersTable::table(NUMBERS_TABLE_ID), }; - catalog_manager.register_local_table(req).unwrap(); + catalog_manager.register_table_sync(req).unwrap(); QueryEngineFactory::new(catalog_manager, None, false).query_engine() } diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index e079b31fc5bb..b4205f12c063 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -326,7 +326,7 @@ fn have_range_in_exprs(exprs: &Vec) -> bool { mod test { use catalog::local::MemoryCatalogManager; - use catalog::{CatalogManager, RegisterTableRequest}; + use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -380,7 +380,7 @@ mod test { let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list - .register_local_table(RegisterTableRequest { + .register_table_sync(RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), table_name, diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index c98c471e6e16..83ec1e52bc67 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -112,7 +112,7 @@ fn catalog_manager() -> Result> { table_id: NUMBERS_TABLE_ID, table: NumbersTable::table(NUMBERS_TABLE_ID), }; - let _ = catalog_manager.register_table(req).unwrap(); + let _ = catalog_manager.register_table_sync(req).unwrap(); Ok(catalog_manager) } diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index fdf529e2072c..383ac2ea8035 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -104,7 +104,7 @@ fn create_test_engine() -> TimeRangeTester { table_id: table.table_info().ident.table_id, table: table.clone(), }; - let _ = catalog_manager.register_table(req).unwrap(); + let _ = catalog_manager.register_table_sync(req).unwrap(); let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); TimeRangeTester { engine, filter } diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index 126f7c735a08..d3a17ce89bec 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -15,7 +15,7 @@ //! Procedure to create a table. use async_trait::async_trait; -use catalog::{CatalogManagerRef, RegisterTableRequest}; +use catalog::CatalogManagerRef; use common_procedure::error::SubprocedureFailedSnafu; use common_procedure::{ Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, @@ -240,25 +240,13 @@ impl CreateTableProcedure { region_numbers: self.data.request.region_numbers.clone(), }; // Safety: The table is already created. - let table = self + let _ = self .table_engine .open_table(&engine_ctx, open_req) .await .map_err(Error::from_error_ext)? .unwrap(); - let register_req = RegisterTableRequest { - catalog: self.data.request.catalog_name.clone(), - schema: self.data.request.schema_name.clone(), - table_name: self.data.request.table_name.clone(), - table_id: self.data.request.id, - table, - }; - let _ = self - .catalog_manager - .register_local_table(register_req) - .map_err(Error::from_error_ext)?; - Ok(Status::Done) } } diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs index 0fa521728757..7dc1bafd7a7b 100644 --- a/src/table-procedure/src/drop.rs +++ b/src/table-procedure/src/drop.rs @@ -15,7 +15,7 @@ //! Procedure to drop a table. use async_trait::async_trait; -use catalog::{CatalogManagerRef, DeregisterTableRequest}; +use catalog::CatalogManagerRef; use common_procedure::error::SubprocedureFailedSnafu; use common_procedure::{ Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, @@ -144,29 +144,6 @@ impl DropTableProcedure { } async fn on_remove_from_catalog(&mut self) -> Result { - let request = &self.data.request; - let has_table = self - .catalog_manager - .table( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .await - .context(AccessCatalogSnafu)? - .is_some(); - if has_table { - // The table is still in the catalog. - let deregister_table_req = DeregisterTableRequest { - catalog: self.data.request.catalog_name.clone(), - schema: self.data.request.schema_name.clone(), - table_name: self.data.request.table_name.clone(), - }; - self.catalog_manager - .deregister_local_table(deregister_table_req) - .context(AccessCatalogSnafu)?; - } - self.data.state = DropTableState::EngineDropTable; // Assign procedure id to the subprocedure. self.data.subprocedure_id = Some(ProcedureId::random()); @@ -264,45 +241,10 @@ impl DropTableData { #[cfg(test)] mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use table::engine::TableEngine; use super::*; use crate::test_util::TestEnv; - #[tokio::test] - async fn test_drop_table_procedure() { - let env = TestEnv::new("drop"); - let table_name = "test_drop"; - let table_id = env.create_table(table_name).await; - - let request = DropTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - }; - let TestEnv { - dir: _dir, - table_engine, - procedure_manager, - catalog_manager, - } = env; - let procedure = - DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); - watcher.changed().await.unwrap(); - - assert!(!catalog_manager - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - - let ctx = EngineContext::default(); - assert!(!table_engine.table_exists(&ctx, table_id,)); - } - #[tokio::test] async fn test_drop_not_exists_table() { common_telemetry::init_default_ut_logging(); diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index 740bd9d62964..c986531fcdb4 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -20,7 +20,7 @@ use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; -use common_procedure::{ProcedureManagerRef, ProcedureWithId}; +use common_procedure::ProcedureManagerRef; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -32,11 +32,8 @@ use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; -use table::metadata::TableId; use table::requests::CreateTableRequest; -use crate::CreateTableProcedure; - pub struct TestEnv { pub dir: TempDir, pub table_engine: Arc>>, @@ -92,27 +89,6 @@ impl TestEnv { catalog_manager, } } - - pub async fn create_table(&self, table_name: &str) -> TableId { - let request = new_create_request(table_name); - let table_id = request.id; - let procedure = CreateTableProcedure::new( - request, - self.catalog_manager.clone(), - self.table_engine.clone(), - self.table_engine.clone(), - ); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let mut watcher = self - .procedure_manager - .submit(procedure_with_id) - .await - .unwrap(); - watcher.changed().await.unwrap(); - - table_id - } } pub fn schema_for_test() -> RawSchema { diff --git a/src/table-procedure/src/truncate.rs b/src/table-procedure/src/truncate.rs index 868b077b7dd7..63bc07cc4dfb 100644 --- a/src/table-procedure/src/truncate.rs +++ b/src/table-procedure/src/truncate.rs @@ -234,45 +234,10 @@ impl TruncateTableData { #[cfg(test)] mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use table::engine::TableEngine; use super::*; use crate::test_util::TestEnv; - #[tokio::test] - async fn test_truncate_table_procedure() { - let env = TestEnv::new("truncate"); - let table_name = "test_truncate"; - let table_id = env.create_table(table_name).await; - - let request = TruncateTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id, - }; - let TestEnv { - dir: _dir, - table_engine, - procedure_manager, - catalog_manager, - } = env; - let procedure = - TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); - watcher.changed().await.unwrap(); - - assert!(catalog_manager - .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .await - .unwrap()); - - let ctx = EngineContext::default(); - assert!(table_engine.table_exists(&ctx, table_id)); - } - #[tokio::test] async fn test_truncate_not_exists_table() { common_telemetry::init_default_ut_logging(); diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 38350646ce69..222640ced4e2 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -26,7 +26,6 @@ use crate::requests::{ TruncateTableRequest, }; use crate::TableRef; -pub mod manager; /// Represents a resolved path to a table of the form “catalog.schema.table” #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] diff --git a/src/table/src/engine/manager.rs b/src/table/src/engine/manager.rs deleted file mode 100644 index e642d3aebbf7..000000000000 --- a/src/table/src/engine/manager.rs +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -use async_trait::async_trait; -use common_telemetry::error; -use snafu::{ensure, OptionExt}; - -use crate::engine::{TableEngineProcedureRef, TableEngineRef}; -use crate::error::{EngineExistSnafu, EngineNotFoundSnafu, Result}; - -#[async_trait::async_trait] -pub trait TableEngineManager: Send + Sync { - /// returns [Error::EngineNotFound](crate::error::Error::EngineNotFound) if engine not found - fn engine(&self, name: &str) -> Result; - - /// returns [Error::EngineExist](crate::error::Error::EngineExist) if engine exists - fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()>; - - /// closes all registered engines - async fn close(&self) -> Result<()>; - - /// returns [TableEngineProcedureRef] of specific engine `name` or - /// [Error::EngineNotFound](crate::error::Error::EngineNotFound) if engine not found - fn engine_procedure(&self, name: &str) -> Result; -} -pub type TableEngineManagerRef = Arc; - -/// Simple in-memory table engine manager -pub struct MemoryTableEngineManager { - pub engines: RwLock>, - engine_procedures: RwLock>, -} - -impl MemoryTableEngineManager { - /// Create a new [MemoryTableEngineManager] with single table `engine`. - pub fn new(engine: TableEngineRef) -> Self { - MemoryTableEngineManager::alias(engine.name().to_string(), engine) - } - - // TODO: remove `TableEngineManager` - pub fn new_empty() -> Self { - let engines = RwLock::new(HashMap::new()); - let engine_procedures = RwLock::new(HashMap::new()); - - MemoryTableEngineManager { - engines, - engine_procedures, - } - } - - /// Create a new [MemoryTableEngineManager] with single table `engine` and - /// an alias `name` instead of the engine's name. - pub fn alias(name: String, engine: TableEngineRef) -> Self { - let engines = HashMap::from([(name, engine)]); - let engines = RwLock::new(engines); - - MemoryTableEngineManager { - engines, - engine_procedures: RwLock::new(HashMap::new()), - } - } - - /// Attach the `engine_procedures` to the manager. - pub fn with_engine_procedures( - mut self, - engine_procedures: HashMap, - ) -> Self { - self.engine_procedures = RwLock::new(engine_procedures); - self - } - - pub fn with(engines: Vec) -> Self { - let engines = engines - .into_iter() - .map(|engine| (engine.name().to_string(), engine)) - .collect::>(); - let engines = RwLock::new(engines); - MemoryTableEngineManager { - engines, - engine_procedures: RwLock::new(HashMap::new()), - } - } -} - -#[async_trait] -impl TableEngineManager for MemoryTableEngineManager { - fn engine(&self, name: &str) -> Result { - let engines = self.engines.read().unwrap(); - engines - .get(name) - .cloned() - .context(EngineNotFoundSnafu { engine: name }) - } - - fn register_engine(&self, name: &str, engine: TableEngineRef) -> Result<()> { - let mut engines = self.engines.write().unwrap(); - - ensure!( - !engines.contains_key(name), - EngineExistSnafu { engine: name } - ); - - let _ = engines.insert(name.to_string(), engine); - - Ok(()) - } - - async fn close(&self) -> Result<()> { - let engines = { - let engines = self.engines.write().unwrap(); - engines.values().cloned().collect::>() - }; - - if let Err(err) = - futures::future::try_join_all(engines.iter().map(|engine| engine.close())).await - { - error!("Failed to close engine: {}", err); - } - - Ok(()) - } - - fn engine_procedure(&self, name: &str) -> Result { - let engine_procedures = self.engine_procedures.read().unwrap(); - engine_procedures - .get(name) - .cloned() - .context(EngineNotFoundSnafu { engine: name }) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use super::*; - use crate::engine::TableEngine; - use crate::error; - use crate::test_util::MockTableEngine; - - #[test] - fn test_table_engine_manager() { - let table_engine = MockTableEngine::new(); - let table_engine_ref = Arc::new(table_engine); - let table_engine_manager = MemoryTableEngineManager::new(table_engine_ref.clone()); - - // Attach engine procedures. - let engine_procedure: TableEngineProcedureRef = table_engine_ref.clone(); - let engine_procedures = - HashMap::from([(table_engine_ref.name().to_string(), engine_procedure)]); - let table_engine_manager = table_engine_manager.with_engine_procedures(engine_procedures); - - table_engine_manager - .register_engine("yet_another", table_engine_ref.clone()) - .unwrap(); - - let got = table_engine_manager.engine(table_engine_ref.name()); - - assert_eq!(got.unwrap().name(), table_engine_ref.name()); - - let got = table_engine_manager.engine("yet_another"); - - assert_eq!(got.unwrap().name(), table_engine_ref.name()); - - let missing = table_engine_manager.engine("not_exists"); - - assert_matches!(missing.err().unwrap(), error::Error::EngineNotFound { .. }); - - assert!(table_engine_manager - .engine_procedure(table_engine_ref.name()) - .is_ok()); - assert_matches!( - table_engine_manager - .engine_procedure("unknown") - .err() - .unwrap(), - error::Error::EngineNotFound { .. } - ); - } -}