diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index 45c77edb81a1..7807913e74b5 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -14,7 +14,6 @@
 use std::sync::Arc;
 
-use common_telemetry::info;
 use datafusion::datasource::DefaultTableSource;
 use datafusion::error::Result as DfResult;
 use datafusion_common::config::ConfigOptions;
@@ -46,7 +45,9 @@ impl AnalyzerRule for DistPlannerAnalyzer {
     ) -> datafusion_common::Result<LogicalPlan> {
         let plan = plan.transform(&Self::inspect_plan_with_subquery)?;
         let mut rewriter = PlanRewriter::default();
-        plan.rewrite(&mut rewriter)
+        let result = plan.rewrite(&mut rewriter)?;
+
+        Ok(result)
     }
 }
 
@@ -138,10 +139,6 @@ impl PlanRewriter {
     /// Return true if should stop and expand. The input plan is the parent node of current node
     fn should_expand(&mut self, plan: &LogicalPlan) -> bool {
         if DFLogicalSubstraitConvertor.encode(plan).is_err() {
-            info!(
-                "substrait error: {:?}",
-                DFLogicalSubstraitConvertor.encode(plan)
-            );
             return true;
         }
 
@@ -251,6 +248,13 @@ impl TreeNodeRewriter for PlanRewriter {
             return Ok(node);
         }
 
+        // only expand when the leaf is table scan
+        if node.inputs().is_empty() && !matches!(node, LogicalPlan::TableScan(_)) {
+            self.set_expanded();
+            self.pop_stack();
+            return Ok(node);
+        }
+
         self.maybe_set_partitions(&node);
 
         let Some(parent) = self.get_parent() else {
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index a8efea7da026..f8e81230eb3c 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -54,13 +54,10 @@ macro_rules! sql_tests {
                 $service,
                 test_mysql_auth,
-                // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
-                // test_mysql_crud,
+                test_mysql_crud,
                 test_postgres_auth,
-                // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
-                // test_postgres_crud,
-                // ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
-                // test_postgres_parameter_inference,
+                test_postgres_crud,
+                test_postgres_parameter_inference,
             );
         )*
     };
@@ -123,7 +120,6 @@ pub async fn test_mysql_auth(store_type: StorageType) {
     guard.remove_all().await;
 }
 
-#[allow(dead_code)]
 pub async fn test_mysql_crud(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
 
@@ -135,12 +131,12 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         .await
         .unwrap();
 
-    assert!(sqlx::query(
+    sqlx::query(
         "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
     )
     .execute(&pool)
    .await
-    .is_ok());
+    .unwrap();
 
    for i in 0..10 {
        let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
            NaiveDateTime::from_timestamp_opt(60, i).unwrap(),
@@ -149,7 +145,7 @@
        let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
        let hello = format!("hello{i}");
        let bytes = hello.as_bytes();
-        assert!(sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
+        sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
            .bind(i)
            .bind(i)
            .bind(d)
@@ -157,7 +153,7 @@
            .bind(bytes)
            .execute(&pool)
            .await
-            .is_ok());
+            .unwrap();
    }
 
    let rows = sqlx::query("select i, d, dt, b from demo")
@@ -270,7 +266,6 @@ pub async fn test_postgres_auth(store_type: StorageType) {
     guard.remove_all().await;
 }
 
-#[allow(dead_code)]
 pub async fn test_postgres_crud(store_type: StorageType) {
     let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_crud").await;
 
@@ -347,7 +342,6 @@ pub async fn test_postgres_crud(store_type: StorageType) {
     guard.remove_all().await;
 }
 
-#[allow(dead_code)]
 pub async fn test_postgres_parameter_inference(store_type: StorageType) {
     let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_inference").await;