Skip to content

Commit

Permalink
feat: show create view and creating view with columns (#4086)
Browse files Browse the repository at this point in the history
* feat: parse column names when creating view

* feat: save the view definition into view info

* feat: supports view columns and show create view

* feat: save plan columns for validation

* fix: typo

* chore: comments and style

* chore: apply suggestions

* test: assert CreateView display result

* chore: style

Co-authored-by: Weny Xu <[email protected]>

* chore: avoid the clone

Co-authored-by: Weny Xu <[email protected]>

* fix: compile error after rebasing

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
killme2008 and WenyXu authored Jul 9, 2024
1 parent 458e5d7 commit 33ed745
Show file tree
Hide file tree
Showing 33 changed files with 1,371 additions and 158 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b83f00958fe4cbc77b85b7407bca206e98bdc845" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5c801650435d464891114502539b701c77a1b914" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
41 changes: 40 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"View plan columns changed from: {} to: {}",
origin_names,
actual_names
))]
ViewPlanColumnsChanged {
origin_names: String,
actual_names: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },

Expand Down Expand Up @@ -173,6 +185,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to project view columns"))]
ProjectViewColumns {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Table metadata manager error"))]
TableMetadataManager {
source: common_meta::error::Error,
Expand Down Expand Up @@ -208,6 +228,21 @@ pub enum Error {
},
}

impl Error {
pub fn should_fail(&self) -> bool {
use Error::*;

matches!(
self,
GetViewCache { .. }
| ViewInfoNotFound { .. }
| DecodePlan { .. }
| ViewPlanColumnsChanged { .. }
| ProjectViewColumns { .. }
)
}
}

pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
Expand All @@ -220,6 +255,8 @@ impl ErrorExt for Error {
| Error::CacheNotFound { .. }
| Error::CastManager { .. } => StatusCode::Unexpected,

Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,

Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,

Error::SystemCatalog { .. } => StatusCode::StorageUnavailable,
Expand All @@ -245,7 +282,9 @@ impl ErrorExt for Error {
}

Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
Error::ProjectViewColumns { .. } | Error::Datafusion { .. } => {
StatusCode::EngineExecuteQuery
}
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
Expand Down
133 changes: 101 additions & 32 deletions src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@ use std::sync::Arc;

use bytes::Bytes;
use common_catalog::format_full_table_name;
use common_query::logical_plan::SubstraitPlanDecoderRef;
use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::view::ViewTable;
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::logical_expr::TableSource;
use itertools::Itertools;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
mod dummy_catalog;
use dummy_catalog::DummyCatalogList;
use table::TableRef;

use crate::error::{
CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, QueryAccessDeniedSnafu,
Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
ViewPlanColumnsChangedSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManagerRef;
Expand All @@ -43,6 +46,7 @@ pub struct DfTableSourceProvider {
default_catalog: String,
default_schema: String,
plan_decoder: SubstraitPlanDecoderRef,
enable_ident_normalization: bool,
}

impl DfTableSourceProvider {
Expand All @@ -51,6 +55,7 @@ impl DfTableSourceProvider {
disallow_cross_catalog_query: bool,
query_ctx: &QueryContext,
plan_decoder: SubstraitPlanDecoderRef,
enable_ident_normalization: bool,
) -> Self {
Self {
catalog_manager,
Expand All @@ -59,6 +64,7 @@ impl DfTableSourceProvider {
default_catalog: query_ctx.current_catalog().to_owned(),
default_schema: query_ctx.current_schema(),
plan_decoder,
enable_ident_normalization,
}
}

Expand Down Expand Up @@ -108,32 +114,7 @@ impl DfTableSourceProvider {
})?;

let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
let catalog_manager = self
.catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.context(CastManagerSnafu)?;

let view_info = catalog_manager
.view_info_cache()?
.get(table.table_info().ident.table_id)
.await
.context(GetViewCacheSnafu)?
.context(ViewInfoNotFoundSnafu {
name: &table.table_info().name,
})?;

// Build the catalog list provider for deserialization.
let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
let logical_plan = self
.plan_decoder
.decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
.await
.context(DecodePlanSnafu {
name: &table.table_info().name,
})?;

Arc::new(ViewTable::try_new(logical_plan, None).context(DatafusionSnafu)?)
self.create_view_provider(&table).await?
} else {
Arc::new(DfTableProviderAdapter::new(table))
};
Expand All @@ -143,6 +124,80 @@ impl DfTableSourceProvider {
let _ = self.resolved_tables.insert(resolved_name, source.clone());
Ok(source)
}

/// Builds the DataFusion [`ViewTable`] provider backing the view `table`.
///
/// The stored plan of the view is decoded with `plan_decoder`, the number of columns it
/// produces is compared with the `plan_columns` recorded in the view info, and, when the
/// view info carries explicit `columns`, the plan's output columns are renamed to them.
///
/// # Errors
///
/// * `CastManager` if the catalog manager is not a [`KvBackendCatalogManager`].
/// * `GetViewCache` / `ViewInfoNotFound` if the view info cannot be loaded, plus whatever
///   `view_info_cache()` itself returns.
/// * `DecodePlan` if the stored plan cannot be decoded.
/// * `ViewPlanColumnsChanged` if the decoded plan has a different number of columns than
///   the recorded `plan_columns`.
/// * `ProjectViewColumns` if renaming the plan columns fails.
/// * `Datafusion` if the [`ViewTable`] cannot be constructed.
async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
    let kv_manager = self
        .catalog_manager
        .as_any()
        .downcast_ref::<KvBackendCatalogManager>()
        .context(CastManagerSnafu)?;

    let table_info = table.table_info();

    let view = kv_manager
        .view_info_cache()?
        .get(table_info.ident.table_id)
        .await
        .context(GetViewCacheSnafu)?
        .context(ViewInfoNotFoundSnafu {
            name: &table_info.name,
        })?;

    // The decoder resolves the tables referenced by the plan through this catalog list.
    let catalogs = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
    let decoded_plan = self
        .plan_decoder
        .decode(Bytes::from(view.view_info.clone()), catalogs, true)
        .await
        .context(DecodePlanSnafu {
            name: &table_info.name,
        })?;

    // Column names the view exposes (may be empty).
    let view_columns: Vec<_> = view.columns.iter().map(|name| name.as_str()).collect();

    // Plan column names recorded in the view info.
    let stored_plan_columns: Vec<_> = view
        .plan_columns
        .iter()
        .map(|name| name.as_str())
        .collect();

    // Column names the freshly decoded plan produces.
    let decoded_plan_columns: Vec<_> = decoded_plan
        .schema()
        .columns()
        .into_iter()
        .map(|column| column.name)
        .collect();

    // Only the number of columns is checked, because substrait currently doesn't include
    // aliases.
    // See https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
    // and https://github.com/apache/datafusion/issues/6489
    // TODO(dennis): check column names
    ensure!(
        stored_plan_columns.len() == decoded_plan_columns.len(),
        ViewPlanColumnsChangedSnafu {
            origin_names: stored_plan_columns.iter().join(","),
            actual_names: decoded_plan_columns.iter().join(","),
        }
    );

    // The `columns` projection has to be applied here, because substrait includes aliases
    // for neither tables nor columns:
    // https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
    let final_plan = if view_columns.is_empty() {
        decoded_plan
    } else {
        rename_logical_plan_columns(
            self.enable_ident_normalization,
            decoded_plan,
            decoded_plan_columns
                .iter()
                .map(|name| name.as_str())
                .zip(view_columns)
                .collect(),
        )
        .context(ProjectViewColumnsSnafu)?
    };

    let view_table = ViewTable::try_new(final_plan, Some(view.definition.to_string()))
        .context(DatafusionSnafu)?;

    Ok(Arc::new(view_table))
}
}

#[cfg(test)]
Expand All @@ -162,6 +217,7 @@ mod tests {
true,
query_ctx,
DummyDecoder::arc(),
true,
);

let table_ref = TableReference::bare("table_name");
Expand Down Expand Up @@ -277,19 +333,32 @@ mod tests {
let logical_plan = vec![1, 2, 3];
// Create view metadata
table_metadata_manager
.create_view_metadata(view_info.clone().into(), logical_plan, HashSet::new())
.create_view_metadata(
view_info.clone().into(),
logical_plan,
HashSet::new(),
vec!["a".to_string(), "b".to_string()],
vec!["id".to_string(), "name".to_string()],
"definition".to_string(),
)
.await
.unwrap();

let mut table_provider =
DfTableSourceProvider::new(catalog_manager, true, query_ctx, MockDecoder::arc());
DfTableSourceProvider::new(catalog_manager, true, query_ctx, MockDecoder::arc(), true);

// View not found
let table_ref = TableReference::bare("not_exists_view");
assert!(table_provider.resolve_table(table_ref).await.is_err());

let table_ref = TableReference::bare(view_info.name);
let source = table_provider.resolve_table(table_ref).await.unwrap();
assert_eq!(*source.get_logical_plan().unwrap(), mock_plan());
assert_eq!(
r#"
Projection: person.id AS a, person.name AS b
Filter: person.id > Int32(500)
TableScan: person"#,
format!("\n{:?}", source.get_logical_plan().unwrap())
);
}
}
11 changes: 9 additions & 2 deletions src/common/meta/src/cache/table/view_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ fn invalidator<'a>(
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if let CacheIdent::TableId(table_id) = ident {
cache.invalidate(table_id).await
if let CacheIdent::TableId(view_id) = ident {
cache.invalidate(view_id).await
}
Ok(())
})
Expand Down Expand Up @@ -111,13 +111,17 @@ mod tests {
});
set
};
let definition = "CREATE VIEW test AS SELECT * FROM numbers";

task.view_info.ident.table_id = 1024;
table_metadata_manager
.create_view_metadata(
task.view_info.clone(),
task.create_view.logical_plan.clone(),
table_names,
vec!["a".to_string()],
vec!["number".to_string()],
definition.to_string(),
)
.await
.unwrap();
Expand All @@ -132,6 +136,9 @@ mod tests {
.map(|t| t.clone().into())
.collect::<HashSet<_>>()
);
assert_eq!(view_info.definition, task.create_view.definition);
assert_eq!(view_info.columns, task.create_view.columns);
assert_eq!(view_info.plan_columns, task.create_view.plan_columns);

assert!(cache.contains_key(&1024));
cache
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::view_info::ViewInfoKey;
use crate::key::MetaKey;

/// KvBackend cache invalidator
Expand Down Expand Up @@ -76,6 +77,9 @@ where

let key = TableRouteKey::new(*table_id);
self.invalidate_key(&key.to_bytes()).await;

let key = ViewInfoKey::new(*table_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = table_name.into();
Expand Down
16 changes: 15 additions & 1 deletion src/common/meta/src/ddl/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,20 @@ impl CreateViewProcedure {
})?;
let new_logical_plan = self.data.task.raw_logical_plan().clone();
let table_names = self.data.task.table_names();
let columns = self.data.task.columns().clone();
let plan_columns = self.data.task.plan_columns().clone();
let new_view_definition = self.data.task.view_definition().to_string();

manager
.update_view_info(view_id, &current_view_info, new_logical_plan, table_names)
.update_view_info(
view_id,
&current_view_info,
new_logical_plan,
table_names,
columns,
plan_columns,
new_view_definition,
)
.await?;

info!("Updated view metadata for view {view_id}");
Expand All @@ -210,6 +221,9 @@ impl CreateViewProcedure {
raw_view_info,
self.data.task.raw_logical_plan().clone(),
self.data.task.table_names(),
self.data.task.columns().clone(),
self.data.task.plan_columns().clone(),
self.data.task.view_definition().to_string(),
)
.await?;

Expand Down
Loading

0 comments on commit 33ed745

Please sign in to comment.