feat: adds information_schema.schemata (#3051)
* feat: improve information_schema.columns

* feat: adds information_schema.schemata

* fix: instance test

* fix: comment
killme2008 authored Dec 29, 2023 · 1 parent 7551432 · commit 11ae85b
Showing 10 changed files with 434 additions and 119 deletions.
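
Once this lands, the new table should be queryable like any other `information_schema` table, e.g. `SELECT * FROM information_schema.schemata`. Judging from the builder in `schemata.rs` below, each row carries the catalog name, the schema name, a hardcoded `utf8` default charset and `utf8_bin` collation, and a NULL `sql_path`.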
7 changes: 7 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -14,6 +14,7 @@

mod columns;
mod memory_table;
+mod schemata;
mod table_names;
mod tables;

@@ -41,6 +42,7 @@ pub use table_names::*;
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
+use crate::information_schema::schemata::InformationSchemaSchemata;
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

@@ -126,6 +128,7 @@ impl InformationSchemaProvider {
fn build_tables(&mut self) {
let mut tables = HashMap::new();
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
+tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());

// Add memory tables
@@ -168,6 +171,10 @@ impl InformationSchemaProvider {
}
CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
EVENTS => setup_memory_table!(EVENTS),
+SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
+self.catalog_name.clone(),
+self.catalog_manager.clone(),
+)) as _),
_ => None,
}
}
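
The `build_table` hunk above extends a straightforward name-to-table dispatch: match on the table name and return the concrete table coerced to a trait object with `as _`, or `None` for unknown names. A minimal standalone sketch of that shape, with illustrative stand-in types rather than the crate's actual API:

use std::sync::Arc;

trait InformationTable {
    fn table_name(&self) -> &'static str;
}

struct Schemata;

impl InformationTable for Schemata {
    fn table_name(&self) -> &'static str {
        "schemata"
    }
}

// `as _` lets the compiler coerce Arc<Schemata> into the
// Arc<dyn InformationTable> demanded by the return type.
fn build_table(name: &str) -> Option<Arc<dyn InformationTable>> {
    match name {
        "schemata" => Some(Arc::new(Schemata) as _),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        build_table("schemata").map(|t| t.table_name()),
        Some("schemata")
    );
    assert!(build_table("bogus").is_none());
}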
45 changes: 40 additions & 5 deletions src/catalog/src/information_schema/columns.rs
@@ -51,6 +51,10 @@ const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";
+const COLUMN_DEFAULT: &str = "column_default";
+const IS_NULLABLE: &str = "is_nullable";
+const COLUMN_TYPE: &str = "column_type";
+const COLUMN_COMMENT: &str = "column_comment";

impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
@@ -69,6 +73,10 @@ impl InformationSchemaColumns {
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
+ColumnSchema::new(COLUMN_DEFAULT, ConcreteDataType::string_datatype(), true),
+ColumnSchema::new(IS_NULLABLE, ConcreteDataType::string_datatype(), false),
+ColumnSchema::new(COLUMN_TYPE, ConcreteDataType::string_datatype(), false),
+ColumnSchema::new(COLUMN_COMMENT, ConcreteDataType::string_datatype(), true),
]))
}

@@ -126,6 +134,11 @@ struct InformationSchemaColumnsBuilder {
column_names: StringVectorBuilder,
data_types: StringVectorBuilder,
semantic_types: StringVectorBuilder,

+column_defaults: StringVectorBuilder,
+is_nullables: StringVectorBuilder,
+column_types: StringVectorBuilder,
+column_comments: StringVectorBuilder,
}

impl InformationSchemaColumnsBuilder {
@@ -144,6 +157,10 @@ impl InformationSchemaColumnsBuilder {
column_names: StringVectorBuilder::with_capacity(42),
data_types: StringVectorBuilder::with_capacity(42),
semantic_types: StringVectorBuilder::with_capacity(42),
+column_defaults: StringVectorBuilder::with_capacity(42),
+is_nullables: StringVectorBuilder::with_capacity(42),
+column_types: StringVectorBuilder::with_capacity(42),
+column_comments: StringVectorBuilder::with_capacity(42),
}
}

@@ -187,9 +204,8 @@ impl InformationSchemaColumnsBuilder {
&catalog_name,
&schema_name,
&table_name,
-&column.name,
-&column.data_type.name(),
semantic_type,
+column,
);
}
} else {
@@ -206,16 +222,31 @@ impl InformationSchemaColumnsBuilder {
catalog_name: &str,
schema_name: &str,
table_name: &str,
-column_name: &str,
-data_type: &str,
semantic_type: &str,
+column_schema: &ColumnSchema,
) {
+let data_type = &column_schema.data_type.name();
+
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
-self.column_names.push(Some(column_name));
+self.column_names.push(Some(&column_schema.name));
self.data_types.push(Some(data_type));
self.semantic_types.push(Some(semantic_type));
+self.column_defaults.push(
+column_schema
+.default_constraint()
+.map(|s| format!("{}", s))
+.as_deref(),
+);
+if column_schema.is_nullable() {
+self.is_nullables.push(Some("Yes"));
+} else {
+self.is_nullables.push(Some("No"));
+}
+self.column_types.push(Some(data_type));
+self.column_comments
+.push(column_schema.column_comment().map(|x| x.as_ref()));
}

fn finish(&mut self) -> Result<RecordBatch> {
@@ -226,6 +257,10 @@ impl InformationSchemaColumnsBuilder {
Arc::new(self.column_names.finish()),
Arc::new(self.data_types.finish()),
Arc::new(self.semantic_types.finish()),
+Arc::new(self.column_defaults.finish()),
+Arc::new(self.is_nullables.finish()),
+Arc::new(self.column_types.finish()),
+Arc::new(self.column_comments.finish()),
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
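
Two details of the new `add_column` body are easy to miss: the optional default constraint is rendered through its `Display` impl (`format!("{}", s)`) and then borrowed down to the `Option<&str>` the vector builder expects via `as_deref`, and nullability is pushed as the strings `"Yes"`/`"No"` rather than a boolean, in the style of MySQL's `IS_NULLABLE` column. A self-contained sketch of both conversions:

fn main() {
    // Option<String> -> Option<&str>, as the builder does with the
    // rendered default constraint before pushing it.
    let default: Option<String> = Some(0.to_string());
    let rendered: Option<&str> = default.as_deref();
    assert_eq!(rendered, Some("0"));

    // Nullability is reported as a human-readable string column.
    let is_nullable = true;
    let report = if is_nullable { "Yes" } else { "No" };
    assert_eq!(report, "Yes");
}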
210 changes: 210 additions & 0 deletions src/catalog/src/information_schema/schemata.rs
@@ -0,0 +1,210 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use super::SCHEMATA;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::CatalogManager;

/// The `information_schema.schemata` table implementation.
pub(super) struct InformationSchemaSchemata {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSchemata {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"default_character_set_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"default_collation_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new("sql_path", ConcreteDataType::string_datatype(), true),
]))
}

fn builder(&self) -> InformationSchemaSchemataBuilder {
InformationSchemaSchemataBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}

impl InformationTable for InformationSchemaSchemata {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_SCHEMATA_TABLE_ID
}

fn table_name(&self) -> &'static str {
SCHEMATA
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_schemata()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `information_schema.schemata` table row by row
///
/// Columns are based on <https://docs.pingcap.com/tidb/stable/information-schema-schemata>
struct InformationSchemaSchemataBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
charset_names: StringVectorBuilder,
collation_names: StringVectorBuilder,
sql_paths: StringVectorBuilder,
}

impl InformationSchemaSchemataBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
charset_names: StringVectorBuilder::with_capacity(42),
collation_names: StringVectorBuilder::with_capacity(42),
sql_paths: StringVectorBuilder::with_capacity(42),
}
}

/// Construct the `information_schema.schemata` virtual table
async fn make_schemata(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
}

self.add_schema(&catalog_name, &schema_name);
}

self.finish()
}

fn add_schema(&mut self, catalog_name: &str, schema_name: &str) {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.charset_names.push(Some("utf8"));
self.collation_names.push(Some("utf8_bin"));
self.sql_paths.push(None);
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.charset_names.finish()),
Arc::new(self.collation_names.finish()),
Arc::new(self.sql_paths.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

impl DfPartitionStream for InformationSchemaSchemata {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_schemata()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
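
Both `to_stream` and the `DfPartitionStream::execute` impl above use the same plumbing: `futures::stream::once` wraps the one-shot `make_schemata()` future into a stream, so the whole table is produced as a single record batch when the stream is first polled. A minimal sketch of that adapter pattern, assuming the `futures` crate and with a plain integer standing in for the record batch:

use futures::stream::{self, StreamExt};

fn main() {
    // Wrap a single async computation as a one-element stream,
    // the way make_schemata() is adapted in to_stream()/execute().
    let once = stream::once(async { 42u32 });
    let items: Vec<u32> = futures::executor::block_on(once.collect::<Vec<_>>());
    assert_eq!(items, vec![42]);
}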
1 change: 1 addition & 0 deletions src/catalog/src/information_schema/table_names.rs
@@ -25,3 +25,4 @@ pub const COLLATIONS: &str = "collations";
pub const COLLATION_CHARACTER_SET_APPLICABILITY: &str = "collation_character_set_applicability";
pub const CHECK_CONSTRAINTS: &str = "check_constraints";
pub const EVENTS: &str = "events";
+pub const SCHEMATA: &str = "schemata";
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
@@ -54,6 +54,8 @@ pub const INFORMATION_SCHEMA_COLLATION_CHARACTER_SET_APPLICABILITY_TABLE_ID: u32
pub const INFORMATION_SCHEMA_CHECK_CONSTRAINTS_TABLE_ID: u32 = 12;
/// id for information_schema.EVENTS
pub const INFORMATION_SCHEMA_EVENTS_TABLE_ID: u32 = 13;
+/// id for information_schema.SCHEMATA
+pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 14;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";
11 changes: 10 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
@@ -109,6 +109,11 @@ impl ColumnSchema {
&mut self.metadata
}

+/// Retrieve the column comment
+pub fn column_comment(&self) -> Option<&String> {
+self.metadata.get(COMMENT_KEY)
+}

pub fn with_time_index(mut self, is_time_index: bool) -> Self {
self.is_time_index = is_time_index;
if is_time_index {
@@ -315,12 +320,16 @@ mod tests {

#[test]
fn test_column_schema_with_metadata() {
-let metadata = Metadata::from([("k1".to_string(), "v1".to_string())]);
+let metadata = Metadata::from([
+("k1".to_string(), "v1".to_string()),
+(COMMENT_KEY.to_string(), "test comment".to_string()),
+]);
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
.with_metadata(metadata)
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
.unwrap();
assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
assert_eq!("test comment", column_schema.column_comment().unwrap());
assert!(column_schema
.metadata()
.get(DEFAULT_CONSTRAINT_KEY)
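
`column_comment` is a thin lookup into the column's metadata map under the crate's `COMMENT_KEY`, as the updated test demonstrates. A standalone sketch of that lookup; the literal key value below is an assumption for illustration, not the crate's actual constant:

use std::collections::HashMap;

// Hypothetical stand-in for the crate-internal COMMENT_KEY constant.
const COMMENT_KEY: &str = "greptime:comment";

fn main() {
    let mut metadata: HashMap<String, String> = HashMap::new();
    metadata.insert(COMMENT_KEY.to_string(), "test comment".to_string());

    // column_comment() boils down to this metadata lookup.
    let comment: Option<&String> = metadata.get(COMMENT_KEY);
    assert_eq!(comment.map(String::as_str), Some("test comment"));
}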