Skip to content

Commit

Permalink
feat: adds information_schema.schemata
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Dec 29, 2023
1 parent 28dc483 commit 219efa1
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 83 deletions.
7 changes: 7 additions & 0 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod columns;
mod memory_table;
mod schemata;
mod table_names;
mod tables;

Expand Down Expand Up @@ -41,6 +42,7 @@ pub use table_names::*;
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::schemata::InformationSchemaSchemata;
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

Expand Down Expand Up @@ -126,6 +128,7 @@ impl InformationSchemaProvider {
fn build_tables(&mut self) {
let mut tables = HashMap::new();
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());

// Add memory tables
Expand Down Expand Up @@ -168,6 +171,10 @@ impl InformationSchemaProvider {
}
CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
EVENTS => setup_memory_table!(EVENTS),
SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
Expand Down
210 changes: 210 additions & 0 deletions src/catalog/src/information_schema/schemata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use super::SCHEMATA;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::CatalogManager;

/// The `information_schema.schemata` table implementation.
pub(super) struct InformationSchemaSchemata {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSchemata {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"default_character_set_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"default_collation_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new("sql_path", ConcreteDataType::string_datatype(), true),
]))
}

fn builder(&self) -> InformationSchemaSchemataBuilder {
InformationSchemaSchemataBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}

impl InformationTable for InformationSchemaSchemata {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_SCHEMATA_TABLE_ID
}

fn table_name(&self) -> &'static str {
SCHEMATA
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_schemata()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `information_schema.TABLE` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaSchemataBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
charset_names: StringVectorBuilder,
collation_names: StringVectorBuilder,
sql_paths: StringVectorBuilder,
}

impl InformationSchemaSchemataBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
charset_names: StringVectorBuilder::with_capacity(42),
collation_names: StringVectorBuilder::with_capacity(42),
sql_paths: StringVectorBuilder::with_capacity(42),
}
}

/// Construct the `information_schema.schemata` virtual table
async fn make_schemata(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
}

self.add_schema(&catalog_name, &schema_name);
}

self.finish()
}

fn add_schema(&mut self, catalog_name: &str, schema_name: &str) {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.charset_names.push(Some("utf8"));
self.collation_names.push(Some("utf8_bin"));
self.sql_paths.push(None);
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.charset_names.finish()),
Arc::new(self.collation_names.finish()),
Arc::new(self.sql_paths.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

impl DfPartitionStream for InformationSchemaSchemata {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_schemata()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub const INFORMATION_SCHEMA_COLLATION_CHARACTER_SET_APPLICABILITY_TABLE_ID: u32
pub const INFORMATION_SCHEMA_CHECK_CONSTRAINTS_TABLE_ID: u32 = 12;
/// id for information_schema.EVENTS
pub const INFORMATION_SCHEMA_EVENTS_TABLE_ID: u32 = 13;
/// id for information_schema.SCHEMATA
pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 14;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ show tables;
| columns |
| engines |
| events |
| schemata |
| tables |
+---------------------------------------+

Loading

0 comments on commit 219efa1

Please sign in to comment.