From d981fbf9ee4d77a9d19efd6063aa2908f336ef4d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 10:55:21 +0800 Subject: [PATCH 1/6] fix: check for table scan before expanding Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/analyzer.rs | 51 ++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 45c77edb81a1..2d025670f582 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -44,13 +44,51 @@ impl AnalyzerRule for DistPlannerAnalyzer { plan: LogicalPlan, _config: &ConfigOptions, ) -> datafusion_common::Result { + if !Self::is_query(&plan) { + return Ok(plan); + } + let plan = plan.transform(&Self::inspect_plan_with_subquery)?; let mut rewriter = PlanRewriter::default(); - plan.rewrite(&mut rewriter) + let result = plan.rewrite(&mut rewriter)?; + + Ok(result) } } impl DistPlannerAnalyzer { + fn is_query(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Join(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Union(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Unnest(_) => true, + + // empty relation and plain values are also counted as non-query here + LogicalPlan::EmptyRelation(_) + | LogicalPlan::Values(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Prepare(_) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::DescribeTable(_) => false, + } + } + fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult> { let exprs = plan .expressions() @@ -138,10 +176,6 @@ impl PlanRewriter { /// Return true if should stop and expand. The input plan is the parent node of current node fn should_expand(&mut self, plan: &LogicalPlan) -> bool { if DFLogicalSubstraitConvertor.encode(plan).is_err() { - info!( - "substrait error: {:?}", - DFLogicalSubstraitConvertor.encode(plan) - ); return true; } @@ -251,6 +285,13 @@ impl TreeNodeRewriter for PlanRewriter { return Ok(node); } + // only expand when the leaf is table scan + if node.inputs().is_empty() && !matches!(node, LogicalPlan::TableScan(_)) { + self.set_expanded(); + self.pop_stack(); + return Ok(node); + } + self.maybe_set_partitions(&node); let Some(parent) = self.get_parent() else { From 6cf00c14ff80703782903583ce4fdfe05fb6e998 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 10:58:50 +0800 Subject: [PATCH 2/6] change assert_ok to unwrap Signed-off-by: Ruihang Xia --- tests-integration/tests/sql.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index a8efea7da026..f0186b5de4bc 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -135,12 +135,12 @@ pub async fn test_mysql_crud(store_type: StorageType) { .await .unwrap(); - assert!(sqlx::query( + sqlx::query( "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)", ) .execute(&pool) .await - .is_ok()); + .unwrap(); for i in 0..10 { let dt: DateTime = DateTime::from_naive_utc_and_offset( NaiveDateTime::from_timestamp_opt(60, i).unwrap(), @@ -149,7 +149,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { let d = NaiveDate::from_yo_opt(2015, 100).unwrap(); let hello = format!("hello{i}"); let bytes = hello.as_bytes(); - assert!(sqlx::query("insert into demo values(?, ?, ?, ?, ?)") + sqlx::query("insert into demo values(?, ?, ?, ?, ?)") .bind(i) .bind(i) .bind(d) @@ -157,7 +157,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { .bind(bytes) .execute(&pool) .await - .is_ok()); + .unwrap(); } let rows = sqlx::query("select i, d, dt, b from demo") From 3c549f30966a87d369eca00762fea35ee43c1e6d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 14:27:06 +0800 Subject: [PATCH 3/6] fix clippy warning Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/analyzer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 2d025670f582..49e041a0976a 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_telemetry::info; use datafusion::datasource::DefaultTableSource; use datafusion::error::Result as DfResult; use datafusion_common::config::ConfigOptions; From 3925a6acb66e4a154b81c830c345d80fdfa29dcc Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 14:50:42 +0800 Subject: [PATCH 4/6] update sqlness result Signed-off-by: Ruihang Xia --- tests/cases/distributed/explain/subqueries.result | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index ba5c162e2692..f9fc17c585ab 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -113,12 +113,11 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM | logical_plan | Dml: op=[Insert] table=[other] | | | Projection: integers.i AS i, TimestampMillisecond(2, None) AS j | | | Inner Join: integers.i = __scalar_sq_1.MAX(integers.i) | -| | Projection: integers.i | -| | MergeScan [is_placeholder=false] | +| | TableScan: integers projection=[i] | | | SubqueryAlias: __scalar_sq_1 | -| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] | -| | Projection: integers.i | -| | MergeScan [is_placeholder=false] | +| | Projection: MAX(integers.i) | +| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] | +| | TableScan: integers projection=[i] | +--------------+-------------------------------------------------------------------+ drop table other; From 2197f70844662ab8afbaab71def2ab7e35ae42a0 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 16:55:35 +0800 Subject: [PATCH 5/6] don't skip dml Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/analyzer.rs | 36 ------------------- .../distributed/explain/subqueries.result | 9 ++--- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 49e041a0976a..7807913e74b5 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -43,10 +43,6 @@ impl AnalyzerRule for DistPlannerAnalyzer { plan: LogicalPlan, _config: &ConfigOptions, ) -> datafusion_common::Result { - if !Self::is_query(&plan) { - return Ok(plan); - } - let plan = plan.transform(&Self::inspect_plan_with_subquery)?; let mut rewriter = PlanRewriter::default(); let result = plan.rewrite(&mut rewriter)?; @@ -56,38 +52,6 @@ impl AnalyzerRule for DistPlannerAnalyzer { } impl DistPlannerAnalyzer { - fn is_query(plan: &LogicalPlan) -> bool { - match plan { - LogicalPlan::Projection(_) - | LogicalPlan::Filter(_) - | LogicalPlan::Window(_) - | LogicalPlan::Aggregate(_) - | LogicalPlan::Sort(_) - | LogicalPlan::Join(_) - | LogicalPlan::CrossJoin(_) - | LogicalPlan::Repartition(_) - | LogicalPlan::Union(_) - | LogicalPlan::TableScan(_) - | LogicalPlan::Subquery(_) - | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Limit(_) - | LogicalPlan::Explain(_) - | LogicalPlan::Analyze(_) - | LogicalPlan::Extension(_) - | LogicalPlan::Distinct(_) - | LogicalPlan::Unnest(_) => true, - - // empty relation and plain values are also counted as non-query here - LogicalPlan::EmptyRelation(_) - | LogicalPlan::Values(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Dml(_) - | LogicalPlan::Ddl(_) - | LogicalPlan::DescribeTable(_) => false, - } - } - fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult> { let exprs = plan .expressions() diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index f9fc17c585ab..ba5c162e2692 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -113,11 +113,12 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM | logical_plan | Dml: op=[Insert] table=[other] | | | Projection: integers.i AS i, TimestampMillisecond(2, None) AS j | | | Inner Join: integers.i = __scalar_sq_1.MAX(integers.i) | -| | TableScan: integers projection=[i] | +| | Projection: integers.i | +| | MergeScan [is_placeholder=false] | | | SubqueryAlias: __scalar_sq_1 | -| | Projection: MAX(integers.i) | -| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] | -| | TableScan: integers projection=[i] | +| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] | +| | Projection: integers.i | +| | MergeScan [is_placeholder=false] | +--------------+-------------------------------------------------------------------+ drop table other; From 1342c404a82ed51bbfc8f68b26fef475e15bb517 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 26 Sep 2023 19:40:11 +0800 Subject: [PATCH 6/6] uncomment ignored tests Signed-off-by: Ruihang Xia --- tests-integration/tests/sql.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index f0186b5de4bc..f8e81230eb3c 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -54,13 +54,10 @@ macro_rules! sql_tests { $service, test_mysql_auth, - // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445 - // test_mysql_crud, + test_mysql_crud, test_postgres_auth, - // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445 - // test_postgres_crud, - // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445 - // test_postgres_parameter_inference, + test_postgres_crud, + test_postgres_parameter_inference, ); )* }; @@ -123,7 +120,6 @@ pub async fn test_mysql_auth(store_type: StorageType) { guard.remove_all().await; } -#[allow(dead_code)] pub async fn test_mysql_crud(store_type: StorageType) { common_telemetry::init_default_ut_logging(); @@ -270,7 +266,6 @@ pub async fn test_postgres_auth(store_type: StorageType) { guard.remove_all().await; } -#[allow(dead_code)] pub async fn test_postgres_crud(store_type: StorageType) { let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_crud").await; @@ -347,7 +342,6 @@ pub async fn test_postgres_crud(store_type: StorageType) { guard.remove_all().await; } -#[allow(dead_code)] pub async fn test_postgres_parameter_inference(store_type: StorageType) { let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_inference").await;