Skip to content

Commit

Permalink
fix: fail to decode plan because of invalid table names
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed May 21, 2024
1 parent 60b428e commit 55b1545
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 344 deletions.
51 changes: 5 additions & 46 deletions src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use std::sync::Arc;
use bytes::Bytes;
use common_catalog::format_full_table_name;
use common_query::logical_plan::SubstraitPlanDecoderRef;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::catalog::MemoryCatalogProvider;
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::view::ViewTable;
use datafusion::datasource::{provider_as_source, TableProvider};
Expand All @@ -28,9 +26,8 @@ use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
mod memory_catalog;
use datafusion::catalog::CatalogProvider;
use memory_catalog::MemoryCatalogProviderList;
mod dummy_catalog;
use dummy_catalog::DummyCatalogList;

use crate::error::{
CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, QueryAccessDeniedSnafu,
Expand Down Expand Up @@ -127,49 +124,10 @@ impl DfTableSourceProvider {
})?;

// Build the catalog list provider for deserialization.
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogProviderList::with_catalog_provider(
catalog_provider.clone(),
));

for table_name in &view_info.table_names {
// We don't support cross-catalog views.
debug_assert_eq!(catalog_name, &table_name.catalog_name);

let catalog_name = &table_name.catalog_name;
let schema_name = &table_name.schema_name;
let table_name = &table_name.table_name;

let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await?
.with_context(|| TableNotExistSnafu {
table: format_full_table_name(catalog_name, schema_name, table_name),
})?;

let schema_provider =
if let Some(schema_provider) = catalog_provider.schema(schema_name) {
schema_provider
} else {
let schema_provider = Arc::new(MemorySchemaProvider::new());
let _ = catalog_provider
.register_schema(schema_name, schema_provider.clone())
.context(DatafusionSnafu)?;
schema_provider
};

let table_provider: Arc<dyn TableProvider> =
Arc::new(DfTableProviderAdapter::new(table));

let _ = schema_provider
.register_table(table_name.to_string(), table_provider)
.context(DatafusionSnafu)?;
}

let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
let logical_plan = self
.plan_decoder
.decode(Bytes::from(view_info.view_info.clone()), catalog_list)
.decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
.await
.context(DecodePlanSnafu {
name: &table.table_info().name,
Expand Down Expand Up @@ -267,6 +225,7 @@ mod tests {
&self,
_message: bytes::Bytes,
_catalog_list: Arc<dyn CatalogProviderList>,
_optimize: bool,
) -> QueryResult<LogicalPlan> {
Ok(mock_plan())
}
Expand Down
129 changes: 129 additions & 0 deletions src/catalog/src/table_source/dummy_catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dummy catalog for region server.
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use common_catalog::format_full_table_name;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use snafu::OptionExt;
use table::table::adapter::DfTableProviderAdapter;

use crate::error::TableNotExistSnafu;
use crate::CatalogManagerRef;

/// Delegate the resolving rquests to the `[CatalogManager]` unconditionally.
#[derive(Clone)]
pub struct DummyCatalogList {
catalog_manager: CatalogManagerRef,
}

impl DummyCatalogList {
/// Creates a new catalog list with the given catalog manager.
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
}
}

impl CatalogProviderList for DummyCatalogList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
_name: String,
_catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
None
}

fn catalog_names(&self) -> Vec<String> {
vec![]
}

fn catalog(&self, catalog_name: &str) -> Option<Arc<dyn CatalogProvider>> {
Some(Arc::new(DummyCatalogProvider {
catalog_name: catalog_name.to_string(),
catalog_manager: self.catalog_manager.clone(),
}))
}
}

/// A dummy catalog provider for [DummyCatalogList].
#[derive(Clone)]
struct DummyCatalogProvider {
catalog_name: String,
catalog_manager: CatalogManagerRef,
}

impl CatalogProvider for DummyCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
vec![]
}

fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(Arc::new(DummySchemaProvider {
catalog_name: self.catalog_name.clone(),
schema_name: schema_name.to_string(),
catalog_manager: self.catalog_manager.clone(),
}))
}
}

/// A dummy schema provider for [DummyCatalogList].
#[derive(Clone)]
struct DummySchemaProvider {
catalog_name: String,
schema_name: String,
catalog_manager: CatalogManagerRef,
}

#[async_trait]
impl SchemaProvider for DummySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
vec![]
}

async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
let table = self
.catalog_manager
.table(&self.catalog_name, &self.schema_name, name)
.await?
.with_context(|| TableNotExistSnafu {
table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
})?;

let table_provider: Arc<dyn TableProvider> = Arc::new(DfTableProviderAdapter::new(table));

Ok(Some(table_provider))
}

fn table_exist(&self, _name: &str) -> bool {
true
}
}
54 changes: 0 additions & 54 deletions src/catalog/src/table_source/memory_catalog.rs

This file was deleted.

56 changes: 54 additions & 2 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@

use std::any::Any;

use common_catalog::format_full_table_name;
use common_procedure::Status;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use table::metadata::TableId;
use snafu::OptionExt;
use table::metadata::{TableId, TableType};

use super::executor::DropDatabaseExecutor;
use super::metadata::DropDatabaseRemoveMetadata;
use super::DropTableTarget;
use crate::cache_invalidator::Context;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::table_route::TableRouteValue;
use crate::table_name::TableName;

Expand Down Expand Up @@ -101,6 +105,40 @@ impl DropDatabaseCursor {
)),
}
}

async fn handle_view(
&self,
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
table_name: String,
table_id: TableId,
) -> Result<(Box<dyn State>, Status)> {
let view_name = TableName::new(&ctx.catalog, &ctx.schema, &table_name);
ddl_ctx
.table_metadata_manager
.destroy_view_info(table_id, &view_name)
.await?;

let cache_invalidator = &ddl_ctx.cache_invalidator;
let ctx = Context {
subject: Some("Invalidate table cache by dropping table".to_string()),
};

cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::TableName(view_name),
CacheIdent::TableId(table_id),
],
)
.await?;

Ok((
Box::new(DropDatabaseCursor::new(self.target)),
Status::executing(false),
))
}
}

#[async_trait::async_trait]
Expand All @@ -122,6 +160,20 @@ impl State for DropDatabaseCursor {
match ctx.tables.as_mut().unwrap().try_next().await? {
Some((table_name, table_name_value)) => {
let table_id = table_name_value.table_id();

let table_info_value = ddl_ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format_full_table_name(&ctx.catalog, &ctx.schema, &table_name),
})?;

if table_info_value.table_info.table_type == TableType::View {
return self.handle_view(ddl_ctx, ctx, table_name, table_id).await;
}

match ddl_ctx
.table_metadata_manager
.table_route_manager()
Expand Down
27 changes: 27 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,33 @@ impl TableMetadataManager {
Ok(())
}

fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
let mut keys = Vec::with_capacity(3);
let view_name = TableNameKey::new(
&view_name.catalog_name,
&view_name.schema_name,
&view_name.table_name,
);
let table_info_key = TableInfoKey::new(view_id);
let view_info_key = ViewInfoKey::new(view_id);
keys.push(view_name.to_bytes());
keys.push(table_info_key.to_bytes());
keys.push(view_info_key.to_bytes());

Ok(keys)
}

/// Deletes metadata for view **permanently**.
/// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
let keys = self.view_info_keys(view_id, view_name)?;
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(())
}

/// Renames the table name and returns an error if different metadata exists.
/// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
/// and the new `TableNameKey` MUST be empty.
Expand Down
2 changes: 2 additions & 0 deletions src/common/query/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ pub fn create_aggregate_function(
#[async_trait::async_trait]
pub trait SubstraitPlanDecoder {
/// Decode the `[LogicalPlan]` from bytes with the `[CatalogProviderList]`.
/// When `optimzie` is true, it will do the optimization for decoded plan.
async fn decode(
&self,
message: bytes::Bytes,
catalog_list: Arc<dyn CatalogProviderList>,
optimize: bool,
) -> Result<LogicalPlan>;
}

Expand Down
Loading

0 comments on commit 55b1545

Please sign in to comment.