From 8fde57028b3ced2ff6a708396d46f3eecf3812e7 Mon Sep 17 00:00:00 2001 From: johnsonlee Date: Fri, 17 May 2024 14:38:54 +0800 Subject: [PATCH 01/12] WIP: pg_catalog --- src/catalog/src/kvbackend/manager.rs | 31 +++++-- src/catalog/src/lib.rs | 1 + src/catalog/src/memory/manager.rs | 11 ++- src/catalog/src/pg_catalog.rs | 107 ++++++++++++++++++++++ src/catalog/src/pg_catalog/pg_class.rs | 79 ++++++++++++++++ src/catalog/src/pg_catalog/table_names.rs | 17 ++++ src/common/catalog/src/consts.rs | 7 ++ 7 files changed, 243 insertions(+), 10 deletions(-) create mode 100644 src/catalog/src/pg_catalog.rs create mode 100644 src/catalog/src/pg_catalog/pg_class.rs create mode 100644 src/catalog/src/pg_catalog/table_names.rs diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 0bf51643b1b1..3fc721b121f9 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, Weak}; use async_stream::try_stream; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, + PG_CATALOG_NAME, }; use common_config::Mode; use common_error::ext::BoxedError; @@ -46,6 +47,7 @@ use crate::error::{ }; use crate::information_schema::InformationSchemaProvider; use crate::kvbackend::TableCacheRef; +use crate::pg_catalog::PGCatalogProvider; use crate::CatalogManager; /// Access all existing catalog, schema and tables. @@ -90,6 +92,10 @@ impl KvBackendCatalogManager { DEFAULT_CATALOG_NAME.to_string(), me.clone(), )), + pg_catalog_provider: Arc::new(PGCatalogProvider::new( + DEFAULT_CATALOG_NAME.to_string(), + me.clone(), + )), }, cache_registry, }) @@ -295,30 +301,37 @@ fn build_table(table_info_value: TableInfoValue) -> Result { /// Existing system tables: /// - public.numbers /// - information_schema.{tables} +/// - pg_catalog.{tables} #[derive(Clone)] struct SystemCatalog { catalog_manager: Weak, catalog_cache: Cache>, information_schema_provider: Arc, + pg_catalog_provider: Arc, } impl SystemCatalog { + // TODO(j0hn50n133): remove the duplicated hard-coded table names logic fn schema_names(&self) -> Vec { - vec![INFORMATION_SCHEMA_NAME.to_string()] + vec![ + INFORMATION_SCHEMA_NAME.to_string(), + PG_CATALOG_NAME.to_string(), + ] } fn table_names(&self, schema: &str) -> Vec { - if schema == INFORMATION_SCHEMA_NAME { - self.information_schema_provider.table_names() - } else if schema == DEFAULT_SCHEMA_NAME { - vec![NUMBERS_TABLE_NAME.to_string()] - } else { - vec![] + match schema { + INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(), + PG_CATALOG_NAME => self.pg_catalog_provider.table_names(), + DEFAULT_SCHEMA_NAME => { + vec![NUMBERS_TABLE_NAME.to_string()] + } + _ => vec![], } } fn schema_exists(&self, schema: &str) -> bool { - schema == INFORMATION_SCHEMA_NAME + schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME } fn table_exists(&self, schema: &str, table: &str) -> bool { @@ -326,6 +339,8 @@ impl SystemCatalog { self.information_schema_provider.table(table).is_some() } else if schema == DEFAULT_SCHEMA_NAME { table == NUMBERS_TABLE_NAME + } else if schema == PG_CATALOG_NAME { + self.pg_catalog_provider.table(table).is_some() } else { false } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 494a94df2699..fa21ec517411 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -32,6 +32,7 @@ pub mod information_schema; pub mod kvbackend; pub mod memory; mod metrics; +pub mod pg_catalog; pub mod table_source; #[async_trait::async_trait] diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index a5513b89048e..97f64be6dc2d 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -20,7 +20,8 @@ use std::sync::{Arc, RwLock, Weak}; use async_stream::{stream, try_stream}; use common_catalog::build_db_string; use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, + DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME, }; use futures_util::stream::BoxStream; use snafu::OptionExt; @@ -173,6 +174,12 @@ impl MemoryCatalogManager { schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), }) .unwrap(); + manager + .register_schema_sync(RegisterSchemaRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: PG_CATALOG_NAME.to_string(), + }) + .unwrap(); manager .register_schema_sync(RegisterSchemaRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -196,7 +203,7 @@ impl MemoryCatalogManager { } fn catalog_exist_sync(&self, catalog: &str) -> Result { - Ok(self.catalogs.read().unwrap().get(catalog).is_some()) + Ok(self.catalogs.read().unwrap().contains_key(catalog)) } /// Registers a catalog if it does not exist and returns false if the schema exists. diff --git a/src/catalog/src/pg_catalog.rs b/src/catalog/src/pg_catalog.rs new file mode 100644 index 000000000000..6f0332c04dcf --- /dev/null +++ b/src/catalog/src/pg_catalog.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod pg_class; +mod table_names; + +use std::collections::HashMap; +use std::sync::{Arc, Weak}; + +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::SchemaRef; +use store_api::storage::{ScanRequest, TableId}; +use table::metadata::TableType; +use table::TableRef; + +use self::table_names::PG_CLASS; +use crate::error::Result; +use crate::CatalogManager; + +/// The column name for the OID column. +/// The OID column is a unique identifier of type u32 for each object in the database. +const OID_COLUMN_NAME: &str = "oid"; + +/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog. +pub struct PGCatalogProvider { + catalog_name: String, + catalog_manager: Weak, + tables: HashMap, +} + +impl PGCatalogProvider { + pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { + let mut provider = Self { + catalog_name, + catalog_manager, + tables: HashMap::new(), + }; + provider.build_tables(); + provider + } + + fn build_tables(&mut self) { + // SECURITY NOTE: + // Follow the same security rules as [InformationSchemaProvider::build_tables]. + if self.catalog_name == DEFAULT_CATALOG_NAME { + self.tables.insert( + table_names::PG_CLASS.to_string(), + self.build_table(PG_CLASS).unwrap(), + ); + } + } + + fn build_table(&self, name: &str) -> Option { + // self. + todo!() + } + + pub fn table_names(&self) -> Vec { + // TODO(j0hn50n133): replace hardcoded table names with collected table names + vec![ + table_names::PG_DATABASE.to_string(), + table_names::PG_NAMESPACE.to_string(), + table_names::PG_CLASS.to_string(), + ] + } + + pub fn pg_catalog_table(&self, name: &str) -> Option { + match name.to_ascii_lowercase().as_str() { + table_names::PG_CLASS => todo!(), + _ => None, + } + } + + /// Returns the [TableRef] by table name. + pub fn table(&self, name: &str) -> Option { + todo!() + } +} + +/// The trait for all tables in the `pg_catalog` schema. +trait PGCatalogTable { + fn table_id(&self) -> TableId; + + fn table_name(&self) -> &'static str; + + fn schema(&self) -> SchemaRef; + + fn to_stream(&self, request: ScanRequest) -> Result; + + fn table_type(&self) -> TableType { + TableType::Temporary + } +} + +type PGCatalogTableRef = Arc; diff --git a/src/catalog/src/pg_catalog/pg_class.rs b/src/catalog/src/pg_catalog/pg_class.rs new file mode 100644 index 000000000000..574e693965e3 --- /dev/null +++ b/src/catalog/src/pg_catalog/pg_class.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use store_api::storage::{ConcreteDataType, ScanRequest}; + +use super::table_names::PG_CLASS; +use super::{PGCatalogTable, OID_COLUMN_NAME}; +use crate::error::Result; +use crate::CatalogManager; + +const CLASS_RELKIND: &str = "relkind"; +const CLASS_RELOWNER: &str = "relowner"; +const CLASS_RELNAME: &str = "relname"; +const CLASS_RELNAMESPACE: &str = "relnamespace"; + +pub(super) struct PGCatalogPgClass { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, +} + +impl PGCatalogPgClass { + pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_name, + catalog_manager, + } + } + + pub(super) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(OID_COLUMN_NAME, ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new(CLASS_RELNAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + CLASS_RELNAMESPACE, + ConcreteDataType::uint32_datatype(), + false, + ), + ColumnSchema::new(CLASS_RELKIND, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(CLASS_RELOWNER, ConcreteDataType::uint32_datatype(), false), + ])) + } +} + +impl PGCatalogTable for PGCatalogPgClass { + fn table_id(&self) -> store_api::storage::TableId { + PG_CATALOG_PG_CLASS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + PG_CLASS + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + todo!() + } +} diff --git a/src/catalog/src/pg_catalog/table_names.rs b/src/catalog/src/pg_catalog/table_names.rs new file mode 100644 index 000000000000..667b85f15b35 --- /dev/null +++ b/src/catalog/src/pg_catalog/table_names.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub const PG_DATABASE: &str = "pg_databases"; +pub const PG_NAMESPACE: &str = "pg_namespace"; +pub const PG_CLASS: &str = "pg_class"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 876d4bcb67f4..8121a5ea2873 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -14,6 +14,7 @@ pub const SYSTEM_CATALOG_NAME: &str = "system"; pub const INFORMATION_SCHEMA_NAME: &str = "information_schema"; +pub const PG_CATALOG_NAME: &str = "pg_catalog"; pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog"; pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; @@ -95,6 +96,12 @@ pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30; pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; /// ----- End of information_schema tables ----- +/// ----- Begin of pg_catalog tables ----- +/// mask for pg_catalog tables id, to reserve enough table_id for tables, 0x200 == 256 == 0b100000000 +pub const PG_CATALOG_TABLE_ID_MASK: u32 = 0x200; +/// id for pg_catalog.pg_class +pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = PG_CATALOG_TABLE_ID_MASK | 1; +/// ----- End of pg_catalog tables ----- pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; pub const METRIC_ENGINE: &str = "metric"; From 080a17e5c24095f1d73abcb61c1fb8c7fca81b9a Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Mon, 8 Jul 2024 20:41:28 +0800 Subject: [PATCH 02/12] refactor: move memory_table to crate public level to reuse it in pgcatalog --- src/catalog/src/information_schema.rs | 5 +- .../tables.rs => information_memory_table.rs} | 77 +++++++++---------- src/catalog/src/lib.rs | 1 + .../{information_schema => }/memory_table.rs | 55 +++++-------- src/catalog/src/memory_table/tables.rs | 63 +++++++++++++++ 5 files changed, 121 insertions(+), 80 deletions(-) rename src/catalog/src/information_schema/{memory_table/tables.rs => information_memory_table.rs} (91%) rename src/catalog/src/{information_schema => }/memory_table.rs (80%) create mode 100644 src/catalog/src/memory_table/tables.rs diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 5a812ff6c1ea..4b3ed20f8307 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -14,8 +14,8 @@ mod cluster_info; pub mod columns; +mod information_memory_table; pub mod key_column_usage; -mod memory_table; mod partitions; mod predicate; mod region_peers; @@ -50,14 +50,15 @@ pub use table_names::*; use self::columns::InformationSchemaColumns; use crate::error::Result; use crate::information_schema::cluster_info::InformationSchemaClusterInfo; +use crate::information_schema::information_memory_table::get_schema_columns; use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; -use crate::information_schema::memory_table::{get_schema_columns, MemoryTable}; use crate::information_schema::partitions::InformationSchemaPartitions; use crate::information_schema::region_peers::InformationSchemaRegionPeers; use crate::information_schema::runtime_metrics::InformationSchemaMetrics; use crate::information_schema::schemata::InformationSchemaSchemata; use crate::information_schema::table_constraints::InformationSchemaTableConstraints; use crate::information_schema::tables::InformationSchemaTables; +use crate::memory_table::MemoryTable; use crate::CatalogManager; lazy_static! { diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/information_schema/information_memory_table.rs similarity index 91% rename from src/catalog/src/information_schema/memory_table/tables.rs rename to src/catalog/src/information_schema/information_memory_table.rs index e1696ab8e106..5bfd94707904 100644 --- a/src/catalog/src/information_schema/memory_table/tables.rs +++ b/src/catalog/src/information_schema/information_memory_table.rs @@ -15,17 +15,23 @@ use std::sync::Arc; use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE}; -use datatypes::prelude::{ConcreteDataType, VectorRef}; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::vectors::{Int64Vector, StringVector}; - -use crate::information_schema::table_names::*; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::{Schema, SchemaRef}; +use datatypes::vectors::{Int64Vector, StringVector, VectorRef}; +use store_api::storage::{ScanRequest, TableId}; + +use super::table_names::*; +use super::InformationTable; +// pub use tables::get_schema_columns; +use crate::error::Result; +use crate::memory_table::tables::{bigint_column, datetime_column, string_column, string_columns}; +use crate::memory_table::MemoryTable; const NO_VALUE: &str = "NO"; /// Find the schema and columns by the table_name, only valid for memory tables. /// Safety: the user MUST ensure the table schema exists, panic otherwise. -pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { +pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { let (column_schemas, columns): (_, Vec) = match table_name { COLUMN_PRIVILEGES => ( string_columns(&[ @@ -415,49 +421,38 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { (Arc::new(Schema::new(column_schemas)), columns) } -fn string_columns(names: &[&'static str]) -> Vec { - names.iter().map(|name| string_column(name)).collect() -} +impl InformationTable for MemoryTable { + fn table_id(&self) -> TableId { + self.table_id + } -fn string_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::string_datatype(), - false, - ) -} + fn table_name(&self) -> &'static str { + self.table_name + } -fn bigint_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::int64_datatype(), - false, - ) -} + fn schema(&self) -> SchemaRef { + self.schema.clone() + } -fn datetime_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::datetime_datatype(), - false, - ) + fn to_stream(&self, _request: ScanRequest) -> Result { + self.to_stream() + } } #[cfg(test)] mod tests { - use super::*; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use super::*; #[test] - fn test_string_columns() { - let columns = ["a", "b", "c"]; - let column_schemas = string_columns(&columns); - - assert_eq!(3, column_schemas.len()); - for (i, name) in columns.iter().enumerate() { - let cs = column_schemas.get(i).unwrap(); - - assert_eq!(*name, cs.name); - assert_eq!(ConcreteDataType::string_datatype(), cs.data_type); - } + fn test_information_memory_table_schema() { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + ])); + + let table = MemoryTable::new(42, "test", schema.clone(), vec![]); + assert_eq!(schema, InformationTable::schema(&table)); } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index fa21ec517411..f9012840ce00 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -31,6 +31,7 @@ pub mod error; pub mod information_schema; pub mod kvbackend; pub mod memory; +mod memory_table; mod metrics; pub mod pg_catalog; pub mod table_source; diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/memory_table.rs similarity index 80% rename from src/catalog/src/information_schema/memory_table.rs rename to src/catalog/src/memory_table.rs index 7f016f665409..563ca104c109 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/memory_table.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod tables; +pub mod tables; use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -26,23 +26,21 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use snafu::ResultExt; -use store_api::storage::{ScanRequest, TableId}; -pub use tables::get_schema_columns; +use store_api::storage::TableId; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; -use crate::information_schema::InformationTable; /// A memory table with specified schema and columns. -pub(super) struct MemoryTable { - table_id: TableId, - table_name: &'static str, - schema: SchemaRef, - columns: Vec, +pub(crate) struct MemoryTable { + pub(crate) table_id: TableId, + pub(crate) table_name: &'static str, + pub(crate) schema: SchemaRef, + pub(crate) columns: Vec, } impl MemoryTable { /// Creates a memory table with table id, name, schema and columns. - pub(super) fn new( + pub fn new( table_id: TableId, table_name: &'static str, schema: SchemaRef, @@ -56,25 +54,10 @@ impl MemoryTable { } } - fn builder(&self) -> MemoryTableBuilder { + pub fn builder(&self) -> MemoryTableBuilder { MemoryTableBuilder::new(self.schema.clone(), self.columns.clone()) } -} - -impl InformationTable for MemoryTable { - fn table_id(&self) -> TableId { - self.table_id - } - - fn table_name(&self) -> &'static str { - self.table_name - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn to_stream(&self, _request: ScanRequest) -> Result { + pub fn to_stream(&self) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( @@ -95,7 +78,7 @@ impl InformationTable for MemoryTable { } } -struct MemoryTableBuilder { +pub(crate) struct MemoryTableBuilder { schema: SchemaRef, columns: Vec, } @@ -106,7 +89,7 @@ impl MemoryTableBuilder { } /// Construct the `information_schema.{table_name}` virtual table - async fn memory_records(&mut self) -> Result { + pub async fn memory_records(&mut self) -> Result { if self.columns.is_empty() { RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu) } else { @@ -165,11 +148,10 @@ mod tests { ], ); - assert_eq!(42, table.table_id()); - assert_eq!("test", table.table_name()); - assert_eq!(schema, InformationTable::schema(&table)); + assert_eq!(42, table.table_id); + assert_eq!("test", table.table_name); - let stream = table.to_stream(ScanRequest::default()).unwrap(); + let stream = table.to_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -194,11 +176,10 @@ mod tests { let table = MemoryTable::new(42, "test", schema.clone(), vec![]); - assert_eq!(42, table.table_id()); - assert_eq!("test", table.table_name()); - assert_eq!(schema, InformationTable::schema(&table)); + assert_eq!(42, table.table_id); + assert_eq!("test", table.table_name); - let stream = table.to_stream(ScanRequest::default()).unwrap(); + let stream = table.to_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/catalog/src/memory_table/tables.rs b/src/catalog/src/memory_table/tables.rs new file mode 100644 index 000000000000..0b979e8d1007 --- /dev/null +++ b/src/catalog/src/memory_table/tables.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; + +pub fn string_columns(names: &[&'static str]) -> Vec { + names.iter().map(|name| string_column(name)).collect() +} + +pub fn string_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::string_datatype(), + false, + ) +} + +pub fn bigint_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::int64_datatype(), + false, + ) +} + +pub fn datetime_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::datetime_datatype(), + false, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_columns() { + let columns = ["a", "b", "c"]; + let column_schemas = string_columns(&columns); + + assert_eq!(3, column_schemas.len()); + for (i, name) in columns.iter().enumerate() { + let cs = column_schemas.get(i).unwrap(); + + assert_eq!(*name, cs.name); + assert_eq!(ConcreteDataType::string_datatype(), cs.data_type); + } + } +} From e73d6fcd1fa923eb688cdedd8e19fbf8599b3b85 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Tue, 9 Jul 2024 00:45:23 +0800 Subject: [PATCH 03/12] refactor: new system_schema mod to manage implementation of information_schema and pg_catalog --- src/catalog/src/kvbackend/manager.rs | 3 +- src/catalog/src/lib.rs | 10 +- src/catalog/src/memory/manager.rs | 1 + src/catalog/src/system_schema.rs | 152 ++++++++++ .../{ => system_schema}/information_schema.rs | 280 +++++++----------- .../information_schema/cluster_info.rs | 2 +- .../information_schema/columns.rs | 0 .../information_memory_table.rs | 46 +-- .../information_schema/key_column_usage.rs | 2 +- .../information_schema/partitions.rs | 2 +- .../information_schema/predicate.rs | 0 .../information_schema/region_peers.rs | 2 +- .../information_schema/runtime_metrics.rs | 0 .../information_schema/schemata.rs | 2 +- .../information_schema/table_constraints.rs | 0 .../information_schema/table_names.rs | 0 .../information_schema/tables.rs | 2 +- .../information_schema/utils.rs | 0 .../src/{ => system_schema}/memory_table.rs | 62 ++-- .../memory_table/tables.rs | 24 ++ .../src/{ => system_schema}/pg_catalog.rs | 69 +++-- .../pg_catalog/pg_catalog_memory_table.rs | 46 +++ .../pg_catalog/pg_class.rs | 4 +- .../pg_catalog/table_names.rs | 1 + src/common/catalog/src/consts.rs | 1 + 25 files changed, 439 insertions(+), 272 deletions(-) create mode 100644 src/catalog/src/system_schema.rs rename src/catalog/src/{ => system_schema}/information_schema.rs (65%) rename src/catalog/src/{ => system_schema}/information_schema/cluster_info.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/columns.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/information_memory_table.rs (92%) rename src/catalog/src/{ => system_schema}/information_schema/key_column_usage.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/partitions.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/predicate.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/region_peers.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/runtime_metrics.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/schemata.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/table_constraints.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/table_names.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/tables.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/utils.rs (100%) rename src/catalog/src/{ => system_schema}/memory_table.rs (89%) rename src/catalog/src/{ => system_schema}/memory_table/tables.rs (77%) rename src/catalog/src/{ => system_schema}/pg_catalog.rs (58%) create mode 100644 src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs rename src/catalog/src/{ => system_schema}/pg_catalog/pg_class.rs (94%) rename src/catalog/src/{ => system_schema}/pg_catalog/table_names.rs (95%) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 3fc721b121f9..81f522a300cd 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -47,7 +47,8 @@ use crate::error::{ }; use crate::information_schema::InformationSchemaProvider; use crate::kvbackend::TableCacheRef; -use crate::pg_catalog::PGCatalogProvider; +use crate::system_schema::pg_catalog::PGCatalogProvider; +use crate::system_schema::SystemSchemaProvider; use crate::CatalogManager; /// Access all existing catalog, schema and tables. diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index f9012840ce00..e691ccd98376 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -28,14 +28,16 @@ use table::TableRef; use crate::error::Result; pub mod error; -pub mod information_schema; pub mod kvbackend; pub mod memory; -mod memory_table; mod metrics; -pub mod pg_catalog; -pub mod table_source; +pub mod system_schema; +pub mod information_schema { + // re-export to make it compatible with the legacy code + pub use crate::system_schema::information_schema::*; +} +pub mod table_source; #[async_trait::async_trait] pub trait CatalogManager: Send + Sync { fn as_any(&self) -> &dyn Any; diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 97f64be6dc2d..a236a6b4b79f 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -29,6 +29,7 @@ use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::information_schema::InformationSchemaProvider; +use crate::system_schema::SystemSchemaProvider; use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest}; type SchemaEntries = HashMap>; diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs new file mode 100644 index 000000000000..85c996455032 --- /dev/null +++ b/src/catalog/src/system_schema.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod information_schema; +mod memory_table; +pub mod pg_catalog; + +use std::collections::HashMap; +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use datatypes::schema::SchemaRef; +use futures_util::StreamExt; +use snafu::ResultExt; +use store_api::data_source::DataSource; +use store_api::storage::ScanRequest; +use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; +use table::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; +use table::TableRef; + +use crate::error::Result; + +pub trait SystemSchemaProvider { + /// Returns a map of [TableRef] in information schema. + fn tables(&self) -> &HashMap; + + /// Returns the [TableRef] by table name. + fn table(&self, name: &str) -> Option { + self.tables().get(name).cloned() + } + + /// Returns table names in the order of table id. + fn table_names(&self) -> Vec { + let mut tables = self.tables().values().clone().collect::>(); + + tables.sort_by(|t1, t2| { + t1.table_info() + .table_id() + .partial_cmp(&t2.table_info().table_id()) + .unwrap() + }); + tables + .into_iter() + .map(|t| t.table_info().name.clone()) + .collect() + } +} + +trait SystemSchemaProviderInner { + fn schema_name() -> &'static str; + fn system_table(&self, name: &str) -> Option; + + fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef { + let table_meta = TableMetaBuilder::default() + .schema(table.schema()) + .primary_key_indices(vec![]) + .next_column_id(0) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .table_id(table.table_id()) + .name(table.table_name().to_string()) + .catalog_name(catalog_name) + .schema_name(Self::schema_name().to_string()) + .meta(table_meta) + .table_type(table.table_type()) + .build() + .unwrap(); + Arc::new(table_info) + } +} + +pub(crate) trait SystemTable { + fn table_id(&self) -> TableId; + + fn table_name(&self) -> &'static str; + + fn schema(&self) -> SchemaRef; + + fn to_stream(&self, request: ScanRequest) -> Result; + + fn table_type(&self) -> TableType { + TableType::Temporary + } +} + +pub(crate) type SystemTableRef = Arc; + +struct SystemTableDataSource { + table: SystemTableRef, +} + +impl SystemTableDataSource { + fn new(table: SystemTableRef) -> Self { + Self { table } + } + + fn try_project(&self, projection: &[usize]) -> std::result::Result { + let schema = self + .table + .schema() + .try_project(projection) + .context(SchemaConversionSnafu) + .map_err(BoxedError::new)?; + Ok(Arc::new(schema)) + } +} + +impl DataSource for SystemTableDataSource { + fn get_stream( + &self, + request: ScanRequest, + ) -> std::result::Result { + let projection = request.projection.clone(); + let projected_schema = match &projection { + Some(projection) => self.try_project(projection)?, + None => self.table.schema(), + }; + + let stream = self + .table + .to_stream(request) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu) + .map_err(BoxedError::new)? + .map(move |batch| match &projection { + Some(p) => batch.and_then(|b| b.try_project(p)), + None => batch, + }); + + let stream = RecordBatchStreamWrapper { + schema: projected_schema, + stream: Box::pin(stream), + output_ordering: None, + metrics: Default::default(), + }; + + Ok(Box::pin(stream)) + } +} diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs similarity index 65% rename from src/catalog/src/information_schema.rs rename to src/catalog/src/system_schema/information_schema.rs index 4b3ed20f8307..8bcd00dad9c0 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -30,35 +30,30 @@ use std::collections::HashMap; use std::sync::{Arc, Weak}; use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME}; -use common_error::ext::BoxedError; -use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; -use futures_util::StreamExt; use lazy_static::lazy_static; use paste::paste; pub(crate) use predicate::Predicates; -use snafu::ResultExt; -use store_api::data_source::DataSource; use store_api::storage::{ScanRequest, TableId}; -use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; -use table::metadata::{ - FilterPushDownType, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, -}; +use table::metadata::{FilterPushDownType, TableType}; use table::{Table, TableRef}; pub use table_names::*; use self::columns::InformationSchemaColumns; +use super::{SystemSchemaProviderInner, SystemTable, SystemTableDataSource, SystemTableRef}; use crate::error::Result; -use crate::information_schema::cluster_info::InformationSchemaClusterInfo; -use crate::information_schema::information_memory_table::get_schema_columns; -use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; -use crate::information_schema::partitions::InformationSchemaPartitions; -use crate::information_schema::region_peers::InformationSchemaRegionPeers; -use crate::information_schema::runtime_metrics::InformationSchemaMetrics; -use crate::information_schema::schemata::InformationSchemaSchemata; -use crate::information_schema::table_constraints::InformationSchemaTableConstraints; -use crate::information_schema::tables::InformationSchemaTables; -use crate::memory_table::MemoryTable; +use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo; +use crate::system_schema::information_schema::information_memory_table::get_schema_columns; +use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; +use crate::system_schema::information_schema::partitions::InformationSchemaPartitions; +use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers; +use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics; +use crate::system_schema::information_schema::schemata::InformationSchemaSchemata; +use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints; +use crate::system_schema::information_schema::tables::InformationSchemaTables; +use crate::system_schema::memory_table::MemoryTable; +use crate::system_schema::SystemSchemaProvider; use crate::CatalogManager; lazy_static! { @@ -110,104 +105,19 @@ pub struct InformationSchemaProvider { tables: HashMap, } -impl InformationSchemaProvider { - pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { - let mut provider = Self { - catalog_name, - catalog_manager, - tables: HashMap::new(), - }; - - provider.build_tables(); - - provider - } - - /// Returns table names in the order of table id. - pub fn table_names(&self) -> Vec { - let mut tables = self.tables.values().clone().collect::>(); - - tables.sort_by(|t1, t2| { - t1.table_info() - .table_id() - .partial_cmp(&t2.table_info().table_id()) - .unwrap() - }); - tables - .into_iter() - .map(|t| t.table_info().name.clone()) - .collect() - } - - /// Returns a map of [TableRef] in information schema. - pub fn tables(&self) -> &HashMap { +impl SystemSchemaProvider for InformationSchemaProvider { + fn tables(&self) -> &HashMap { assert!(!self.tables.is_empty()); &self.tables } - - /// Returns the [TableRef] by table name. - pub fn table(&self, name: &str) -> Option { - self.tables.get(name).cloned() - } - - fn build_tables(&mut self) { - let mut tables = HashMap::new(); - - // SECURITY NOTE: - // Carefully consider the tables that may expose sensitive cluster configurations, - // authentication details, and other critical information. - // Only put these tables under `greptime` catalog to prevent info leak. - if self.catalog_name == DEFAULT_CATALOG_NAME { - tables.insert( - RUNTIME_METRICS.to_string(), - self.build_table(RUNTIME_METRICS).unwrap(), - ); - tables.insert( - BUILD_INFO.to_string(), - self.build_table(BUILD_INFO).unwrap(), - ); - tables.insert( - REGION_PEERS.to_string(), - self.build_table(REGION_PEERS).unwrap(), - ); - tables.insert( - CLUSTER_INFO.to_string(), - self.build_table(CLUSTER_INFO).unwrap(), - ); - } - - tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); - tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap()); - tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); - tables.insert( - KEY_COLUMN_USAGE.to_string(), - self.build_table(KEY_COLUMN_USAGE).unwrap(), - ); - tables.insert( - TABLE_CONSTRAINTS.to_string(), - self.build_table(TABLE_CONSTRAINTS).unwrap(), - ); - - // Add memory tables - for name in MEMORY_TABLES.iter() { - tables.insert((*name).to_string(), self.build_table(name).expect(name)); - } - - self.tables = tables; - } - - fn build_table(&self, name: &str) -> Option { - self.information_table(name).map(|table| { - let table_info = Self::table_info(self.catalog_name.clone(), &table); - let filter_pushdown = FilterPushDownType::Inexact; - let data_source = Arc::new(InformationTableDataSource::new(table)); - let table = Table::new(table_info, filter_pushdown, data_source); - Arc::new(table) - }) +} +impl SystemSchemaProviderInner for InformationSchemaProvider { + fn schema_name() -> &'static str { + INFORMATION_SCHEMA_NAME } - fn information_table(&self, name: &str) -> Option { + fn system_table(&self, name: &str) -> Option { match name.to_ascii_lowercase().as_str() { TABLES => Some(Arc::new(InformationSchemaTables::new( self.catalog_name.clone(), @@ -266,24 +176,75 @@ impl InformationSchemaProvider { _ => None, } } +} + +impl InformationSchemaProvider { + pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { + let mut provider = Self { + catalog_name, + catalog_manager, + tables: HashMap::new(), + }; + + provider.build_tables(); + + provider + } + + fn build_tables(&mut self) { + let mut tables = HashMap::new(); + + // SECURITY NOTE: + // Carefully consider the tables that may expose sensitive cluster configurations, + // authentication details, and other critical information. + // Only put these tables under `greptime` catalog to prevent info leak. + if self.catalog_name == DEFAULT_CATALOG_NAME { + tables.insert( + RUNTIME_METRICS.to_string(), + self.build_table(RUNTIME_METRICS).unwrap(), + ); + tables.insert( + BUILD_INFO.to_string(), + self.build_table(BUILD_INFO).unwrap(), + ); + tables.insert( + REGION_PEERS.to_string(), + self.build_table(REGION_PEERS).unwrap(), + ); + tables.insert( + CLUSTER_INFO.to_string(), + self.build_table(CLUSTER_INFO).unwrap(), + ); + } + + tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); + tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap()); + tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); + tables.insert( + KEY_COLUMN_USAGE.to_string(), + self.build_table(KEY_COLUMN_USAGE).unwrap(), + ); + tables.insert( + TABLE_CONSTRAINTS.to_string(), + self.build_table(TABLE_CONSTRAINTS).unwrap(), + ); + + // Add memory tables + for name in MEMORY_TABLES.iter() { + tables.insert((*name).to_string(), self.build_table(name).expect(name)); + } + + self.tables = tables; + } - fn table_info(catalog_name: String, table: &InformationTableRef) -> TableInfoRef { - let table_meta = TableMetaBuilder::default() - .schema(table.schema()) - .primary_key_indices(vec![]) - .next_column_id(0) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .table_id(table.table_id()) - .name(table.table_name().to_string()) - .catalog_name(catalog_name) - .schema_name(INFORMATION_SCHEMA_NAME.to_string()) - .meta(table_meta) - .table_type(table.table_type()) - .build() - .unwrap(); - Arc::new(table_info) + fn build_table(&self, name: &str) -> Option { + self.system_table(name).map(|table| { + let table_info = Self::table_info(self.catalog_name.clone(), &table); + let filter_pushdown = FilterPushDownType::Inexact; + let data_source = Arc::new(SystemTableDataSource::new(table)); + let table = Table::new(table_info, filter_pushdown, data_source); + Arc::new(table) + }) } } @@ -301,57 +262,28 @@ trait InformationTable { } } -type InformationTableRef = Arc; - -struct InformationTableDataSource { - table: InformationTableRef, -} - -impl InformationTableDataSource { - fn new(table: InformationTableRef) -> Self { - Self { table } +// Provide compatibility for legacy `information_schema` code. +impl SystemTable for T +where + T: InformationTable, +{ + fn table_id(&self) -> TableId { + InformationTable::table_id(self) } - fn try_project(&self, projection: &[usize]) -> std::result::Result { - let schema = self - .table - .schema() - .try_project(projection) - .context(SchemaConversionSnafu) - .map_err(BoxedError::new)?; - Ok(Arc::new(schema)) + fn table_name(&self) -> &'static str { + InformationTable::table_name(self) } -} - -impl DataSource for InformationTableDataSource { - fn get_stream( - &self, - request: ScanRequest, - ) -> std::result::Result { - let projection = request.projection.clone(); - let projected_schema = match &projection { - Some(projection) => self.try_project(projection)?, - None => self.table.schema(), - }; - let stream = self - .table - .to_stream(request) - .map_err(BoxedError::new) - .context(TablesRecordBatchSnafu) - .map_err(BoxedError::new)? - .map(move |batch| match &projection { - Some(p) => batch.and_then(|b| b.try_project(p)), - None => batch, - }); + fn schema(&self) -> SchemaRef { + InformationTable::schema(self) + } - let stream = RecordBatchStreamWrapper { - schema: projected_schema, - stream: Box::pin(stream), - output_ordering: None, - metrics: Default::default(), - }; + fn table_type(&self) -> TableType { + InformationTable::table_type(self) + } - Ok(Box::pin(stream)) + fn to_stream(&self, request: ScanRequest) -> Result { + InformationTable::to_stream(self, request) } } diff --git a/src/catalog/src/information_schema/cluster_info.rs b/src/catalog/src/system_schema/information_schema/cluster_info.rs similarity index 99% rename from src/catalog/src/information_schema/cluster_info.rs rename to src/catalog/src/system_schema/information_schema/cluster_info.rs index 6959ad1bd595..e878b94baf4a 100644 --- a/src/catalog/src/information_schema/cluster_info.rs +++ b/src/catalog/src/system_schema/information_schema/cluster_info.rs @@ -41,7 +41,7 @@ use store_api::storage::{ScanRequest, TableId}; use super::CLUSTER_INFO; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result}; -use crate::information_schema::{utils, InformationTable, Predicates}; +use crate::system_schema::information_schema::{utils, InformationTable, Predicates}; use crate::CatalogManager; const PEER_ID: &str = "peer_id"; diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/system_schema/information_schema/columns.rs similarity index 100% rename from src/catalog/src/information_schema/columns.rs rename to src/catalog/src/system_schema/information_schema/columns.rs diff --git a/src/catalog/src/information_schema/information_memory_table.rs b/src/catalog/src/system_schema/information_schema/information_memory_table.rs similarity index 92% rename from src/catalog/src/information_schema/information_memory_table.rs rename to src/catalog/src/system_schema/information_schema/information_memory_table.rs index 5bfd94707904..021704f2fec1 100644 --- a/src/catalog/src/information_schema/information_memory_table.rs +++ b/src/catalog/src/system_schema/information_schema/information_memory_table.rs @@ -15,17 +15,13 @@ use std::sync::Arc; use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE}; -use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::{Schema, SchemaRef}; use datatypes::vectors::{Int64Vector, StringVector, VectorRef}; -use store_api::storage::{ScanRequest, TableId}; use super::table_names::*; -use super::InformationTable; -// pub use tables::get_schema_columns; -use crate::error::Result; -use crate::memory_table::tables::{bigint_column, datetime_column, string_column, string_columns}; -use crate::memory_table::MemoryTable; +use crate::system_schema::memory_table::tables::{ + bigint_column, datetime_column, string_column, string_columns, +}; const NO_VALUE: &str = "NO"; @@ -420,39 +416,3 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec (Arc::new(Schema::new(column_schemas)), columns) } - -impl InformationTable for MemoryTable { - fn table_id(&self) -> TableId { - self.table_id - } - - fn table_name(&self) -> &'static str { - self.table_name - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn to_stream(&self, _request: ScanRequest) -> Result { - self.to_stream() - } -} - -#[cfg(test)] -mod tests { - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::ColumnSchema; - - use super::*; - #[test] - fn test_information_memory_table_schema() { - let schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), - ])); - - let table = MemoryTable::new(42, "test", schema.clone(), vec![]); - assert_eq!(schema, InformationTable::schema(&table)); - } -} diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/system_schema/information_schema/key_column_usage.rs similarity index 99% rename from src/catalog/src/information_schema/key_column_usage.rs rename to src/catalog/src/system_schema/information_schema/key_column_usage.rs index 5cefc7449af2..a77986afce24 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/system_schema/information_schema/key_column_usage.rs @@ -34,7 +34,7 @@ use super::KEY_COLUMN_USAGE; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; pub const CONSTRAINT_SCHEMA: &str = "constraint_schema"; diff --git a/src/catalog/src/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs similarity index 99% rename from src/catalog/src/information_schema/partitions.rs rename to src/catalog/src/system_schema/information_schema/partitions.rs index ed918442d1b5..3e49a2ddbd3b 100644 --- a/src/catalog/src/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -44,8 +44,8 @@ use crate::error::{ CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; use crate::kvbackend::KvBackendCatalogManager; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; const TABLE_CATALOG: &str = "table_catalog"; diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/system_schema/information_schema/predicate.rs similarity index 100% rename from src/catalog/src/information_schema/predicate.rs rename to src/catalog/src/system_schema/information_schema/predicate.rs diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/system_schema/information_schema/region_peers.rs similarity index 99% rename from src/catalog/src/information_schema/region_peers.rs rename to src/catalog/src/system_schema/information_schema/region_peers.rs index 06b55ebf438f..4bcc28144701 100644 --- a/src/catalog/src/information_schema/region_peers.rs +++ b/src/catalog/src/system_schema/information_schema/region_peers.rs @@ -39,8 +39,8 @@ use crate::error::{ CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; use crate::kvbackend::KvBackendCatalogManager; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; const REGION_ID: &str = "region_id"; diff --git a/src/catalog/src/information_schema/runtime_metrics.rs b/src/catalog/src/system_schema/information_schema/runtime_metrics.rs similarity index 100% rename from src/catalog/src/information_schema/runtime_metrics.rs rename to src/catalog/src/system_schema/information_schema/runtime_metrics.rs diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/system_schema/information_schema/schemata.rs similarity index 99% rename from src/catalog/src/information_schema/schemata.rs rename to src/catalog/src/system_schema/information_schema/schemata.rs index d4151e827916..1cccd28999a8 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/system_schema/information_schema/schemata.rs @@ -36,7 +36,7 @@ use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{utils, InformationTable, Predicates}; +use crate::system_schema::information_schema::{utils, InformationTable, Predicates}; use crate::CatalogManager; pub const CATALOG_NAME: &str = "catalog_name"; diff --git a/src/catalog/src/information_schema/table_constraints.rs b/src/catalog/src/system_schema/information_schema/table_constraints.rs similarity index 100% rename from src/catalog/src/information_schema/table_constraints.rs rename to src/catalog/src/system_schema/information_schema/table_constraints.rs diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs similarity index 100% rename from src/catalog/src/information_schema/table_names.rs rename to src/catalog/src/system_schema/information_schema/table_names.rs diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs similarity index 99% rename from src/catalog/src/information_schema/tables.rs rename to src/catalog/src/system_schema/information_schema/tables.rs index ef8eee267289..d9b05b80352f 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -38,7 +38,7 @@ use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; pub const TABLE_CATALOG: &str = "table_catalog"; diff --git a/src/catalog/src/information_schema/utils.rs b/src/catalog/src/system_schema/information_schema/utils.rs similarity index 100% rename from src/catalog/src/information_schema/utils.rs rename to src/catalog/src/system_schema/information_schema/utils.rs diff --git a/src/catalog/src/memory_table.rs b/src/catalog/src/system_schema/memory_table.rs similarity index 89% rename from src/catalog/src/memory_table.rs rename to src/catalog/src/system_schema/memory_table.rs index 563ca104c109..666acc7f99e1 100644 --- a/src/catalog/src/memory_table.rs +++ b/src/catalog/src/system_schema/memory_table.rs @@ -26,8 +26,9 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use snafu::ResultExt; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; +use super::SystemTable; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; /// A memory table with specified schema and columns. @@ -57,25 +58,6 @@ impl MemoryTable { pub fn builder(&self) -> MemoryTableBuilder { MemoryTableBuilder::new(self.schema.clone(), self.columns.clone()) } - pub fn to_stream(&self) -> Result { - let schema = self.schema.arrow_schema().clone(); - let mut builder = self.builder(); - let stream = Box::pin(DfRecordBatchStreamAdapter::new( - schema, - futures::stream::once(async move { - builder - .memory_records() - .await - .map(|x| x.into_df_record_batch()) - .map_err(Into::into) - }), - )); - Ok(Box::pin( - RecordBatchStreamAdapter::try_new(stream) - .map_err(BoxedError::new) - .context(InternalSnafu)?, - )) - } } pub(crate) struct MemoryTableBuilder { @@ -120,6 +102,40 @@ impl DfPartitionStream for MemoryTable { } } +impl SystemTable for MemoryTable { + fn table_id(&self) -> TableId { + self.table_id + } + + fn table_name(&self) -> &'static str { + self.table_name + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, _request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .memory_records() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -130,6 +146,7 @@ mod tests { use datatypes::vectors::StringVector; use super::*; + use crate::system_schema::SystemTable; #[tokio::test] async fn test_memory_table() { @@ -149,9 +166,10 @@ mod tests { ); assert_eq!(42, table.table_id); + assert_eq!(schema, SystemTable::schema(&table)); assert_eq!("test", table.table_name); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -179,7 +197,7 @@ mod tests { assert_eq!(42, table.table_id); assert_eq!("test", table.table_name); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/catalog/src/memory_table/tables.rs b/src/catalog/src/system_schema/memory_table/tables.rs similarity index 77% rename from src/catalog/src/memory_table/tables.rs rename to src/catalog/src/system_schema/memory_table/tables.rs index 0b979e8d1007..3c87c37d7a0c 100644 --- a/src/catalog/src/memory_table/tables.rs +++ b/src/catalog/src/system_schema/memory_table/tables.rs @@ -27,6 +27,30 @@ pub fn string_column(name: &str) -> ColumnSchema { ) } +pub fn u32_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::uint32_datatype(), + false, + ) +} + +pub fn bool_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::boolean_datatype(), + false, + ) +} + +pub fn i16_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::int16_datatype(), + false, + ) +} + pub fn bigint_column(name: &str) -> ColumnSchema { ColumnSchema::new( str::to_lowercase(name), diff --git a/src/catalog/src/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs similarity index 58% rename from src/catalog/src/pg_catalog.rs rename to src/catalog/src/system_schema/pg_catalog.rs index 6f0332c04dcf..ccac106dee3a 100644 --- a/src/catalog/src/pg_catalog.rs +++ b/src/catalog/src/system_schema/pg_catalog.rs @@ -12,27 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod pg_catalog_memory_table; mod pg_class; mod table_names; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_recordbatch::SendableRecordBatchStream; -use datatypes::schema::SchemaRef; +use datatypes::schema::{ColumnSchema, SchemaRef}; +use lazy_static::lazy_static; use store_api::storage::{ScanRequest, TableId}; use table::metadata::TableType; use table::TableRef; -use self::table_names::PG_CLASS; +use super::memory_table::tables::u32_column; use crate::error::Result; use crate::CatalogManager; +lazy_static! { + static ref MEMORY_TABLES: &'static [&'static str] = &[table_names::PG_TYPE]; +} + /// The column name for the OID column. /// The OID column is a unique identifier of type u32 for each object in the database. const OID_COLUMN_NAME: &str = "oid"; +fn oid_column() -> ColumnSchema { + u32_column(OID_COLUMN_NAME) +} + /// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog. pub struct PGCatalogProvider { catalog_name: String, @@ -40,6 +49,23 @@ pub struct PGCatalogProvider { tables: HashMap, } +// TODO(j0hn50n133): Not sure whether to avoid duplication with `information_schema` or not. +macro_rules! setup_memory_table { + ($name: expr) => { + paste! { + { + let (schema, columns) = get_schema_columns($name); + Some(Arc::new(MemoryTable::new( + consts::[], + $name, + schema, + columns + )) as _) + } + } + }; +} + impl PGCatalogProvider { pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { let mut provider = Self { @@ -53,39 +79,42 @@ impl PGCatalogProvider { fn build_tables(&mut self) { // SECURITY NOTE: - // Follow the same security rules as [InformationSchemaProvider::build_tables]. - if self.catalog_name == DEFAULT_CATALOG_NAME { - self.tables.insert( - table_names::PG_CLASS.to_string(), - self.build_table(PG_CLASS).unwrap(), - ); + // Must follow the same security rules as [`InformationSchemaProvider::build_tables`]. + let mut tables = HashMap::new(); + for name in MEMORY_TABLES.iter() { + tables.insert(name.to_string(), self.build_table(name).expect(name)); } + self.tables = tables; } fn build_table(&self, name: &str) -> Option { - // self. - todo!() + self.pg_catalog_table(name).map(|table| todo!()) } pub fn table_names(&self) -> Vec { - // TODO(j0hn50n133): replace hardcoded table names with collected table names - vec![ - table_names::PG_DATABASE.to_string(), - table_names::PG_NAMESPACE.to_string(), - table_names::PG_CLASS.to_string(), - ] + let mut tables = self.tables.values().clone().collect::>(); + tables.sort_by(|t1, t2| { + t1.table_info() + .table_id() + .partial_cmp(&t2.table_info().table_id()) + .unwrap() + }); + tables + .into_iter() + .map(|t| t.table_info().name.clone()) + .collect() } - pub fn pg_catalog_table(&self, name: &str) -> Option { + fn pg_catalog_table(&self, name: &str) -> Option { match name.to_ascii_lowercase().as_str() { - table_names::PG_CLASS => todo!(), + table_names::PG_TYPE => todo!(), _ => None, } } /// Returns the [TableRef] by table name. pub fn table(&self, name: &str) -> Option { - todo!() + self.tables.get(name).cloned() } } diff --git a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs new file mode 100644 index 000000000000..41e52bcc8fd3 --- /dev/null +++ b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datatypes::schema::{Schema, SchemaRef}; +use datatypes::vectors::{BooleanVector, Int16Vector, StringVector, UInt32Vector, VectorRef}; + +use super::oid_column; +use super::table_names::PG_TYPE; +use crate::system_schema::memory_table::tables::{bool_column, i16_column, string_column}; + +pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { + //TODO(j0hn50n133): u32_column("typnamespace"), we don't have such thing as namespace id or database id. + let (column_schemas, columns): (_, Vec) = match table_name { + PG_TYPE => ( + vec![ + oid_column(), + string_column("typname"), + i16_column("typlen"), + bool_column("typbyval"), + bool_column("typisdefined"), + ], + vec![ + Arc::new(UInt32Vector::from_vec(vec![16])), // oid + Arc::new(StringVector::from(vec!["bool"])), // typlen + Arc::new(Int16Vector::from_vec(vec![1])), // typname + Arc::new(BooleanVector::from(vec![true])), // typbyval + Arc::new(BooleanVector::from(vec![true])), // typisdefined + ], + ), + _ => unreachable!("Unknown table in pg_catalog: {}", table_name), + }; + (Arc::new(Schema::new(column_schemas)), columns) +} diff --git a/src/catalog/src/pg_catalog/pg_class.rs b/src/catalog/src/system_schema/pg_catalog/pg_class.rs similarity index 94% rename from src/catalog/src/pg_catalog/pg_class.rs rename to src/catalog/src/system_schema/pg_catalog/pg_class.rs index 574e693965e3..73d1653f9050 100644 --- a/src/catalog/src/pg_catalog/pg_class.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_class.rs @@ -20,7 +20,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use store_api::storage::{ConcreteDataType, ScanRequest}; use super::table_names::PG_CLASS; -use super::{PGCatalogTable, OID_COLUMN_NAME}; +use super::{oid_column, PGCatalogTable}; use crate::error::Result; use crate::CatalogManager; @@ -46,7 +46,7 @@ impl PGCatalogPgClass { pub(super) fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ - ColumnSchema::new(OID_COLUMN_NAME, ConcreteDataType::uint32_datatype(), false), + oid_column(), ColumnSchema::new(CLASS_RELNAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new( CLASS_RELNAMESPACE, diff --git a/src/catalog/src/pg_catalog/table_names.rs b/src/catalog/src/system_schema/pg_catalog/table_names.rs similarity index 95% rename from src/catalog/src/pg_catalog/table_names.rs rename to src/catalog/src/system_schema/pg_catalog/table_names.rs index 667b85f15b35..c5cb720f3bb9 100644 --- a/src/catalog/src/pg_catalog/table_names.rs +++ b/src/catalog/src/system_schema/pg_catalog/table_names.rs @@ -15,3 +15,4 @@ pub const PG_DATABASE: &str = "pg_databases"; pub const PG_NAMESPACE: &str = "pg_namespace"; pub const PG_CLASS: &str = "pg_class"; +pub const PG_TYPE: &str = "pg_type"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 8121a5ea2873..04e22a8e1815 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -101,6 +101,7 @@ pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; pub const PG_CATALOG_TABLE_ID_MASK: u32 = 0x200; /// id for pg_catalog.pg_class pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = PG_CATALOG_TABLE_ID_MASK | 1; +pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = PG_CATALOG_TABLE_ID_MASK | 2; /// ----- End of pg_catalog tables ----- pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; From d5bdd65bbb5e65f79dd57b1c3e4562a434ce918d Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Wed, 10 Jul 2024 15:31:20 +0800 Subject: [PATCH 04/12] feat: pg_catalog.pg_type --- src/catalog/src/kvbackend/manager.rs | 3 + src/catalog/src/system_schema.rs | 16 ++++- .../src/system_schema/information_schema.rs | 19 ++---- src/catalog/src/system_schema/memory_table.rs | 2 + .../memory_table/table_columns.rs | 50 ++++++++++++++ src/catalog/src/system_schema/pg_catalog.rs | 68 +++++++------------ .../pg_catalog/pg_catalog_memory_table.rs | 60 ++++++++++------ .../src/system_schema/pg_catalog/pg_class.rs | 27 +------- 8 files changed, 142 insertions(+), 103 deletions(-) create mode 100644 src/catalog/src/system_schema/memory_table/table_columns.rs diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 81f522a300cd..8d2dae38cc4b 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -357,6 +357,9 @@ impl SystemCatalog { )) }); information_schema_provider.table(table_name) + } else if schema == PG_CATALOG_NAME { + PGCatalogProvider::new(catalog.to_string(), self.catalog_manager.clone()) + .table(table_name) } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME { Some(NumbersTable::table(NUMBERS_TABLE_ID)) } else { diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs index 85c996455032..d804d9c97ca4 100644 --- a/src/catalog/src/system_schema.rs +++ b/src/catalog/src/system_schema.rs @@ -27,8 +27,10 @@ use snafu::ResultExt; use store_api::data_source::DataSource; use store_api::storage::ScanRequest; use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; -use table::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; -use table::TableRef; +use table::metadata::{ + FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, +}; +use table::{Table, TableRef}; use crate::error::Result; @@ -59,7 +61,17 @@ pub trait SystemSchemaProvider { } trait SystemSchemaProviderInner { + fn catalog_name(&self) -> &str; fn schema_name() -> &'static str; + fn build_table(&self, name: &str) -> Option { + self.system_table(name).map(|table| { + let table_info = Self::table_info(self.catalog_name().to_string(), &table); + let filter_pushdown = FilterPushDownType::Inexact; + let data_source = Arc::new(SystemTableDataSource::new(table)); + let table = Table::new(table_info, filter_pushdown, data_source); + Arc::new(table) + }) + } fn system_table(&self, name: &str) -> Option; fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef { diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 8bcd00dad9c0..6dc2a8f8226a 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -36,12 +36,12 @@ use lazy_static::lazy_static; use paste::paste; pub(crate) use predicate::Predicates; use store_api::storage::{ScanRequest, TableId}; -use table::metadata::{FilterPushDownType, TableType}; -use table::{Table, TableRef}; +use table::metadata::TableType; +use table::TableRef; pub use table_names::*; use self::columns::InformationSchemaColumns; -use super::{SystemSchemaProviderInner, SystemTable, SystemTableDataSource, SystemTableRef}; +use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef}; use crate::error::Result; use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo; use crate::system_schema::information_schema::information_memory_table::get_schema_columns; @@ -113,6 +113,9 @@ impl SystemSchemaProvider for InformationSchemaProvider { } } impl SystemSchemaProviderInner for InformationSchemaProvider { + fn catalog_name(&self) -> &str { + &self.catalog_name + } fn schema_name() -> &'static str { INFORMATION_SCHEMA_NAME } @@ -236,16 +239,6 @@ impl InformationSchemaProvider { self.tables = tables; } - - fn build_table(&self, name: &str) -> Option { - self.system_table(name).map(|table| { - let table_info = Self::table_info(self.catalog_name.clone(), &table); - let filter_pushdown = FilterPushDownType::Inexact; - let data_source = Arc::new(SystemTableDataSource::new(table)); - let table = Table::new(table_info, filter_pushdown, data_source); - Arc::new(table) - }) - } } trait InformationTable { diff --git a/src/catalog/src/system_schema/memory_table.rs b/src/catalog/src/system_schema/memory_table.rs index 666acc7f99e1..1d6a71ac8165 100644 --- a/src/catalog/src/system_schema/memory_table.rs +++ b/src/catalog/src/system_schema/memory_table.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod table_columns; pub mod tables; + use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; diff --git a/src/catalog/src/system_schema/memory_table/table_columns.rs b/src/catalog/src/system_schema/memory_table/table_columns.rs new file mode 100644 index 000000000000..75c2baf6334d --- /dev/null +++ b/src/catalog/src/system_schema/memory_table/table_columns.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[macro_export] +macro_rules! memory_table_cols{ + ([$($colname:ident),*], $t:expr) => { + let t = &$t; + $( + let mut $colname = Vec::with_capacity(t.len()); + )* + paste::paste!{ + for &($([]),*) in t { + $( + $colname.push([]); + )* + } + } + }; +} + +#[cfg(test)] +mod tests { + + #[test] + fn test_memory_table_columns() { + memory_table_cols!( + [oid, typname, typlen], + [ + (1, "String", -1), + (2, "Binary", -1), + (3, "Time", 8), + (4, "Datetime", 8) + ] + ); + assert_eq!(&oid[..], &[1, 2, 3, 4]); + assert_eq!(&typname[..], &["String", "Binary", "Time", "Datetime"]); + assert_eq!(&typlen[..], &[-1, -1, 8, 8]); + } +} diff --git a/src/catalog/src/system_schema/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs index ccac106dee3a..851aa1bde59a 100644 --- a/src/catalog/src/system_schema/pg_catalog.rs +++ b/src/catalog/src/system_schema/pg_catalog.rs @@ -19,15 +19,17 @@ mod table_names; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use common_recordbatch::SendableRecordBatchStream; -use datatypes::schema::{ColumnSchema, SchemaRef}; +use common_catalog::consts::{self, PG_CATALOG_NAME}; +use datatypes::schema::ColumnSchema; use lazy_static::lazy_static; -use store_api::storage::{ScanRequest, TableId}; -use table::metadata::TableType; +use paste::paste; +use pg_catalog_memory_table::get_schema_columns; use table::TableRef; +pub use table_names::*; use super::memory_table::tables::u32_column; -use crate::error::Result; +use super::memory_table::MemoryTable; +use super::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef}; use crate::CatalogManager; lazy_static! { @@ -49,6 +51,14 @@ pub struct PGCatalogProvider { tables: HashMap, } +impl SystemSchemaProvider for PGCatalogProvider { + fn tables(&self) -> &HashMap { + assert!(!self.tables.is_empty()); + + &self.tables + } +} + // TODO(j0hn50n133): Not sure whether to avoid duplication with `information_schema` or not. macro_rules! setup_memory_table { ($name: expr) => { @@ -86,51 +96,21 @@ impl PGCatalogProvider { } self.tables = tables; } +} - fn build_table(&self, name: &str) -> Option { - self.pg_catalog_table(name).map(|table| todo!()) - } - - pub fn table_names(&self) -> Vec { - let mut tables = self.tables.values().clone().collect::>(); - tables.sort_by(|t1, t2| { - t1.table_info() - .table_id() - .partial_cmp(&t2.table_info().table_id()) - .unwrap() - }); - tables - .into_iter() - .map(|t| t.table_info().name.clone()) - .collect() +impl SystemSchemaProviderInner for PGCatalogProvider { + fn schema_name() -> &'static str { + PG_CATALOG_NAME } - fn pg_catalog_table(&self, name: &str) -> Option { - match name.to_ascii_lowercase().as_str() { - table_names::PG_TYPE => todo!(), + fn system_table(&self, name: &str) -> Option { + match name { + table_names::PG_TYPE => setup_memory_table!(PG_TYPE), _ => None, } } - /// Returns the [TableRef] by table name. - pub fn table(&self, name: &str) -> Option { - self.tables.get(name).cloned() + fn catalog_name(&self) -> &str { + &self.catalog_name } } - -/// The trait for all tables in the `pg_catalog` schema. -trait PGCatalogTable { - fn table_id(&self) -> TableId; - - fn table_name(&self) -> &'static str; - - fn schema(&self) -> SchemaRef; - - fn to_stream(&self, request: ScanRequest) -> Result; - - fn table_type(&self) -> TableType { - TableType::Temporary - } -} - -type PGCatalogTableRef = Arc; diff --git a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs index 41e52bcc8fd3..31700d8ce864 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs @@ -14,32 +14,54 @@ use std::sync::Arc; -use datatypes::schema::{Schema, SchemaRef}; -use datatypes::vectors::{BooleanVector, Int16Vector, StringVector, UInt32Vector, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::vectors::{Int16Vector, StringVector, UInt32Vector, VectorRef}; use super::oid_column; use super::table_names::PG_TYPE; -use crate::system_schema::memory_table::tables::{bool_column, i16_column, string_column}; +use crate::memory_table_cols; +use crate::system_schema::memory_table::tables::{i16_column, string_column}; +fn pg_type_schema_columns() -> (Vec, Vec) { + memory_table_cols!( + [oid, typname, typlen], + [ + (1, "String", -1), + (2, "Binary", -1), + (3, "Int8", 1), + (4, "Int16", 2), + (5, "Int32", 4), + (6, "Int64", 8), + (7, "UInt8", 1), + (8, "UInt16", 2), + (9, "UInt32", 4), + (10, "UInt64", 8), + (11, "Float32", 4), + (12, "Float64", 8), + (13, "Decimal", 16), + (14, "Date", 4), + (15, "DateTime", 8), + (16, "Timestamp", 8), + (17, "Time", 8), + (18, "Duration", 8), + (19, "Interval", 16), + (20, "List", -1), + ] + ); + ( + // not quiet identical with pg, we only follow the definition in pg + vec![oid_column(), string_column("typname"), i16_column("typlen")], + vec![ + Arc::new(UInt32Vector::from_vec(oid)), // oid + Arc::new(StringVector::from(typname)), + Arc::new(Int16Vector::from_vec(typlen)), // typlen in bytes + ], + ) +} pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { //TODO(j0hn50n133): u32_column("typnamespace"), we don't have such thing as namespace id or database id. let (column_schemas, columns): (_, Vec) = match table_name { - PG_TYPE => ( - vec![ - oid_column(), - string_column("typname"), - i16_column("typlen"), - bool_column("typbyval"), - bool_column("typisdefined"), - ], - vec![ - Arc::new(UInt32Vector::from_vec(vec![16])), // oid - Arc::new(StringVector::from(vec!["bool"])), // typlen - Arc::new(Int16Vector::from_vec(vec![1])), // typname - Arc::new(BooleanVector::from(vec![true])), // typbyval - Arc::new(BooleanVector::from(vec![true])), // typisdefined - ], - ), + PG_TYPE => pg_type_schema_columns(), _ => unreachable!("Unknown table in pg_catalog: {}", table_name), }; (Arc::new(Schema::new(column_schemas)), columns) diff --git a/src/catalog/src/system_schema/pg_catalog/pg_class.rs b/src/catalog/src/system_schema/pg_catalog/pg_class.rs index 73d1653f9050..a774fa74777d 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_class.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_class.rs @@ -14,14 +14,10 @@ use std::sync::{Arc, Weak}; -use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID; -use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use store_api::storage::{ConcreteDataType, ScanRequest}; +use store_api::storage::ConcreteDataType; -use super::table_names::PG_CLASS; -use super::{oid_column, PGCatalogTable}; -use crate::error::Result; +use super::oid_column; use crate::CatalogManager; const CLASS_RELKIND: &str = "relkind"; @@ -58,22 +54,3 @@ impl PGCatalogPgClass { ])) } } - -impl PGCatalogTable for PGCatalogPgClass { - fn table_id(&self) -> store_api::storage::TableId { - PG_CATALOG_PG_CLASS_TABLE_ID - } - - fn table_name(&self) -> &'static str { - PG_CLASS - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn to_stream(&self, request: ScanRequest) -> Result { - let schema = self.schema.arrow_schema().clone(); - todo!() - } -} From dc67715bbdda6e12314f0a5d35fd29620113c9c9 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Wed, 10 Jul 2024 15:34:24 +0800 Subject: [PATCH 05/12] fix: remove unused code to avoid warning --- .../src/system_schema/memory_table/tables.rs | 8 --- src/catalog/src/system_schema/pg_catalog.rs | 5 +- .../src/system_schema/pg_catalog/pg_class.rs | 56 ------------------- 3 files changed, 2 insertions(+), 67 deletions(-) delete mode 100644 src/catalog/src/system_schema/pg_catalog/pg_class.rs diff --git a/src/catalog/src/system_schema/memory_table/tables.rs b/src/catalog/src/system_schema/memory_table/tables.rs index 3c87c37d7a0c..c6459717769c 100644 --- a/src/catalog/src/system_schema/memory_table/tables.rs +++ b/src/catalog/src/system_schema/memory_table/tables.rs @@ -35,14 +35,6 @@ pub fn u32_column(name: &str) -> ColumnSchema { ) } -pub fn bool_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::boolean_datatype(), - false, - ) -} - pub fn i16_column(name: &str) -> ColumnSchema { ColumnSchema::new( str::to_lowercase(name), diff --git a/src/catalog/src/system_schema/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs index 851aa1bde59a..2b2ca876642f 100644 --- a/src/catalog/src/system_schema/pg_catalog.rs +++ b/src/catalog/src/system_schema/pg_catalog.rs @@ -13,7 +13,6 @@ // limitations under the License. mod pg_catalog_memory_table; -mod pg_class; mod table_names; use std::collections::HashMap; @@ -47,7 +46,7 @@ fn oid_column() -> ColumnSchema { /// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog. pub struct PGCatalogProvider { catalog_name: String, - catalog_manager: Weak, + _catalog_manager: Weak, tables: HashMap, } @@ -80,7 +79,7 @@ impl PGCatalogProvider { pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { let mut provider = Self { catalog_name, - catalog_manager, + _catalog_manager: catalog_manager, tables: HashMap::new(), }; provider.build_tables(); diff --git a/src/catalog/src/system_schema/pg_catalog/pg_class.rs b/src/catalog/src/system_schema/pg_catalog/pg_class.rs deleted file mode 100644 index a774fa74777d..000000000000 --- a/src/catalog/src/system_schema/pg_catalog/pg_class.rs +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::{Arc, Weak}; - -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use store_api::storage::ConcreteDataType; - -use super::oid_column; -use crate::CatalogManager; - -const CLASS_RELKIND: &str = "relkind"; -const CLASS_RELOWNER: &str = "relowner"; -const CLASS_RELNAME: &str = "relname"; -const CLASS_RELNAMESPACE: &str = "relnamespace"; - -pub(super) struct PGCatalogPgClass { - schema: SchemaRef, - catalog_name: String, - catalog_manager: Weak, -} - -impl PGCatalogPgClass { - pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { - Self { - schema: Self::schema(), - catalog_name, - catalog_manager, - } - } - - pub(super) fn schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - oid_column(), - ColumnSchema::new(CLASS_RELNAME, ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - CLASS_RELNAMESPACE, - ConcreteDataType::uint32_datatype(), - false, - ), - ColumnSchema::new(CLASS_RELKIND, ConcreteDataType::string_datatype(), false), - ColumnSchema::new(CLASS_RELOWNER, ConcreteDataType::uint32_datatype(), false), - ])) - } -} From a92021eeb3444b8f797eb85dcddf7b8c0c024e53 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Wed, 10 Jul 2024 16:33:45 +0800 Subject: [PATCH 06/12] test: add pg_catalog sqlness test --- .../common/create/create_database.result | 1 + .../common/create/create_database_opts.result | 3 ++ .../common/information_schema/tables.result | 1 + .../common/show/show_databases_tables.result | 2 ++ .../common/system/information_schema.result | 9 +++++- .../common/system/information_schema.sql | 4 ++- .../common/system/pg_catalog.result | 32 +++++++++++++++++++ .../standalone/common/system/pg_catalog.sql | 4 +++ .../standalone/common/view/create.result | 1 + 9 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 tests/cases/standalone/common/system/pg_catalog.result create mode 100644 tests/cases/standalone/common/system/pg_catalog.sql diff --git a/tests/cases/standalone/common/create/create_database.result b/tests/cases/standalone/common/create/create_database.result index 9612b3115ec5..8273385f4e48 100644 --- a/tests/cases/standalone/common/create/create_database.result +++ b/tests/cases/standalone/common/create/create_database.result @@ -18,6 +18,7 @@ show databases; | greptime_private | | illegal-database | | information_schema | +| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/create/create_database_opts.result b/tests/cases/standalone/common/create/create_database_opts.result index c824290d715a..3b8e420885fc 100644 --- a/tests/cases/standalone/common/create/create_database_opts.result +++ b/tests/cases/standalone/common/create/create_database_opts.result @@ -10,6 +10,7 @@ SHOW DATABASES; | greptime_private | | information_schema | | mydb | +| pg_catalog | | public | +--------------------+ @@ -21,6 +22,7 @@ SHOW FULL DATABASES; | greptime_private | | | information_schema | | | mydb | ttl='1h' | +| pg_catalog | | | public | | +--------------------+----------+ @@ -65,6 +67,7 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/information_schema/tables.result b/tests/cases/standalone/common/information_schema/tables.result index 93a93a9c9805..f48e65630282 100644 --- a/tests/cases/standalone/common/information_schema/tables.result +++ b/tests/cases/standalone/common/information_schema/tables.result @@ -29,6 +29,7 @@ select table_catalog, table_schema, table_name from information_schema.tables wh +---------------+--------------+------------+ | greptime | abc | t | | greptime | abcde | t | +| greptime | pg_catalog | pg_type | | greptime | public | numbers | +---------------+--------------+------------+ diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index 8f5ec97bda2a..6ffd5957dd8e 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -5,6 +5,7 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+ @@ -15,6 +16,7 @@ SHOW FULL DATABASES; +--------------------+---------+ | greptime_private | | | information_schema | | +| pg_catalog | | | public | | +--------------------+---------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 3f38df1c43ed..269995ed7973 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -42,6 +42,7 @@ order by table_schema, table_name; | greptime | information_schema | table_privileges | LOCAL TEMPORARY | 23 | 0 | 0 | 0 | 0 | 0 | | 11 | Fixed | 0 | 0 | 0 | DATETIME | | | | 0 | | | Y | | greptime | information_schema | tables | LOCAL TEMPORARY | 3 | 0 | 0 | 0 | 0 | 0 | | 11 | Fixed | 0 | 0 | 0 | DATETIME | | | | 0 | | | Y | | greptime | information_schema | triggers | LOCAL TEMPORARY | 24 | 0 | 0 | 0 | 0 | 0 | | 11 | Fixed | 0 | 0 | 0 | DATETIME | | | | 0 | | | Y | +| greptime | pg_catalog | pg_type | LOCAL TEMPORARY | 514 | 0 | 0 | 0 | 0 | 0 | | 11 | Fixed | 0 | 0 | 0 | DATETIME | | | | 0 | | | Y | | greptime | public | numbers | LOCAL TEMPORARY | 2 | 0 | 0 | 0 | 0 | 0 | test_engine | 11 | Fixed | 0 | 0 | 0 | DATETIME | | | | 0 | | | Y | +---------------+--------------------+---------------------------------------+-----------------+----------+-------------+-----------------+--------------+------------------+----------------+-------------+---------+------------+------------+-----------+----------------+-------------------------+-------------+------------+-----------------+----------+----------------+---------------+-----------+ @@ -387,6 +388,9 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | triggers | trigger_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | triggers | trigger_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | triggers | trigger_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | pg_catalog | pg_type | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | pg_catalog | pg_type | typlen | 3 | | | 5 | 0 | | | | | | select,insert | | Int16 | smallint | FIELD | | No | smallint | | | +| greptime | pg_catalog | pg_type | typname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | public | numbers | number | 1 | | | 10 | 0 | | | | PRI | | select,insert | | UInt32 | int unsigned | TAG | | No | int unsigned | | | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ @@ -442,7 +446,7 @@ order by table_name; select table_name from information_schema.tables -where table_schema not in ('my_db', 'information_schema') +where table_schema not in ('my_db', 'information_schema', 'pg_catalog') order by table_name; +------------+ @@ -456,6 +460,7 @@ from information_schema.tables where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name; +---------------+--------------+------------+------------+--------+ @@ -469,6 +474,7 @@ from information_schema.columns where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name, column_name; +---------------+--------------+------------+-------------+--------------+---------------+ @@ -558,6 +564,7 @@ select * from schemata where catalog_name = 'greptime' and schema_name != 'publi +--------------+--------------------+----------------------------+------------------------+----------+---------+ | greptime | greptime_private | utf8 | utf8_bin | | | | greptime | information_schema | utf8 | utf8_bin | | | +| greptime | pg_catalog | utf8 | utf8_bin | | | +--------------+--------------------+----------------------------+------------------------+----------+---------+ -- test engines diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index 7be288c5eaa3..ab281adadd3f 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -37,7 +37,7 @@ order by table_name; select table_name from information_schema.tables -where table_schema not in ('my_db', 'information_schema') +where table_schema not in ('my_db', 'information_schema', 'pg_catalog') order by table_name; select table_catalog, table_schema, table_name, table_type, engine @@ -45,6 +45,7 @@ from information_schema.tables where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name; select table_catalog, table_schema, table_name, column_name, data_type, semantic_type @@ -52,6 +53,7 @@ from information_schema.columns where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name, column_name; -- test query filter for columns -- diff --git a/tests/cases/standalone/common/system/pg_catalog.result b/tests/cases/standalone/common/system/pg_catalog.result new file mode 100644 index 000000000000..f2b9677feb99 --- /dev/null +++ b/tests/cases/standalone/common/system/pg_catalog.result @@ -0,0 +1,32 @@ +-- should not able to create pg_catalog +create database pg_catalog; + +Error: 1004(InvalidArguments), Schema pg_catalog already exists + +select * from pg_catalog.pg_type order by oid; + ++-----+-----------+--------+ +| oid | typname | typlen | ++-----+-----------+--------+ +| 1 | String | -1 | +| 2 | Binary | -1 | +| 3 | Int8 | 1 | +| 4 | Int16 | 2 | +| 5 | Int32 | 4 | +| 6 | Int64 | 8 | +| 7 | UInt8 | 1 | +| 8 | UInt16 | 2 | +| 9 | UInt32 | 4 | +| 10 | UInt64 | 8 | +| 11 | Float32 | 4 | +| 12 | Float64 | 8 | +| 13 | Decimal | 16 | +| 14 | Date | 4 | +| 15 | DateTime | 8 | +| 16 | Timestamp | 8 | +| 17 | Time | 8 | +| 18 | Duration | 8 | +| 19 | Interval | 16 | +| 20 | List | -1 | ++-----+-----------+--------+ + diff --git a/tests/cases/standalone/common/system/pg_catalog.sql b/tests/cases/standalone/common/system/pg_catalog.sql new file mode 100644 index 000000000000..b958b4c3d6c2 --- /dev/null +++ b/tests/cases/standalone/common/system/pg_catalog.sql @@ -0,0 +1,4 @@ +-- should not able to create pg_catalog +create database pg_catalog; + +select * from pg_catalog.pg_type order by oid; \ No newline at end of file diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index bfbff6b970eb..3910086baa6a 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -94,6 +94,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; | greptime | information_schema | optimizer_trace | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | | greptime | information_schema | parameters | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | | greptime | information_schema | partitions | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | +| greptime | pg_catalog | pg_type | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | | greptime | information_schema | profiling | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | | greptime | information_schema | referential_constraints | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | | greptime | information_schema | region_peers | LOCAL TEMPORARY |ID |ID |ID |ID |ID |ID | |ID | Fixed |ID |ID |ID | DATETIME | | | |ID | | | Y | From 9aad2fca931cad678fcee13ed6260022630c7789 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Thu, 11 Jul 2024 16:11:48 +0800 Subject: [PATCH 07/12] feat: pg_catalog_cache in system_catalog --- src/catalog/src/kvbackend/manager.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 8d2dae38cc4b..c583786988b8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -89,6 +89,7 @@ impl KvBackendCatalogManager { system_catalog: SystemCatalog { catalog_manager: me.clone(), catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), + pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), information_schema_provider: Arc::new(InformationSchemaProvider::new( DEFAULT_CATALOG_NAME.to_string(), me.clone(), @@ -307,6 +308,9 @@ fn build_table(table_info_value: TableInfoValue) -> Result { struct SystemCatalog { catalog_manager: Weak, catalog_cache: Cache>, + pg_catalog_cache: Cache>, + + // system_schema_provier for default catalog information_schema_provider: Arc, pg_catalog_provider: Arc, } @@ -358,8 +362,18 @@ impl SystemCatalog { }); information_schema_provider.table(table_name) } else if schema == PG_CATALOG_NAME { - PGCatalogProvider::new(catalog.to_string(), self.catalog_manager.clone()) - .table(table_name) + if catalog == DEFAULT_CATALOG_NAME { + self.pg_catalog_provider.table(table_name) + } else { + let pg_catalog_provider = + self.pg_catalog_cache.get_with_by_ref(catalog, move || { + Arc::new(PGCatalogProvider::new( + catalog.to_string(), + self.catalog_manager.clone(), + )) + }); + pg_catalog_provider.table(table_name) + } } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME { Some(NumbersTable::table(NUMBERS_TABLE_ID)) } else { From eca7744a9023286963416d195f4aadeb9a797b51 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Thu, 11 Jul 2024 16:27:04 +0800 Subject: [PATCH 08/12] fix: integration test --- tests-integration/src/tests/instance_test.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index b0bc7f4c881f..aefa437532f0 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -462,6 +462,7 @@ async fn test_execute_show_databases_tables(instance: Arc) { +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+\ "; @@ -1899,6 +1900,7 @@ async fn test_show_databases(instance: Arc) { +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+"; check_output_stream(output, expected).await; @@ -1912,6 +1914,7 @@ async fn test_show_databases(instance: Arc) { | Database | +--------------------+ | information_schema | +| pg_catalog | +--------------------+"; check_output_stream(output, expected).await; } From 010bb932a227a091ef06de4e8748e1fda8b0cf17 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Sat, 13 Jul 2024 18:35:20 +0800 Subject: [PATCH 09/12] test: rollback unit test --- src/catalog/src/lib.rs | 1 + src/catalog/src/system_schema/memory_table.rs | 9 +++++---- .../system_schema/pg_catalog/pg_catalog_memory_table.rs | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index e691ccd98376..4809f4d6b45a 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -33,6 +33,7 @@ pub mod memory; mod metrics; pub mod system_schema; pub mod information_schema { + // TODO(j0hn50n133): migrate to the new path later // re-export to make it compatible with the legacy code pub use crate::system_schema::information_schema::*; } diff --git a/src/catalog/src/system_schema/memory_table.rs b/src/catalog/src/system_schema/memory_table.rs index 1d6a71ac8165..92caa641a9d0 100644 --- a/src/catalog/src/system_schema/memory_table.rs +++ b/src/catalog/src/system_schema/memory_table.rs @@ -167,9 +167,9 @@ mod tests { ], ); - assert_eq!(42, table.table_id); - assert_eq!(schema, SystemTable::schema(&table)); + assert_eq!(42, table.table_id()); assert_eq!("test", table.table_name); + assert_eq!(schema, SystemTable::schema(&table)); let stream = table.to_stream(ScanRequest::default()).unwrap(); @@ -196,8 +196,9 @@ mod tests { let table = MemoryTable::new(42, "test", schema.clone(), vec![]); - assert_eq!(42, table.table_id); - assert_eq!("test", table.table_name); + assert_eq!(42, table.table_id()); + assert_eq!("test", table.table_name()); + assert_eq!(schema, SystemTable::schema(&table)); let stream = table.to_stream(ScanRequest::default()).unwrap(); diff --git a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs index 31700d8ce864..9a3e18a50701 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs @@ -59,7 +59,6 @@ fn pg_type_schema_columns() -> (Vec, Vec) { } pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { - //TODO(j0hn50n133): u32_column("typnamespace"), we don't have such thing as namespace id or database id. let (column_schemas, columns): (_, Vec) = match table_name { PG_TYPE => pg_type_schema_columns(), _ => unreachable!("Unknown table in pg_catalog: {}", table_name), From f34cd7fd6516ab3752502edc264ad4eae6372469 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Mon, 15 Jul 2024 15:33:28 +0800 Subject: [PATCH 10/12] refactor: mix pg_catalog table_id with old ones --- src/common/catalog/src/consts.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 04e22a8e1815..faaa3ba1ef4f 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -97,11 +97,9 @@ pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; /// ----- End of information_schema tables ----- /// ----- Begin of pg_catalog tables ----- -/// mask for pg_catalog tables id, to reserve enough table_id for tables, 0x200 == 256 == 0b100000000 -pub const PG_CATALOG_TABLE_ID_MASK: u32 = 0x200; -/// id for pg_catalog.pg_class -pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = PG_CATALOG_TABLE_ID_MASK | 1; -pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = PG_CATALOG_TABLE_ID_MASK | 2; +pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256; +pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257; + /// ----- End of pg_catalog tables ----- pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; From 6fb82bb1ca51ef3414318fa5408c2e9c398cf686 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Mon, 15 Jul 2024 16:05:48 +0800 Subject: [PATCH 11/12] fix: add todo information --- src/catalog/src/lib.rs | 3 +-- .../src/system_schema/pg_catalog/pg_catalog_memory_table.rs | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 4809f4d6b45a..394500bb757e 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -33,8 +33,7 @@ pub mod memory; mod metrics; pub mod system_schema; pub mod information_schema { - // TODO(j0hn50n133): migrate to the new path later - // re-export to make it compatible with the legacy code + // TODO(j0hn50n133): re-export to make it compatible with the legacy code, migrate to the new path later pub use crate::system_schema::information_schema::*; } diff --git a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs index 9a3e18a50701..53be0c50dd16 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs @@ -21,7 +21,9 @@ use super::oid_column; use super::table_names::PG_TYPE; use crate::memory_table_cols; use crate::system_schema::memory_table::tables::{i16_column, string_column}; + fn pg_type_schema_columns() -> (Vec, Vec) { + // TODO(j0hn50n133): acquire this information from `DataType` instead of hardcoding it to avoid regression. memory_table_cols!( [oid, typname, typlen], [ From f54d7d8750686098a2cc11a4d4478d051ade3167 Mon Sep 17 00:00:00 2001 From: J0hn50n133 <0xjohnsonlee@gmail.com> Date: Mon, 15 Jul 2024 16:51:10 +0800 Subject: [PATCH 12/12] tests: rerun sqlness --- .../cases/standalone/common/system/information_schema.result | 4 ++++ tests/cases/standalone/common/view/create.result | 1 + 2 files changed, 5 insertions(+) diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 2262fad905b7..2d7cb914ed8f 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -44,6 +44,7 @@ order by table_schema, table_name; |greptime|information_schema|tables|LOCALTEMPORARY|3|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|information_schema|triggers|LOCALTEMPORARY|24|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|information_schema|views|LOCALTEMPORARY|32|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| +|greptime|pg_catalog|pg_type|LOCALTEMPORARY|257|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|public|numbers|LOCALTEMPORARY|2|0|0|0|0|0|test_engine|11|Fixed|0|0|0|DATETIME||||0|||Y| +++++++++++++++++++++++++ @@ -399,6 +400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | views | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | view_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | pg_catalog | pg_type | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | pg_catalog | pg_type | typlen | 3 | | | 5 | 0 | | | | | | select,insert | | Int16 | smallint | FIELD | | No | smallint | | | +| greptime | pg_catalog | pg_type | typname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | public | numbers | number | 1 | | | 10 | 0 | | | | PRI | | select,insert | | UInt32 | int unsigned | TAG | | No | int unsigned | | | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index c1cb872111cd..675da6b6ba59 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -89,6 +89,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|parameters|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|partitions|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| +|greptime|pg_catalog|pg_type|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|