diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index f5b9185a9625..27ff3dbf18ef 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -18,8 +18,6 @@ use std::sync::Arc; use bytes::Bytes; use common_catalog::format_full_table_name; use common_query::logical_plan::SubstraitPlanDecoderRef; -use datafusion::catalog::schema::MemorySchemaProvider; -use datafusion::catalog::MemoryCatalogProvider; use datafusion::common::{ResolvedTableReference, TableReference}; use datafusion::datasource::view::ViewTable; use datafusion::datasource::{provider_as_source, TableProvider}; @@ -28,9 +26,8 @@ use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; -mod memory_catalog; -use datafusion::catalog::CatalogProvider; -use memory_catalog::MemoryCatalogProviderList; +mod dummy_catalog; +use dummy_catalog::DummyCatalogList; use crate::error::{ CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, QueryAccessDeniedSnafu, @@ -127,49 +124,10 @@ impl DfTableSourceProvider { })?; // Build the catalog list provider for deserialization. - let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogProviderList::with_catalog_provider( - catalog_provider.clone(), - )); - - for table_name in &view_info.table_names { - // We don't support cross-catalog views. - debug_assert_eq!(catalog_name, &table_name.catalog_name); - - let catalog_name = &table_name.catalog_name; - let schema_name = &table_name.schema_name; - let table_name = &table_name.table_name; - - let table = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await? - .with_context(|| TableNotExistSnafu { - table: format_full_table_name(catalog_name, schema_name, table_name), - })?; - - let schema_provider = - if let Some(schema_provider) = catalog_provider.schema(schema_name) { - schema_provider - } else { - let schema_provider = Arc::new(MemorySchemaProvider::new()); - let _ = catalog_provider - .register_schema(schema_name, schema_provider.clone()) - .context(DatafusionSnafu)?; - schema_provider - }; - - let table_provider: Arc<dyn TableProvider> = - Arc::new(DfTableProviderAdapter::new(table)); - - let _ = schema_provider - .register_table(table_name.to_string(), table_provider) - .context(DatafusionSnafu)?; - } - + let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone())); let logical_plan = self .plan_decoder - .decode(Bytes::from(view_info.view_info.clone()), catalog_list) + .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true) .await .context(DecodePlanSnafu { name: &table.table_info().name, @@ -267,6 +225,7 @@ mod tests { &self, _message: bytes::Bytes, _catalog_list: Arc<dyn CatalogProviderList>, + _optimize: bool, ) -> QueryResult<LogicalPlan> { Ok(mock_plan()) } diff --git a/src/catalog/src/table_source/dummy_catalog.rs b/src/catalog/src/table_source/dummy_catalog.rs new file mode 100644 index 000000000000..243371a4b97f --- /dev/null +++ b/src/catalog/src/table_source/dummy_catalog.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Dummy catalog for decoding view plans. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use common_catalog::format_full_table_name; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use datafusion::datasource::TableProvider; +use snafu::OptionExt; +use table::table::adapter::DfTableProviderAdapter; + +use crate::error::TableNotExistSnafu; +use crate::CatalogManagerRef; + +/// Delegates the resolving requests to the `[CatalogManager]` unconditionally. +#[derive(Clone)] +pub struct DummyCatalogList { + catalog_manager: CatalogManagerRef, +} + +impl DummyCatalogList { + /// Creates a new catalog list with the given catalog manager. + pub fn new(catalog_manager: CatalogManagerRef) -> Self { + Self { catalog_manager } + } +} + +impl CatalogProviderList for DummyCatalogList { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc<dyn CatalogProvider>, + ) -> Option<Arc<dyn CatalogProvider>> { + None + } + + fn catalog_names(&self) -> Vec<String> { + vec![] + } + + fn catalog(&self, catalog_name: &str) -> Option<Arc<dyn CatalogProvider>> { + Some(Arc::new(DummyCatalogProvider { + catalog_name: catalog_name.to_string(), + catalog_manager: self.catalog_manager.clone(), + })) + } +} + +/// A dummy catalog provider for [DummyCatalogList]. +#[derive(Clone)] +struct DummyCatalogProvider { + catalog_name: String, + catalog_manager: CatalogManagerRef, +} + +impl CatalogProvider for DummyCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec<String> { + vec![] + } + + fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> { + Some(Arc::new(DummySchemaProvider { + catalog_name: self.catalog_name.clone(), + schema_name: schema_name.to_string(), + catalog_manager: self.catalog_manager.clone(), + })) + } +} + +/// A dummy schema provider for [DummyCatalogList]. +#[derive(Clone)] +struct DummySchemaProvider { + catalog_name: String, + schema_name: String, + catalog_manager: CatalogManagerRef, +} + +#[async_trait] +impl SchemaProvider for DummySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec<String> { + vec![] + } + + async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> { + let table = self + .catalog_manager + .table(&self.catalog_name, &self.schema_name, name) + .await? + .with_context(|| TableNotExistSnafu { + table: format_full_table_name(&self.catalog_name, &self.schema_name, name), + })?; + + let table_provider: Arc<dyn TableProvider> = Arc::new(DfTableProviderAdapter::new(table)); + + Ok(Some(table_provider)) + } + + fn table_exist(&self, _name: &str) -> bool { + true + } +} diff --git a/src/catalog/src/table_source/memory_catalog.rs b/src/catalog/src/table_source/memory_catalog.rs deleted file mode 100644 index 21770d4f42d8..000000000000 --- a/src/catalog/src/table_source/memory_catalog.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Dummy catalog for region server. - -use std::any::Any; -use std::sync::Arc; - -use datafusion::catalog::{CatalogProvider, CatalogProviderList}; - -/// Resolve to the given `[CatalogProvider]` unconditionally. -#[derive(Clone)] -pub struct MemoryCatalogProviderList { - catalog: Arc<dyn CatalogProvider>, -} - -impl MemoryCatalogProviderList { - pub fn with_catalog_provider(catalog: Arc<dyn CatalogProvider>) -> Self { - Self { catalog } - } -} - -impl CatalogProviderList for MemoryCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - _name: String, - _catalog: Arc<dyn CatalogProvider>, - ) -> Option<Arc<dyn CatalogProvider>> { - None - } - - fn catalog_names(&self) -> Vec<String> { - vec![] - } - - fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> { - Some(self.catalog.clone()) - } -} diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index 7e1cb05bb98d..bf1750eb1669 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -14,17 +14,21 @@ use std::any::Any; +use common_catalog::format_full_table_name; use common_procedure::Status; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; -use table::metadata::TableId; +use snafu::OptionExt; +use table::metadata::{TableId, TableType}; use super::executor::DropDatabaseExecutor; use super::metadata::DropDatabaseRemoveMetadata; use super::DropTableTarget; +use crate::cache_invalidator::Context; use crate::ddl::drop_database::{DropDatabaseContext, State}; use crate::ddl::DdlContext; -use crate::error::Result; +use crate::error::{Result, TableInfoNotFoundSnafu}; +use crate::instruction::CacheIdent; use crate::key::table_route::TableRouteValue; use crate::table_name::TableName; @@ -101,6 +105,40 @@ impl DropDatabaseCursor { )), } } + + async fn handle_view( + &self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + table_name: String, + table_id: TableId, + ) -> Result<(Box<dyn State>, Status)> { + let view_name = TableName::new(&ctx.catalog, &ctx.schema, &table_name); + ddl_ctx + .table_metadata_manager + .destroy_view_info(table_id, &view_name) + .await?; + + let cache_invalidator = &ddl_ctx.cache_invalidator; + let ctx = Context { + subject: Some("Invalidate table cache by dropping table".to_string()), + }; + + cache_invalidator + .invalidate( + &ctx, + &[ + CacheIdent::TableName(view_name), + CacheIdent::TableId(table_id), + ], + ) + .await?; + + Ok(( + Box::new(DropDatabaseCursor::new(self.target)), + Status::executing(false), + )) + } } #[async_trait::async_trait] @@ -122,6 +160,20 @@ impl State for DropDatabaseCursor { match ctx.tables.as_mut().unwrap().try_next().await? { Some((table_name, table_name_value)) => { let table_id = table_name_value.table_id(); + + let table_info_value = ddl_ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await? 
+ .with_context(|| TableInfoNotFoundSnafu { + table: format_full_table_name(&ctx.catalog, &ctx.schema, &table_name), + })?; + + if table_info_value.table_info.table_type == TableType::View { + return self.handle_view(ddl_ctx, ctx, table_name, table_id).await; + } + match ddl_ctx .table_metadata_manager .table_route_manager() diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 933334c35baa..ad5bd60ce2d6 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -805,6 +805,33 @@ impl TableMetadataManager { Ok(()) } + fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> { + let mut keys = Vec::with_capacity(3); + let view_name = TableNameKey::new( + &view_name.catalog_name, + &view_name.schema_name, + &view_name.table_name, + ); + let table_info_key = TableInfoKey::new(view_id); + let view_info_key = ViewInfoKey::new(view_id); + keys.push(view_name.to_bytes()); + keys.push(table_info_key.to_bytes()); + keys.push(view_info_key.to_bytes()); + + Ok(keys) + } + + /// Deletes metadata for view **permanently**. + /// The caller MUST ensure it has the exclusive access to `ViewNameKey`. + pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> { + let keys = self.view_info_keys(view_id, view_name)?; + let _ = self + .kv_backend + .batch_delete(BatchDeleteRequest::new().with_keys(keys)) + .await?; + Ok(()) + } + /// Renames the table name and returns an error if different metadata exists. /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s, /// and the new `TableNameKey` MUST be empty. diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index 32996b3b6841..ecf1b5092f4e 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -75,10 +75,12 @@ pub fn create_aggregate_function( #[async_trait::async_trait] pub trait SubstraitPlanDecoder { /// Decode the `[LogicalPlan]` from bytes with the `[CatalogProviderList]`. + /// When `optimize` is true, the decoded plan will be optimized. 
async fn decode( &self, message: bytes::Bytes, catalog_list: Arc<dyn CatalogProviderList>, + optimize: bool, ) -> Result<LogicalPlan>; } diff --git a/src/common/query/src/test_util.rs b/src/common/query/src/test_util.rs index 594be8a3dd4a..141c284a7baf 100644 --- a/src/common/query/src/test_util.rs +++ b/src/common/query/src/test_util.rs @@ -35,6 +35,7 @@ impl SubstraitPlanDecoder for DummyDecoder { &self, _message: bytes::Bytes, _catalog_list: Arc<dyn CatalogProviderList>, + _optimize: bool, ) -> Result<LogicalPlan> { unreachable!() } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 4093eb7a4cf1..5621531c6444 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -660,7 +660,7 @@ impl RegionServerInner { // decode substrait plan to logical plan and execute it let logical_plan = plan_decoder - .decode(Bytes::from(plan), catalog_list) + .decode(Bytes::from(plan), catalog_list, false) .await .context(DecodeLogicalPlanSnafu)?; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 20aae47d0057..2c6e0238c01b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -43,7 +43,7 @@ use lazy_static::lazy_static; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; use query::parser::QueryStatement; -use query::plan::extract_full_table_names; +use query::plan::extract_and_rewrite_full_table_names; use query::query_engine::DefaultSerializer; use query::sql::create_table_stmt; use regex::Regex; @@ -401,18 +401,21 @@ impl StatementExecutor { } }; - // Extract the table names before optimizing the plan - let table_names = extract_full_table_names(logical_plan.df_plan()) - .context(ExtractTableNamesSnafu)? - .into_iter() - .map(|t| t.into()) - .collect(); + // Extract the table names and rewrite them into fully qualified names. + let (table_names, plan) = + extract_and_rewrite_full_table_names(logical_plan.unwrap_df_plan(), ctx.clone()) + .context(ExtractTableNamesSnafu)?; + + let table_names = table_names.into_iter().map(|t| t.into()).collect(); - let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan(); + // TODO(dennis): we don't save the optimized plan yet, + // because of a serialization issue with our own defined plan nodes (such as `user`). + // When the issue is fixed, we can use the `optimized_plan` instead. 
+ // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan(); // encode logical plan let encoded_plan = DFLogicalSubstraitConvertor - .encode(&optimized_plan, DefaultSerializer) + .encode(&plan, DefaultSerializer) .context(SubstraitCodecSnafu)?; let expr = expr_factory::to_create_view_expr( diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index 1f543e5b71d4..6ab93d4e1dba 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -18,5 +18,5 @@ mod merge_scan; mod planner; pub use analyzer::DistPlannerAnalyzer; -pub use merge_scan::{EncodedMergeScan, MergeScanExec, MergeScanLogicalPlan}; +pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; pub use planner::DistExtensionPlanner; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index b52a5ec98f05..59b2e1bde605 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -118,61 +118,6 @@ impl MergeScanLogicalPlan { &self.input } } - -/// The encoded `MergeScanLogicalPlan`, -/// used as a temporary container for decoding -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct EncodedMergeScan { - pub input: Vec<u8>, - pub is_placeholder: bool, -} - -impl UserDefinedLogicalNodeCore for EncodedMergeScan { - fn name(&self) -> &str { - Self::name() - } - - // Prevent further optimization. - // The input can be retrieved by `self.input()` - fn inputs(&self) -> Vec<&LogicalPlan> { - unreachable!(); - } - - fn schema(&self) -> &datafusion_common::DFSchemaRef { - unreachable!(); - } - - // Prevent further optimization - fn expressions(&self) -> Vec<datafusion_expr::Expr> { - unreachable!(); - } - - fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "EncodedMergeScan [is_placeholder={}]", - self.is_placeholder - ) - } - - fn from_template(&self, _exprs: &[datafusion_expr::Expr], _inputs: &[LogicalPlan]) -> Self { - self.clone() - } -} - -impl EncodedMergeScan { - pub fn new(input: Vec<u8>, is_placeholder: bool) -> Self { - Self { - input, - is_placeholder, - } - } - - pub fn name() -> &'static str { - "EncodedMergeScan" - } -} - pub struct MergeScanExec { table: TableName, regions: Vec<RegionId>, diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index d836123cc18c..ec7545470526 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -15,15 +15,15 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::table_name::TableName; use common_query::prelude::ScalarValue; use datafusion::datasource::DefaultTableSource; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ParamValues, TableReference}; use datafusion_expr::LogicalPlan as DfLogicalPlan; use datatypes::data_type::ConcreteDataType; use datatypes::schema::Schema; +use session::context::QueryContextRef; use snafu::ResultExt; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -115,18 +115,21 @@ impl From<DfLogicalPlan> for LogicalPlan { } } -/// Visitor to extract table names from logical plan (TableScan node) -#[derive(Default)] -pub struct TableNamesExtractor { - pub table_names: HashSet<TableName>, +struct TableNamesExtractAndRewriter { + pub(crate) table_names: HashSet<TableName>, + query_ctx: QueryContextRef, } -impl TreeNodeVisitor for TableNamesExtractor { +impl TreeNodeRewriter for 
TableNamesExtractAndRewriter { type Node = DfLogicalPlan; - fn f_down(&mut self, node: &Self::Node) -> datafusion::error::Result<TreeNodeRecursion> { + /// descend + fn f_down<'a>( + &mut self, + node: Self::Node, + ) -> datafusion::error::Result<Transformed<Self::Node>> { match node { - DfLogicalPlan::TableScan(scan) => { + DfLogicalPlan::TableScan(mut scan) => { if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>() { if let Some(provider) = source .table_provider @@ -158,33 +161,54 @@ impl TreeNodeVisitor for TableNamesExtractor { // TODO(ruihang): Maybe the following two cases should not be valid TableReference::Partial { schema, table } => { self.table_names.insert(TableName::new( - DEFAULT_CATALOG_NAME.to_string(), + self.query_ctx.current_catalog(), schema.to_string(), table.to_string(), )); + + scan.table_name = TableReference::Full { + catalog: self.query_ctx.current_catalog().into(), + schema: schema.clone(), + table: table.clone(), + }; } TableReference::Bare { table } => { self.table_names.insert(TableName::new( - DEFAULT_CATALOG_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), + self.query_ctx.current_catalog(), + self.query_ctx.current_schema(), table.to_string(), )); + + scan.table_name = TableReference::Full { + catalog: self.query_ctx.current_catalog().into(), + schema: self.query_ctx.current_schema().into(), + table: table.clone(), + }; } } - - Ok(TreeNodeRecursion::Continue) + Ok(Transformed::yes(DfLogicalPlan::TableScan(scan))) } - _ => Ok(TreeNodeRecursion::Continue), + node => Ok(Transformed::no(node)), } } } -/// Extract fully resolved table names from logical plan. -/// Note:: it must be called before optimizing the plan. -pub fn extract_full_table_names(plan: &DfLogicalPlan) -> Result<HashSet<TableName>> { - let mut extractor = TableNamesExtractor::default(); - let _ = plan.visit(&mut extractor).context(DataFusionSnafu)?; - Ok(extractor.table_names) +impl TableNamesExtractAndRewriter { + fn new(query_ctx: QueryContextRef) -> Self { + Self { + query_ctx, + table_names: HashSet::new(), + } + } +} + +pub fn extract_and_rewrite_full_table_names( + plan: DfLogicalPlan, + query_ctx: QueryContextRef, +) -> Result<(HashSet<TableName>, DfLogicalPlan)> { + let mut extractor = TableNamesExtractAndRewriter::new(query_ctx); + let plan = plan.rewrite(&mut extractor).context(DataFusionSnafu)?; + Ok((extractor.table_names, plan.data)) } #[cfg(test)] @@ -193,8 +217,10 @@ pub(crate) mod tests { use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; + use common_catalog::consts::DEFAULT_CATALOG_NAME; use datafusion::logical_expr::builder::LogicalTableSource; use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; + use session::context::QueryContextBuilder; use super::*; @@ -220,13 +246,23 @@ pub(crate) mod tests { #[test] fn test_extract_full_table_names() { - let table_names = extract_full_table_names(&mock_plan()).unwrap(); + let ctx = QueryContextBuilder::default() + .current_schema("test".to_string()) + .build(); + + let (table_names, plan) = + extract_and_rewrite_full_table_names(mock_plan(), Arc::new(ctx)).unwrap(); assert_eq!(1, table_names.len()); assert!(table_names.contains(&TableName::new( DEFAULT_CATALOG_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), + "test".to_string(), "devices".to_string() ))); + + assert_eq!( + "Filter: devices.id > Int32(500)\n TableScan: greptime.test.devices", + format!("{:?}", plan) + ); } } diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs index 8e12be68191b..0d8070b78184 100644 --- 
a/src/query/src/query_engine/default_serializer.rs +++ b/src/query/src/query_engine/default_serializer.rs @@ -18,15 +18,13 @@ use common_error::ext::BoxedError; use common_function::function_registry::FUNCTION_REGISTRY; use common_function::scalars::udf::create_udf; use common_query::logical_plan::SubstraitPlanDecoder; -use common_telemetry::error; use datafusion::catalog::CatalogProviderList; use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::execution::context::SessionState; use datafusion::execution::registry::SerializerRegistry; use datafusion::execution::FunctionRegistry; -use datafusion::logical_expr::{Extension, LogicalPlan}; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion::logical_expr::LogicalPlan; use datafusion_expr::UserDefinedLogicalNode; use greptime_proto::substrait_extension::MergeScan as PbMergeScan; use prost::Message; @@ -35,7 +33,7 @@ use snafu::ResultExt; use substrait::extension_serializer::ExtensionSerializer; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use crate::dist_plan::{EncodedMergeScan, MergeScanLogicalPlan}; +use crate::dist_plan::MergeScanLogicalPlan; use crate::error::DataFusionSnafu; /// Extended `[substrait::extension_serializer::ExtensionSerializer]` but supports `[MergeScanLogicalPlan]` serialization. @@ -72,17 +70,11 @@ impl SerializerRegistry for DefaultSerializer { bytes: &[u8], ) -> Result<Arc<dyn UserDefinedLogicalNode>> { if name == MergeScanLogicalPlan::name() { - let pb_merge_scan = - PbMergeScan::decode(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?; - - let input = pb_merge_scan.input; - let is_placeholder = pb_merge_scan.is_placeholder; - - // Use `EncodedMergeScan` as a temporary container, - // it will be rewritten into `MergeScanLogicalPlan` by `SubstraitPlanDecoder`. - // We can't decode the logical plan here because we don't have - // the `SessionState` and `CatalogProviderList`. - Ok(Arc::new(EncodedMergeScan::new(input, is_placeholder))) + // TODO(dennis): missing `session_state` to decode the logical plan in `MergeScanLogicalPlan`, + // so we only save the unoptimized logical plan for views currently. + Err(DataFusionError::Substrait(format!( + "Unsupported plan node: {name}" + ))) } else { ExtensionSerializer.deserialize_logical_plan(name, bytes) } @@ -108,20 +100,6 @@ impl DefaultPlanDecoder { Ok(Self { session_state }) } - - /// Rewrites `[EncodedMergeScan]` to `[MergeScanLogicalPlan]`. - fn rewrite_merge_scan( - &self, - plan: LogicalPlan, - catalog_list: Arc<dyn CatalogProviderList>, - ) -> crate::error::Result<LogicalPlan> { - let mut rewriter = MergeScanRewriter { - session_state: self.session_state.clone(), - catalog_list, - }; - - Ok(plan.rewrite(&mut rewriter).context(DataFusionSnafu)?.data) - } } #[async_trait::async_trait] @@ -130,6 +108,7 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder { &self, message: bytes::Bytes, catalog_list: Arc<dyn CatalogProviderList>, + optimize: bool, ) -> common_query::error::Result<LogicalPlan> { // The session_state already has the `DefaultSerializer` as `SerializerRegistry`. 
let logical_plan = DFLogicalSubstraitConvertor @@ -138,59 +117,12 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder { .map_err(BoxedError::new) .context(common_query::error::DecodePlanSnafu)?; - self.rewrite_merge_scan(logical_plan, catalog_list) - .map_err(BoxedError::new) - .context(common_query::error::DecodePlanSnafu) - } -} - -struct MergeScanRewriter { - catalog_list: Arc<dyn CatalogProviderList>, - session_state: SessionState, -} - -impl TreeNodeRewriter for MergeScanRewriter { - type Node = LogicalPlan; - - /// descend - fn f_down<'a>(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> { - match node { - LogicalPlan::Extension(Extension { node }) => { - if node.name() == EncodedMergeScan::name() { - let encoded_merge_scan = node - .as_any() - .downcast_ref::<EncodedMergeScan>() - .expect("Failed to downcast to EncodedMergeScan"); - let catalog_list = self.catalog_list.clone(); - let session_state = self.session_state.clone(); - let input = encoded_merge_scan.input.clone(); - - // FIXME(dennis): it's ugly, is there a better way? - let input = std::thread::spawn(move || { - common_runtime::block_on_bg(async move { - DFLogicalSubstraitConvertor - .decode(&input[..], catalog_list, session_state) - .await - .map_err(|e| DataFusionError::External(Box::new(e))) - }) - }) - .join() - .map_err(|e| { - error!(e; "Failed to join thread when decoding logical plan"); - DataFusionError::Substrait("Failed to decode EncodedMergeScan".to_string()) - })??; - - Ok(Transformed::yes(LogicalPlan::Extension(Extension { - node: Arc::new(MergeScanLogicalPlan::new( - input, - encoded_merge_scan.is_placeholder, - )), - }))) - } else { - Ok(Transformed::no(LogicalPlan::Extension(Extension { node }))) - } - } - node => Ok(Transformed::no(node)), + if optimize { + self.session_state + .optimize(&logical_plan) + .context(common_query::error::GeneralDataFusionSnafu) + } else { + Ok(logical_plan) } } } @@ -212,10 +144,7 @@ mod tests { let engine = factory.query_engine(); - let plan = MergeScanLogicalPlan::new(mock_plan(), false); - let plan = LogicalPlan::Extension(Extension { - node: Arc::new(plan), - }); + let plan = mock_plan(); let bytes = DFLogicalSubstraitConvertor .encode(&plan, DefaultSerializer) @@ -228,23 +157,15 @@ mod tests { let table_provider = Arc::new(mock_table_provider(1.into())); let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); - let decode_plan = plan_decoder.decode(bytes, catalog_list).await.unwrap(); - - match decode_plan { - LogicalPlan::Extension(Extension { node }) => { - assert_eq!("MergeScan", node.name()); + let decode_plan = plan_decoder + .decode(bytes, catalog_list, false) + .await + .unwrap(); - let merge_scan = node - .as_any() - .downcast_ref::<MergeScanLogicalPlan>() - .expect("Failed to downcast to MergeScanLogicalPlan"); - assert_eq!( - "Filter: devices.k0 > Int32(500) + assert_eq!( + "Filter: devices.k0 > Int32(500) TableScan: devices projection=[k0, ts, v0]", - format!("{:?}", *merge_scan.input()), - ); - } - _ => unreachable!(), - } + format!("{:?}", decode_plan), + ); } } diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index d94732991eb5..855eb08bb7ed 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -1,9 +1,9 @@ --- test CREATE VIEW --- -CREATE DATABASE for_test_view; +CREATE DATABASE schema_for_view_test; Affected Rows: 1 -USE for_test_view; +USE schema_for_view_test; Affected Rows: 0 @@ -22,17 +22,17 @@ Error: 2000(InvalidSyntax), sql parser error: Expected SELECT, VALUES, or a 
subq --- Table already exists --- CREATE VIEW test_table as SELECT * FROM public.numbers; -Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` +Error: 4000(TableAlreadyExists), Table already exists: `greptime.schema_for_view_test.test_table` --- Table already exists even when create_if_not_exists --- CREATE VIEW IF NOT EXISTS test_table as SELECT * FROM public.numbers; -Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` +Error: 4000(TableAlreadyExists), Table already exists: `greptime.schema_for_view_test.test_table` --- Table already exists even when or_replace --- CREATE OR REPLACE VIEW test_table as SELECT * FROM public.numbers; -Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` +Error: 4000(TableAlreadyExists), Table already exists: `greptime.schema_for_view_test.test_table` CREATE VIEW test_view as SELECT * FROM public.numbers; @@ -41,7 +41,7 @@ Affected Rows: 0 --- View already exists ---- CREATE VIEW test_view as SELECT * FROM public.numbers; -Error: 4000(TableAlreadyExists), View already exists: `greptime.for_test_view.test_view` +Error: 4000(TableAlreadyExists), View already exists: `greptime.schema_for_view_test.test_view` CREATE VIEW IF NOT EXISTS test_view as SELECT * FROM public.numbers; @@ -72,51 +72,51 @@ SHOW FULL TABLES; -- SQLNESS REPLACE (\s\d+\s) ID SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; -+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ -| greptime | information_schema | build_info | LOCAL TEMPORARY |ID | | -| greptime | information_schema | character_sets | LOCAL TEMPORARY |ID | | -| greptime | information_schema | check_constraints | LOCAL TEMPORARY |ID | | -| greptime | information_schema | cluster_info | LOCAL TEMPORARY |ID | | -| greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY |ID | | -| greptime | information_schema | collations | LOCAL TEMPORARY |ID | | -| greptime | information_schema | column_privileges | LOCAL TEMPORARY |ID | | -| greptime | information_schema | column_statistics | LOCAL TEMPORARY |ID | | -| greptime | information_schema | columns | LOCAL TEMPORARY |ID | | -| greptime | information_schema | engines | LOCAL TEMPORARY |ID | | -| greptime | information_schema | events | LOCAL TEMPORARY |ID | | -| greptime | information_schema | files | LOCAL TEMPORARY |ID | | -| greptime | information_schema | global_status | LOCAL TEMPORARY |ID | | -| greptime | information_schema | key_column_usage | LOCAL TEMPORARY |ID | | -| greptime | public | numbers | LOCAL TEMPORARY |ID | test_engine | -| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY |ID | | -| greptime | information_schema | parameters | LOCAL TEMPORARY |ID | | -| greptime | information_schema | partitions | LOCAL TEMPORARY |ID | | -| greptime | information_schema | profiling | LOCAL TEMPORARY |ID | | -| greptime | information_schema | referential_constraints | LOCAL TEMPORARY |ID | | -| greptime | information_schema | region_peers | LOCAL TEMPORARY |ID | | -| greptime | information_schema | routines | LOCAL TEMPORARY |ID | | -| greptime | information_schema | runtime_metrics | LOCAL TEMPORARY |ID | | -| greptime | 
information_schema | schema_privileges | LOCAL TEMPORARY |ID | | -| greptime | information_schema | schemata | LOCAL TEMPORARY |ID | | -| greptime | information_schema | session_status | LOCAL TEMPORARY |ID | | -| greptime | information_schema | table_constraints | LOCAL TEMPORARY |ID | | -| greptime | information_schema | table_privileges | LOCAL TEMPORARY |ID | | -| greptime | information_schema | tables | LOCAL TEMPORARY |ID | | -| greptime | for_test_view | test_table | BASE TABLE |ID | mito | -| greptime | for_test_view | test_view | VIEW |ID | | -| greptime | information_schema | triggers | LOCAL TEMPORARY |ID | | -+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ ++---------------+----------------------+---------------------------------------+-----------------+----------+-------------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+----------------------+---------------------------------------+-----------------+----------+-------------+ +| greptime | information_schema | build_info | LOCAL TEMPORARY |ID | | +| greptime | information_schema | character_sets | LOCAL TEMPORARY |ID | | +| greptime | information_schema | check_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | cluster_info | LOCAL TEMPORARY |ID | | +| greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY |ID | | +| greptime | information_schema | collations | LOCAL TEMPORARY |ID | | +| greptime | information_schema | column_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | column_statistics | LOCAL TEMPORARY |ID | | +| greptime | information_schema | columns | LOCAL TEMPORARY |ID | | +| greptime | information_schema | engines | LOCAL TEMPORARY |ID | | +| greptime | information_schema | events | LOCAL TEMPORARY |ID | | +| greptime | information_schema | files | LOCAL TEMPORARY |ID | | +| greptime | information_schema | global_status | LOCAL TEMPORARY |ID | | +| greptime | information_schema | key_column_usage | LOCAL TEMPORARY |ID | | +| greptime | public | numbers | LOCAL TEMPORARY |ID | test_engine | +| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY |ID | | +| greptime | information_schema | parameters | LOCAL TEMPORARY |ID | | +| greptime | information_schema | partitions | LOCAL TEMPORARY |ID | | +| greptime | information_schema | profiling | LOCAL TEMPORARY |ID | | +| greptime | information_schema | referential_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | region_peers | LOCAL TEMPORARY |ID | | +| greptime | information_schema | routines | LOCAL TEMPORARY |ID | | +| greptime | information_schema | runtime_metrics | LOCAL TEMPORARY |ID | | +| greptime | information_schema | schema_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | schemata | LOCAL TEMPORARY |ID | | +| greptime | information_schema | session_status | LOCAL TEMPORARY |ID | | +| greptime | information_schema | table_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | table_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | tables | LOCAL TEMPORARY |ID | | +| greptime | schema_for_view_test | test_table | BASE TABLE |ID | mito | +| greptime | schema_for_view_test | test_view | VIEW |ID | | +| greptime | information_schema | triggers | LOCAL TEMPORARY |ID | | 
++---------------+----------------------+---------------------------------------+-----------------+----------+-------------+ -- SQLNESS REPLACE (\s\d+\s) ID SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW'; -+---------------+---------------+------------+------------+----------+--------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+---------------+---------------+------------+------------+----------+--------+ -| greptime | for_test_view | test_view | VIEW |ID | | -+---------------+---------------+------------+------------+----------+--------+ ++---------------+----------------------+------------+------------+----------+--------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+----------------------+------------+------------+----------+--------+ +| greptime | schema_for_view_test | test_view | VIEW |ID | | ++---------------+----------------------+------------+------------+----------+--------+ SHOW COLUMNS FROM test_view; @@ -154,7 +154,7 @@ USE public; Affected Rows: 0 -DROP DATABASE for_test_view; +DROP DATABASE schema_for_view_test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/view/create.sql b/tests/cases/standalone/common/view/create.sql index 8b5f4e1bce14..a778180939a8 100644 --- a/tests/cases/standalone/common/view/create.sql +++ b/tests/cases/standalone/common/view/create.sql @@ -1,8 +1,8 @@ --- test CREATE VIEW --- -CREATE DATABASE for_test_view; +CREATE DATABASE schema_for_view_test; -USE for_test_view; +USE schema_for_view_test; CREATE TABLE test_table(a STRING, ts TIMESTAMP TIME INDEX); @@ -48,4 +48,4 @@ SELECT * FROM test_view LIMIT 10; USE public; -DROP DATABASE for_test_view; +DROP DATABASE schema_for_view_test;