From 51a42c09731509bc340cf867e6246828440261a4 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 20 May 2024 23:01:15 +0800 Subject: [PATCH] test: table source, serializer and decoder --- Cargo.lock | 1 + src/catalog/Cargo.toml | 3 + src/catalog/src/kvbackend/manager.rs | 2 +- src/catalog/src/table_source.rs | 102 +++++++++++++++++- src/cmd/src/frontend.rs | 3 +- src/cmd/src/standalone.rs | 3 +- src/query/src/optimizer.rs | 2 +- src/query/src/plan.rs | 47 +++++++- .../src/query_engine/default_serializer.rs | 55 ++++++++++ tests-integration/src/cluster.rs | 3 +- tests-integration/src/standalone.rs | 3 +- 11 files changed, 209 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2adb62304bc..51c4bf75afda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,6 +1263,7 @@ dependencies = [ "async-stream", "async-trait", "bytes", + "cache", "catalog", "chrono", "common-catalog", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 850d728aae4c..185614e98152 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -49,8 +49,11 @@ table.workspace = true tokio.workspace = true [dev-dependencies] +cache.workspace = true catalog = { workspace = true, features = ["testing"] } chrono.workspace = true +common-meta = { workspace = true, features = ["testing"] } +common-query = { workspace = true, features = ["testing"] } common-test-util.workspace = true log-store.workspace = true object-store.workspace = true diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 1bfc9ddd2483..85b12e6b1bbd 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -67,7 +67,7 @@ pub struct KvBackendCatalogManager { const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; impl KvBackendCatalogManager { - pub async fn new( + pub fn new( mode: Mode, meta_client: Option>, backend: KvBackendRef, diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 357417ba0b4d..f5b9185a9625 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -33,9 +33,10 @@ use datafusion::catalog::CatalogProvider; use memory_catalog::MemoryCatalogProviderList; use crate::error::{ - CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetTableCacheSnafu, QueryAccessDeniedSnafu, + CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu, }; +use crate::kvbackend::KvBackendCatalogManager; use crate::CatalogManagerRef; pub struct DfTableSourceProvider { @@ -110,7 +111,6 @@ impl DfTableSourceProvider { })?; let provider: Arc = if table.table_info().table_type == TableType::View { - use crate::kvbackend::KvBackendCatalogManager; let catalog_manager = self .catalog_manager .as_any() @@ -121,7 +121,7 @@ impl DfTableSourceProvider { .view_info_cache()? .get(table.table_info().ident.table_id) .await - .context(GetTableCacheSnafu)? + .context(GetViewCacheSnafu)? .context(ViewInfoNotFoundSnafu { name: &table.table_info().name, })?; @@ -133,7 +133,7 @@ impl DfTableSourceProvider { )); for table_name in &view_info.table_names { - // We don't have a cross-catalog view. + // We don't support cross-catalog views. debug_assert_eq!(catalog_name, &table_name.catalog_name); let catalog_name = &table_name.catalog_name; @@ -239,4 +239,98 @@ mod tests { let table_ref = TableReference::full("greptime", "greptime_private", "columns"); assert!(table_provider.resolve_table_ref(table_ref).is_ok()); } + + use std::collections::HashSet; + + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; + use common_config::Mode; + use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_query::error::Result as QueryResult; + use common_query::logical_plan::SubstraitPlanDecoder; + use datafusion::catalog::CatalogProviderList; + use datafusion::logical_expr::builder::LogicalTableSource; + use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; + + struct MockDecoder; + impl MockDecoder { + pub fn arc() -> Arc { + Arc::new(MockDecoder) + } + } + + #[async_trait::async_trait] + impl SubstraitPlanDecoder for MockDecoder { + async fn decode( + &self, + _message: bytes::Bytes, + _catalog_list: Arc, + ) -> QueryResult { + Ok(mock_plan()) + } + } + + fn mock_plan() -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + + let projection = None; + + let builder = + LogicalPlanBuilder::scan("person", Arc::new(table_source), projection).unwrap(); + + builder + .filter(col("id").gt(lit(500))) + .unwrap() + .build() + .unwrap() + } + + #[tokio::test] + async fn test_resolve_view() { + let query_ctx = &QueryContext::with("greptime", "public"); + let backend = Arc::new(MemoryKvBackend::default()); + let layered_cache_builder = LayeredCacheRegistryBuilder::default() + .add_cache_registry(CacheRegistryBuilder::default().build()); + let fundamental_cache_registry = build_fundamental_cache_registry(backend.clone()); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .unwrap() + .build(), + ); + + let catalog_manager = KvBackendCatalogManager::new( + Mode::Standalone, + None, + backend.clone(), + layered_cache_registry, + ); + let table_metadata_manager = TableMetadataManager::new(backend); + let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]); + view_info.table_type = TableType::View; + let logical_plan = vec![1, 2, 3]; + // Create view metadata + table_metadata_manager + .create_view_metadata(view_info.clone().into(), &logical_plan, HashSet::new()) + .await + .unwrap(); + + let mut table_provider = + DfTableSourceProvider::new(catalog_manager, true, query_ctx, MockDecoder::arc()); + + // View not found + let table_ref = TableReference::bare("not_exists_view"); + assert!(table_provider.resolve_table(table_ref).await.is_err()); + + let table_ref = TableReference::bare(view_info.name); + let source = table_provider.resolve_table(table_ref).await.unwrap(); + assert_eq!(*source.get_logical_plan().unwrap(), mock_plan()); + } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 963d73d5083a..0f667a5679c1 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -287,8 +287,7 @@ impl StartCommand { Some(meta_client.clone()), cached_meta_backend.clone(), layered_cache_registry.clone(), - ) - .await; + ); let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index dd4e4a7dae8d..c62a51a876dc 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -414,8 +414,7 @@ impl StartCommand { None, kv_backend.clone(), layered_cache_registry.clone(), - ) - .await; + ); let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index e6a971417c23..1cb54c7126c3 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -17,7 +17,7 @@ pub mod order_hint; pub mod remove_duplicate; pub mod string_normalization; #[cfg(test)] -mod test_util; +pub(crate) mod test_util; pub mod type_conversion; use datafusion_common::config::ConfigOptions; diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index d31badf81401..d836123cc18c 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -179,9 +179,54 @@ impl TreeNodeVisitor for TableNamesExtractor { } } -/// Extract fully resolved table names from logical plan +/// Extract fully resolved table names from logical plan. +/// Note:: it must be called before optimizing the plan. pub fn extract_full_table_names(plan: &DfLogicalPlan) -> Result> { let mut extractor = TableNamesExtractor::default(); let _ = plan.visit(&mut extractor).context(DataFusionSnafu)?; Ok(extractor.table_names) } + +#[cfg(test)] +pub(crate) mod tests { + + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; + use datafusion::logical_expr::builder::LogicalTableSource; + use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; + + use super::*; + + pub(crate) fn mock_plan() -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + ]); + let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + + let projection = None; + + let builder = + LogicalPlanBuilder::scan("devices", Arc::new(table_source), projection).unwrap(); + + builder + .filter(col("id").gt(lit(500))) + .unwrap() + .build() + .unwrap() + } + + #[test] + fn test_extract_full_table_names() { + let table_names = extract_full_table_names(&mock_plan()).unwrap(); + + assert_eq!(1, table_names.len()); + assert!(table_names.contains(&TableName::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + "devices".to_string() + ))); + } +} diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs index bc2b7af98681..8e12be68191b 100644 --- a/src/query/src/query_engine/default_serializer.rs +++ b/src/query/src/query_engine/default_serializer.rs @@ -165,6 +165,7 @@ impl TreeNodeRewriter for MergeScanRewriter { let session_state = self.session_state.clone(); let input = encoded_merge_scan.input.clone(); + // FIXME(dennis): it's ugly, is there a better way? let input = std::thread::spawn(move || { common_runtime::block_on_bg(async move { DFLogicalSubstraitConvertor @@ -193,3 +194,57 @@ impl TreeNodeRewriter for MergeScanRewriter { } } } + +#[cfg(test)] +mod tests { + use session::context::QueryContext; + + use super::*; + use crate::dummy_catalog::DummyCatalogList; + use crate::optimizer::test_util::mock_table_provider; + use crate::plan::tests::mock_plan; + use crate::QueryEngineFactory; + + #[tokio::test] + async fn test_serializer_decode_plan() { + let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); + let factory = QueryEngineFactory::new(catalog_list, None, None, None, false); + + let engine = factory.query_engine(); + + let plan = MergeScanLogicalPlan::new(mock_plan(), false); + let plan = LogicalPlan::Extension(Extension { + node: Arc::new(plan), + }); + + let bytes = DFLogicalSubstraitConvertor + .encode(&plan, DefaultSerializer) + .unwrap(); + + let plan_decoder = engine + .engine_context(QueryContext::arc()) + .new_plan_decoder() + .unwrap(); + let table_provider = Arc::new(mock_table_provider(1.into())); + let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); + + let decode_plan = plan_decoder.decode(bytes, catalog_list).await.unwrap(); + + match decode_plan { + LogicalPlan::Extension(Extension { node }) => { + assert_eq!("MergeScan", node.name()); + + let merge_scan = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to MergeScanLogicalPlan"); + assert_eq!( + "Filter: devices.k0 > Int32(500) + TableScan: devices projection=[k0, ts, v0]", + format!("{:?}", *merge_scan.input()), + ); + } + _ => unreachable!(), + } + } +} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 892058794328..78a0ba0f5c6f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -368,8 +368,7 @@ impl GreptimeDbClusterBuilder { Some(meta_client.clone()), cached_meta_backend.clone(), cache_registry.clone(), - ) - .await; + ); let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 3e3a3b6f24af..35a14e261260 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -147,8 +147,7 @@ impl GreptimeDbStandaloneBuilder { None, kv_backend.clone(), cache_registry.clone(), - ) - .await; + ); let flow_builder = FlownodeBuilder::new( 1, // for standalone mode this value is default to one